diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/README.md | 1 | ||||
-rw-r--r-- | streaming/compression.c | 305 | ||||
-rw-r--r-- | streaming/receiver.c | 449 | ||||
-rw-r--r-- | streaming/replication.c | 46 | ||||
-rw-r--r-- | streaming/replication.h | 2 | ||||
-rw-r--r-- | streaming/rrdpush.c | 211 | ||||
-rw-r--r-- | streaming/rrdpush.h | 493 | ||||
-rw-r--r-- | streaming/sender.c | 342 | ||||
-rw-r--r-- | streaming/stream.conf | 10 |
9 files changed, 972 insertions, 887 deletions
diff --git a/streaming/README.md b/streaming/README.md index bf11f32e..370186ac 100644 --- a/streaming/README.md +++ b/streaming/README.md @@ -55,6 +55,7 @@ node**. This file is automatically generated by Netdata the first time it is sta | [`default memory mode`](#default-memory-mode) | `ram` | The [database](https://github.com/netdata/netdata/blob/master/database/README.md) to use for all nodes using this `API_KEY`. Valid settings are `dbengine`, `map`, `save`, `ram`, or `none`. [Read more →](#default-memory-mode) | | `health enabled by default` | `auto` | Whether alarms and notifications should be enabled for nodes using this `API_KEY`. `auto` enables alarms when the child is connected. `yes` enables alarms always, and `no` disables alarms. | | `default postpone alarms on connect seconds` | `60` | Postpone alarms and notifications for a period of time after the child connects. | +| `default health log history` | `432000` | History of health log events (in seconds) kept in the database. | | `default proxy enabled` | ` ` | Route metrics through a proxy. | | `default proxy destination` | ` ` | Space-separated list of `IP:PORT` for proxies. | | `default proxy api key` | ` ` | The `API_KEY` of the proxy. | diff --git a/streaming/compression.c b/streaming/compression.c index 8f2517a8..6d4a128b 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -1,59 +1,44 @@ #include "rrdpush.h" -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION #include "lz4.h" #define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION" -// signature MUST end with a newline -#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) -#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) -#define SIGNATURE_SIZE 4 - - -/* - * LZ4 streaming API compressor specific data - */ -struct compressor_data { - LZ4_stream_t *stream; - char *input_ring_buffer; - size_t input_ring_buffer_size; - size_t input_ring_buffer_pos; -}; - - /* * Reset compressor state for a new stream */ -static void lz4_compressor_reset(struct compressor_state *state) -{ - if (state->data) { - if (state->data->stream) { - LZ4_resetStream_fast(state->data->stream); - internal_error(true, "%s: compressor reset", STREAM_COMPRESSION_MSG); - } - state->data->input_ring_buffer_pos = 0; +void rrdpush_compressor_reset(struct compressor_state *state) { + if(!state->initialized) { + state->initialized = true; + + state->stream.lz4_stream = LZ4_createStream(); + state->stream.input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2); + state->stream.input_ring_buffer = callocz(1, state->stream.input_ring_buffer_size); + state->compression_result_buffer_size = 0; } + + LZ4_resetStream_fast(state->stream.lz4_stream); + + state->stream.input_ring_buffer_pos = 0; } /* * Destroy compressor state and all related data */ -static void lz4_compressor_destroy(struct compressor_state **state) -{ - if (state && *state) { - struct compressor_state *s = *state; - if (s->data) { - if (s->data->stream) - LZ4_freeStream(s->data->stream); - freez(s->data->input_ring_buffer); - freez(s->data); - } - freez(s->compression_result_buffer); - freez(s); - *state = NULL; - debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG); +void rrdpush_compressor_destroy(struct compressor_state *state) { + if (state->stream.lz4_stream) { + LZ4_freeStream(state->stream.lz4_stream); + state->stream.lz4_stream = NULL; } + + freez(state->stream.input_ring_buffer); + state->stream.input_ring_buffer = NULL; + + freez(state->compression_result_buffer); + state->compression_result_buffer = NULL; + + state->initialized = false; } /* @@ -62,18 +47,18 @@ static void lz4_compressor_destroy(struct compressor_state **state) * Return the size of compressed data block as result and the pointer to internal buffer using the last argument * or 0 in case of error */ -static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out) -{ +size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out) { if(unlikely(!state || !size || !out)) return 0; if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) { - error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE); + netdata_log_error("RRDPUSH COMPRESS: Compression Failed - Message size %lu above compression buffer limit: %d", + (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE); return 0; } size_t max_dst_size = LZ4_COMPRESSBOUND(size); - size_t data_size = max_dst_size + SIGNATURE_SIZE; + size_t data_size = max_dst_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; if (!state->compression_result_buffer) { state->compression_result_buffer = mallocz(data_size); @@ -85,238 +70,112 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char } // the ring buffer always has space for LZ4_MAX_MSG_SIZE - memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size); + memcpy(state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, data, size); // this call needs the last 64K of our previous data // they are available in the ring buffer long int compressed_data_size = LZ4_compress_fast_continue( - state->data->stream, - state->data->input_ring_buffer + state->data->input_ring_buffer_pos, - state->compression_result_buffer + SIGNATURE_SIZE, - size, - max_dst_size, + state->stream.lz4_stream, + state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, + state->compression_result_buffer + RRDPUSH_COMPRESSION_SIGNATURE_SIZE, + (int)size, + (int)max_dst_size, 1); if (compressed_data_size < 0) { - error("Data compression error: %ld", compressed_data_size); + netdata_log_error("Data compression error: %ld", compressed_data_size); return 0; } // update the next writing position of the ring buffer - state->data->input_ring_buffer_pos += size; - if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE)) - state->data->input_ring_buffer_pos = 0; + state->stream.input_ring_buffer_pos += size; + if(unlikely(state->stream.input_ring_buffer_pos >= state->stream.input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE)) + state->stream.input_ring_buffer_pos = 0; // update the signature header uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8; - *(uint32_t *)state->compression_result_buffer = len | SIGNATURE; + *(uint32_t *)state->compression_result_buffer = len | RRDPUSH_COMPRESSION_SIGNATURE; *out = state->compression_result_buffer; - debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size); - return compressed_data_size + SIGNATURE_SIZE; -} - -/* - * Create and initialize compressor state - * Return the pointer to compressor_state structure created - */ -struct compressor_state *create_compressor() -{ - struct compressor_state *state = callocz(1, sizeof(struct compressor_state)); - - state->reset = lz4_compressor_reset; - state->compress = lz4_compressor_compress; - state->destroy = lz4_compressor_destroy; - - state->data = callocz(1, sizeof(struct compressor_data)); - state->data->stream = LZ4_createStream(); - state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2); - state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size); - state->compression_result_buffer_size = 0; - state->reset(state); - debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG); - return state; -} - -/* - * LZ4 streaming API decompressor specific data - */ -struct decompressor_stream { - LZ4_streamDecode_t *lz4_stream; - char *buffer; - size_t size; - size_t write_at; - size_t read_at; -}; - -/* - * Reset decompressor state for a new stream - */ -static void lz4_decompressor_reset(struct decompressor_state *state) -{ - if (state->stream) { - if (state->stream->lz4_stream) - LZ4_setStreamDecode(state->stream->lz4_stream, NULL, 0); - - state->stream->write_at = 0; - state->stream->read_at = 0; - } -} - -/* - * Destroy decompressor state and all related data - */ -static void lz4_decompressor_destroy(struct decompressor_state **state) -{ - if (state && *state) { - struct decompressor_state *s = *state; - if (s->stream) { - debug(D_STREAM, "%s: Destroying decompressor.", STREAM_COMPRESSION_MSG); - if (s->stream->lz4_stream) - LZ4_freeStreamDecode(s->stream->lz4_stream); - freez(s->stream->buffer); - freez(s->stream); - } - freez(s); - *state = NULL; - } -} - -static size_t decode_compress_header(const char *data, size_t data_size) { - if (unlikely(!data || !data_size)) - return 0; - - if (unlikely(data_size != SIGNATURE_SIZE)) - return 0; - - uint32_t sign = *(uint32_t *)data; - if (unlikely((sign & SIGNATURE_MASK) != SIGNATURE)) - return 0; - - size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7)); - return length; -} - -/* - * Start the collection of compressed data in an internal buffer - * Return the size of compressed data or 0 for uncompressed data - */ -static size_t lz4_decompressor_start(struct decompressor_state *state __maybe_unused, const char *header, size_t header_size) { - if(unlikely(state->stream->read_at != state->stream->write_at)) - fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!" - , STREAM_COMPRESSION_MSG); - - return decode_compress_header(header, header_size); + netdata_log_debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size); + return compressed_data_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; } /* * Decompress the compressed data in the internal buffer * Return the size of uncompressed data or 0 for error */ -static size_t lz4_decompressor_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { +size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { if (unlikely(!state || !compressed_data || !compressed_size)) return 0; - if(unlikely(state->stream->read_at != state->stream->write_at)) - fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!" - , STREAM_COMPRESSION_MSG); + if(unlikely(state->stream.read_at != state->stream.write_at)) + fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); - if (unlikely(state->stream->write_at >= state->stream->size / 2)) { - state->stream->write_at = 0; - state->stream->read_at = 0; + if (unlikely(state->stream.write_at >= state->stream.size / 2)) { + state->stream.write_at = 0; + state->stream.read_at = 0; } long int decompressed_size = LZ4_decompress_safe_continue( - state->stream->lz4_stream + state->stream.lz4_stream , compressed_data - , state->stream->buffer + state->stream->write_at + , state->stream.buffer + state->stream.write_at , (int)compressed_size - , (int)(state->stream->size - state->stream->write_at) + , (int)(state->stream.size - state->stream.write_at) ); if (unlikely(decompressed_size < 0)) { - error("%s: decompressor returned negative decompressed bytes: %ld", STREAM_COMPRESSION_MSG, decompressed_size); + netdata_log_error("RRDPUSH DECOMPRESS: decompressor returned negative decompressed bytes: %ld", decompressed_size); return 0; } - if(unlikely(decompressed_size + state->stream->write_at > state->stream->size)) - fatal("%s: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu" - , STREAM_COMPRESSION_MSG - , state->stream->size - , state->stream->write_at + if(unlikely(decompressed_size + state->stream.write_at > state->stream.size)) + fatal("RRDPUSH DECOMPRESS: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, " + "exceeding the buffer by %zu" + , state->stream.size + , state->stream.write_at , decompressed_size - , (size_t)(state->stream->write_at + decompressed_size - state->stream->size) + , (size_t)(state->stream.write_at + decompressed_size - state->stream.size) ); - state->stream->write_at += decompressed_size; + state->stream.write_at += decompressed_size; // statistics - state->total_compressed += compressed_size + SIGNATURE_SIZE; + state->total_compressed += compressed_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; state->total_uncompressed += decompressed_size; state->packet_count++; return decompressed_size; } -/* - * Return the size of uncompressed data left in the internal buffer or 0 for error - */ -static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state) { - if(unlikely(state->stream->read_at > state->stream->write_at)) - fatal("%s: invalid read/write stream positions" - , STREAM_COMPRESSION_MSG); - - return state->stream->write_at - state->stream->read_at; -} +void rrdpush_decompressor_reset(struct decompressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream.lz4_stream = LZ4_createStreamDecode(); + state->stream.size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2; + state->stream.buffer = mallocz(state->stream.size); + } -/* - * Fill the buffer provided with uncompressed data from the internal buffer - * Return the size of uncompressed data copied or 0 for error - */ -static size_t lz4_decompressor_get(struct decompressor_state *state, char *dst, size_t size) { - if (unlikely(!state || !size || !dst)) - return 0; + LZ4_setStreamDecode(state->stream.lz4_stream, NULL, 0); - size_t remaining = lz4_decompressor_decompressed_bytes_in_buffer(state); - if(unlikely(!remaining)) - return 0; + state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE; + state->stream.write_at = 0; + state->stream.read_at = 0; +} - size_t bytes_to_return = size; - if(bytes_to_return > remaining) - bytes_to_return = remaining; +void rrdpush_decompressor_destroy(struct decompressor_state *state) { + if(unlikely(!state->initialized)) + return; - memcpy(dst, state->stream->buffer + state->stream->read_at, bytes_to_return); - state->stream->read_at += bytes_to_return; + if (state->stream.lz4_stream) { + LZ4_freeStreamDecode(state->stream.lz4_stream); + state->stream.lz4_stream = NULL; + } - if(unlikely(state->stream->read_at > state->stream->write_at)) - fatal("%s: invalid read/write stream positions" - , STREAM_COMPRESSION_MSG); + freez(state->stream.buffer); + state->stream.buffer = NULL; - return bytes_to_return; + state->initialized = false; } -/* - * Create and initialize decompressor state - * Return the pointer to decompressor_state structure created - */ -struct decompressor_state *create_decompressor() -{ - struct decompressor_state *state = callocz(1, sizeof(struct decompressor_state)); - state->signature_size = SIGNATURE_SIZE; - state->reset = lz4_decompressor_reset; - state->start = lz4_decompressor_start; - state->decompress = lz4_decompressor_decompress; - state->get = lz4_decompressor_get; - state->decompressed_bytes_in_buffer = lz4_decompressor_decompressed_bytes_in_buffer; - state->destroy = lz4_decompressor_destroy; - - state->stream = callocz(1, sizeof(struct decompressor_stream)); - fatal_assert(state->stream); - state->stream->lz4_stream = LZ4_createStreamDecode(); - state->stream->size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2; - state->stream->buffer = mallocz(state->stream->size); - fatal_assert(state->stream->buffer); - state->reset(state); - debug(D_STREAM, "%s: Initialize streaming decompression!", STREAM_COMPRESSION_MSG); - return state; -} #endif diff --git a/streaming/receiver.c b/streaming/receiver.c index 709f15bd..3ff022e9 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -2,17 +2,6 @@ #include "rrdpush.h" -// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly -#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) -#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) - -// this has to be the same at parser.h -#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) - -#if WORKER_PARSER_FIRST_JOB < 1 -#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 -#endif - extern struct config stream_config; void receiver_state_free(struct receiver_state *rpt) { @@ -39,13 +28,12 @@ void receiver_state_free(struct receiver_state *rpt) { close(rpt->fd); } -#ifdef ENABLE_COMPRESSION - if (rpt->decompressor) - rpt->decompressor->destroy(&rpt->decompressor); +#ifdef ENABLE_RRDPUSH_COMPRESSION + rrdpush_decompressor_destroy(&rpt->decompressor); #endif if(rpt->system_info) - rrdhost_system_info_free(rpt->system_info); + rrdhost_system_info_free(rpt->system_info); __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); @@ -54,125 +42,96 @@ void receiver_state_free(struct receiver_state *rpt) { #include "collectors/plugins.d/pluginsd_parser.h" -PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) -{ - const char *host_uuid_str = get_word(words, num_words, 1); - const char *claim_id_str = get_word(words, num_words, 2); - - if (!host_uuid_str || !claim_id_str) { - error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", - host_uuid_str ? host_uuid_str : "[unset]", - claim_id_str ? claim_id_str : "[unset]"); - return PARSER_RC_ERROR; - } - - uuid_t uuid; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; - - // We don't need the parsed UUID - // just do it to check the format - if(uuid_parse(host_uuid_str, uuid)) { - error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); - return PARSER_RC_ERROR; - } - if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) { - error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); - return PARSER_RC_ERROR; - } - - if(strcmp(host_uuid_str, host->machine_guid)) { - error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); - return PARSER_RC_OK; //the message is OK problem must be somewhere else - } - - rrdhost_aclk_state_lock(host); - if (host->aclk_state.claimed_id) - freez(host->aclk_state.claimed_id); - host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - rrdhost_aclk_state_unlock(host); - - rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); +// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly +#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) +#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2) - rrdpush_claimed_id(host); +// this has to be the same at parser.h +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) - return PARSER_RC_OK; -} +#if WORKER_PARSER_FIRST_JOB < 1 +#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 +#endif -static int read_stream(struct receiver_state *r, char* buffer, size_t size) { +static inline int read_stream(struct receiver_state *r, char* buffer, size_t size) { if(unlikely(!size)) { internal_error(true, "%s() asked to read zero bytes", __FUNCTION__); return 0; } + int tries = 100; ssize_t bytes_read; + do { + errno = 0; + #ifdef ENABLE_HTTPS - if (SSL_connection(&r->ssl)) - bytes_read = netdata_ssl_read(&r->ssl, buffer, size); - else - bytes_read = read(r->fd, buffer, size); + if (SSL_connection(&r->ssl)) + bytes_read = netdata_ssl_read(&r->ssl, buffer, size); + else + bytes_read = read(r->fd, buffer, size); #else - bytes_read = read(r->fd, buffer, size); + bytes_read = read(r->fd, buffer, size); #endif + } while(bytes_read < 0 && errno == EINTR && tries--); + if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) { - error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); + netdata_log_error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__); bytes_read = -3; } else if (bytes_read == 0) { - error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__); + netdata_log_error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__); bytes_read = -1; } else if (bytes_read < 0) { - error("STREAM: %s() failed to read from socket!", __FUNCTION__); + netdata_log_error("STREAM: %s() failed to read from socket!", __FUNCTION__); bytes_read = -2; } return (int)bytes_read; } -static bool receiver_read_uncompressed(struct receiver_state *r) { +static inline bool receiver_read_uncompressed(struct receiver_state *r) { #ifdef NETDATA_INTERNAL_CHECKS - if(r->read_buffer[r->read_len] != '\0') + if(r->reader.read_buffer[r->reader.read_len] != '\0') fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); #endif - int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); + int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1); if(unlikely(bytes_read <= 0)) return false; worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read); - r->read_len += bytes_read; - r->read_buffer[r->read_len] = '\0'; + r->reader.read_len += bytes_read; + r->reader.read_buffer[r->reader.read_len] = '\0'; return true; } -#ifdef ENABLE_COMPRESSION -static bool receiver_read_compressed(struct receiver_state *r) { +#ifdef ENABLE_RRDPUSH_COMPRESSION +static inline bool receiver_read_compressed(struct receiver_state *r) { -#ifdef NETDATA_INTERNAL_CHECKS - if(r->read_buffer[r->read_len] != '\0') - fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ ); -#endif + internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0', + "%s: read_buffer does not start with zero #2", __FUNCTION__ ); // first use any available uncompressed data - if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { - size_t available = sizeof(r->read_buffer) - r->read_len - 1; - if (available) { - size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); - if (!len) { + if (likely(rrdpush_decompressed_bytes_in_buffer(&r->decompressor))) { + size_t available = sizeof(r->reader.read_buffer) - r->reader.read_len - 1; + if (likely(available)) { + size_t len = rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, available); + if (unlikely(!len)) { internal_error(true, "decompressor returned zero length #1"); return false; } - r->read_len += (int)len; - r->read_buffer[r->read_len] = '\0'; + r->reader.read_len += (int)len; + r->reader.read_buffer[r->reader.read_len] = '\0'; } else - internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len); + internal_fatal(true, "The line to read is too big! Already have %zd bytes in read_buffer.", r->reader.read_len); return true; } @@ -180,8 +139,9 @@ static bool receiver_read_compressed(struct receiver_state *r) { // no decompressed data available // read the compression signature of the next block - if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) { - internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len); + if(unlikely(r->reader.read_len + r->decompressor.signature_size > sizeof(r->reader.read_buffer) - 1)) { + internal_error(true, "The last incomplete line does not leave enough room for the next compression header! " + "Already have %zd bytes in read_buffer.", r->reader.read_len); return false; } @@ -189,34 +149,34 @@ static bool receiver_read_compressed(struct receiver_state *r) { // we have to do a loop here, because read_stream() may return less than the data we need int bytes_read = 0; do { - int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read); + int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read); if (unlikely(ret <= 0)) return false; bytes_read += ret; - } while(unlikely(bytes_read < (int)r->decompressor->signature_size)); + } while(unlikely(bytes_read < (int)r->decompressor.signature_size)); worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); - if(unlikely(bytes_read != (int)r->decompressor->signature_size)) - fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size); + if(unlikely(bytes_read != (int)r->decompressor.signature_size)) + fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor.signature_size); - size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read); + size_t compressed_message_size = rrdpush_decompressor_start(&r->decompressor, r->reader.read_buffer + r->reader.read_len, bytes_read); if (unlikely(!compressed_message_size)) { internal_error(true, "multiplexed uncompressed data in compressed stream!"); - r->read_len += bytes_read; - r->read_buffer[r->read_len] = '\0'; + r->reader.read_len += bytes_read; + r->reader.read_buffer[r->reader.read_len] = '\0'; return true; } if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) { - error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.", + netdata_log_error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.", compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE); return false; } // delete compression header from our read buffer - r->read_buffer[r->read_len] = '\0'; + r->reader.read_buffer[r->reader.read_len] = '\0'; // Read the entire compressed block of compressed data char compressed[compressed_message_size]; @@ -238,8 +198,8 @@ static bool receiver_read_compressed(struct receiver_state *r) { worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read); // decompress the compressed block - size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read); - if (!bytes_to_parse) { + size_t bytes_to_parse = rrdpush_decompress(&r->decompressor, compressed, compressed_bytes_read); + if (unlikely(!bytes_to_parse)) { internal_error(true, "no bytes to parse."); return false; } @@ -247,38 +207,38 @@ static bool receiver_read_compressed(struct receiver_state *r) { worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse); // fill read buffer with decompressed data - size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1); - if (!len) { + size_t len = (int) rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1); + if (unlikely(!len)) { internal_error(true, "decompressor returned zero length #2"); return false; } - r->read_len += (int)len; - r->read_buffer[r->read_len] = '\0'; + r->reader.read_len += (int)len; + r->reader.read_buffer[r->reader.read_len] = '\0'; return true; } -#else // !ENABLE_COMPRESSION -static bool receiver_read_compressed(struct receiver_state *r) { +#else // !ENABLE_RRDPUSH_COMPRESSION +static inline bool receiver_read_compressed(struct receiver_state *r) { return receiver_read_uncompressed(r); } -#endif // ENABLE_COMPRESSION +#endif // ENABLE_RRDPUSH_COMPRESSION /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) { - size_t start = *pos; +inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) { + size_t start = reader->pos; - char *ss = &r->read_buffer[start]; - char *se = &r->read_buffer[r->read_len]; - char *ds = buffer; - char *de = &buffer[buffer_length - 2]; + char *ss = &reader->read_buffer[start]; + char *se = &reader->read_buffer[reader->read_len]; + char *ds = dst; + char *de = &dst[dst_size - 2]; if(ss >= se) { *ds = '\0'; - *pos = 0; - r->read_len = 0; - r->read_buffer[r->read_len] = '\0'; + reader->pos = 0; + reader->read_len = 0; + reader->read_buffer[reader->read_len] = '\0'; return NULL; } @@ -293,44 +253,73 @@ static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t b *ds++ = *ss++; // copy the newline too *ds = '\0'; - *pos = ss - r->read_buffer; - return buffer; + reader->pos = ss - reader->read_buffer; + return dst; } // if the destination is full, oops! if(ds == de) { - error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); + netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); *ds = '\0'; - *pos = ss - r->read_buffer; - return buffer; + reader->pos = ss - reader->read_buffer; + return dst; } // no newline found in the r->read_buffer // move everything to the beginning - memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start); - r->read_len -= (int)start; - r->read_buffer[r->read_len] = '\0'; + memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start); + reader->read_len -= (int)start; + reader->read_buffer[reader->read_len] = '\0'; *ds = '\0'; - *pos = 0; + reader->pos = 0; return NULL; } bool plugin_is_enabled(struct plugind *cd); +static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAKE reason, bool force) { + if(force || !rpt->exit.reason) + rpt->exit.reason = reason; +} + +static inline bool receiver_should_stop(struct receiver_state *rpt) { + static __thread size_t counter = 0; + + if(unlikely(rpt->exit.shutdown)) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false); + return true; + } + + if(unlikely(!service_running(SERVICE_STREAMING))) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, false); + return true; + } + + if(unlikely((counter++ % 1000) == 0)) { + // check every 1000 lines read + netdata_thread_testcancel(); + rpt->last_msg_t = now_monotonic_sec(); + } + + return false; +} + static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { - size_t result; - - PARSER_USER_OBJECT user = { - .enabled = plugin_is_enabled(cd), - .host = rpt->host, - .opaque = rpt, - .cd = cd, - .trust_durations = 1, - .capabilities = rpt->capabilities, - }; + size_t result = 0; - PARSER *parser = parser_init(&user, NULL, NULL, fd, - PARSER_INPUT_SPLIT, ssl); + PARSER *parser = NULL; + { + PARSER_USER_OBJECT user = { + .enabled = plugin_is_enabled(cd), + .host = rpt->host, + .opaque = rpt, + .cd = cd, + .trust_durations = 1, + .capabilities = rpt->capabilities, + }; + + parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl); + } pluginsd_keywords_init(parser, PARSER_INIT_STREAMING); @@ -340,72 +329,41 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); - - user.parser = parser; - bool compressed_connection = false; -#ifdef ENABLE_COMPRESSION + +#ifdef ENABLE_RRDPUSH_COMPRESSION if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { compressed_connection = true; - - if (!rpt->decompressor) - rpt->decompressor = create_decompressor(); - else - rpt->decompressor->reset(rpt->decompressor); + rrdpush_decompressor_reset(&rpt->decompressor); } + else + rrdpush_decompressor_destroy(&rpt->decompressor); #endif - rpt->read_buffer[0] = '\0'; - rpt->read_len = 0; + buffered_reader_init(&rpt->reader); - size_t read_buffer_start = 0; char buffer[PLUGINSD_LINE_MAX + 2] = ""; - while(service_running(SERVICE_STREAMING)) { - netdata_thread_testcancel(); + while(!receiver_should_stop(rpt)) { - if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { - bool have_new_data; - if(likely(compressed_connection)) - have_new_data = receiver_read_compressed(rpt); - else - have_new_data = receiver_read_uncompressed(rpt); + if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) { + bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); if(unlikely(!have_new_data)) { - if(!rpt->exit.reason) - rpt->exit.reason = "SOCKET READ ERROR"; - + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false); break; } - rpt->last_msg_t = now_realtime_sec(); continue; } - if(unlikely(!service_running(SERVICE_STREAMING))) { - if(!rpt->exit.reason) - rpt->exit.reason = "NETDATA EXIT"; - goto done; - } - if(unlikely(rpt->exit.shutdown)) { - if(!rpt->exit.reason) - rpt->exit.reason = "SHUTDOWN REQUESTED"; - - goto done; - } - if (unlikely(parser_action(parser, buffer))) { internal_error(true, "parser_action() failed on keyword '%s'.", buffer); - - if(!rpt->exit.reason) - rpt->exit.reason = "PARSER FAILED"; - + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); break; } } -done: - result = user.data_collections_count; + result = parser->user.data_collections_count; // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -423,67 +381,18 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) { rrdhost_receiver_replicating_charts_zero(host); } -void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) { - size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1; - - netdata_mutex_lock(&host->receiver_lock); - - buffer_json_member_add_object(wb, key); - buffer_json_member_add_uint64(wb, "hops", receiver_hops); - - bool online = host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); - buffer_json_member_add_boolean(wb, "online", online); - - if(host->child_connect_time || host->child_disconnected_time) { - time_t since = MAX(host->child_connect_time, host->child_disconnected_time); - buffer_json_member_add_time_t(wb, "since", since); - buffer_json_member_add_time_t(wb, "age", now - since); - } - - if(!online && host->rrdpush_last_receiver_exit_reason) - buffer_json_member_add_string(wb, "reason", host->rrdpush_last_receiver_exit_reason); - - if(host != localhost && host->receiver) { - buffer_json_member_add_object(wb, "replication"); - { - size_t instances = rrdhost_receiver_replicating_charts(host); - buffer_json_member_add_boolean(wb, "in_progress", instances); - buffer_json_member_add_double(wb, "completion", host->rrdpush_receiver_replication_percent); - buffer_json_member_add_uint64(wb, "instances", instances); - } - buffer_json_object_close(wb); // replication - - buffer_json_member_add_object(wb, "source"); - { - - char buf[1024 + 1]; - SOCKET_PEERS peers = socket_peers(host->receiver->fd); - bool ssl = SSL_connection(&host->receiver->ssl); - - snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : ""); - buffer_json_member_add_string(wb, "local", buf); - - snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : ""); - buffer_json_member_add_string(wb, "remote", buf); - - stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities"); - } - buffer_json_object_close(wb); // source - } - buffer_json_object_close(wb); // collection - - netdata_mutex_unlock(&host->receiver_lock); -} - static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { bool signal_rrdcontext = false; bool set_this = false; netdata_mutex_lock(&host->receiver_lock); - if (!host->receiver || host->receiver == rpt) { + if (!host->receiver) { rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + host->rrdpush_receiver_connection_counter++; + __atomic_add_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED); + host->receiver = rpt; rpt->host = host; @@ -495,13 +404,15 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) { if (rpt->config.alarms_delay > 0) { host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay; - log_health( + netdata_log_health( "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", rrdhost_hostname(host), (int64_t) rpt->config.alarms_delay); } } + host->health_log.health_log_history = rpt->config.alarms_history; + // this is a test // if(rpt->hops <= host->sender->hops) // rrdpush_sender_thread_stop(host, "HOPS MISMATCH", false); @@ -534,6 +445,9 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { // Make sure that we detach this thread and don't kill a freshly arriving receiver if(host->receiver == rpt) { + __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED); + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + host->trigger_chart_obsoletion_check = 0; host->child_connect_time = 0; host->child_disconnected_time = now_realtime_sec(); @@ -541,7 +455,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO) host->health.health_enabled = 0; - rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false); + rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false); signal_rrdcontext = true; rrdpush_receiver_replication_reset(host); @@ -560,7 +474,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) { } } -bool stop_streaming_receiver(RRDHOST *host, const char *reason) { +bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) { bool ret = false; netdata_mutex_lock(&host->receiver_lock); @@ -568,7 +482,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { if(host->receiver) { if(!host->receiver->exit.shutdown) { host->receiver->exit.shutdown = true; - host->receiver->exit.reason = reason; + receiver_set_exit_reason(host->receiver, reason, true); shutdown(host->receiver->fd, SHUT_RDWR); } @@ -586,7 +500,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) { } if(host->receiver) - error("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " "thread %d takes too long to stop, giving up..." , rrdhost_hostname(host) , host->receiver->client_ip, host->receiver->client_port @@ -619,25 +533,20 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-", status); - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "%s. " "STATUS: %s%s%s%s" , rpt->hostname , rpt->client_ip, rpt->client_port , msg , status - , rpt->exit.reason?" (":"" - , rpt->exit.reason?rpt->exit.reason:"" - , rpt->exit.reason?")":"" + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" + , stream_handshake_error_to_string(rpt->exit.reason) + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" ); } -static void rrdhost_reset_destinations(RRDHOST *host) { - for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) - d->postpone_reconnection_until = 0; -} - static void rrdpush_receive(struct receiver_state *rpt) { rpt->config.mode = default_rrd_memory_mode; @@ -645,6 +554,7 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.health_enabled = (int)default_health_enabled; rpt->config.alarms_delay = 60; + rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY; rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled; rpt->config.rrdpush_destination = default_rrdpush_destination; @@ -666,7 +576,7 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode))); if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { - error("STREAM '%s' [receive from %s:%s]: " + netdata_log_error("STREAM '%s' [receive from %s:%s]: " "dbengine is not enabled, falling back to default." , rpt->hostname , rpt->client_ip, rpt->client_port @@ -681,6 +591,9 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay); rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay); + rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->key, "default health log history", rpt->config.alarms_history); + rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->machine_guid, "health log history", rpt->config.alarms_history); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled); rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled); @@ -702,12 +615,11 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); -#ifdef ENABLE_COMPRESSION - rpt->config.rrdpush_compression = default_compression_enabled; +#ifdef ENABLE_RRDPUSH_COMPRESSION + rpt->config.rrdpush_compression = default_rrdpush_compression_enabled; rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); - rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled); -#endif //ENABLE_COMPRESSION +#endif // ENABLE_RRDPUSH_COMPRESSION (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); @@ -763,9 +675,9 @@ static void rrdpush_receive(struct receiver_state *rpt) } #ifdef NETDATA_INTERNAL_CHECKS - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "client willing to stream metrics for host '%s' with machine_guid '%s': " - "update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" + "update every = %d, history = %d, memory mode = %s, health %s,%s tags '%s'" , rpt->hostname , rpt->client_ip , rpt->client_port @@ -801,15 +713,15 @@ static void rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - if (!rpt->rrdpush_compression) + if (!rpt->config.rrdpush_compression) rpt->capabilities &= ~STREAM_CAP_COMPRESSION; } -#endif +#endif // ENABLE_RRDPUSH_COMPRESSION { - // info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + // netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); char initial_response[HTTP_HEADER_SIZE]; if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { log_receiver_capabilities(rpt); @@ -828,7 +740,7 @@ static void rrdpush_receive(struct receiver_state *rpt) sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); } - debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); + netdata_log_debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); ssize_t bytes_sent = send_timeout( #ifdef ENABLE_HTTPS &rpt->ssl, @@ -845,7 +757,7 @@ static void rrdpush_receive(struct receiver_state *rpt) { // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->fd) < 0) - error("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " "cannot remove the non-blocking flag from socket %d" , rrdhost_hostname(rpt->host) , rpt->client_ip, rpt->client_port @@ -855,7 +767,7 @@ static void rrdpush_receive(struct receiver_state *rpt) timeout.tv_sec = 600; timeout.tv_usec = 0; if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) - error("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " "cannot set timeout for socket %d" , rrdhost_hostname(rpt->host) , rpt->client_ip, rpt->client_port @@ -867,14 +779,14 @@ static void rrdpush_receive(struct receiver_state *rpt) #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected - if (netdata_cloud_setting) + if (netdata_cloud_enabled) aclk_host_state_update(rpt->host, 1); #endif - rrdhost_set_is_parent_label(++localhost->connected_children_count); + rrdhost_set_is_parent_label(); // let it reconnect to parent immediately - rrdhost_reset_destinations(rpt->host); + rrdpush_reset_destinations_postpone_time(rpt->host); size_t count = streaming_parser(rpt, &cd, rpt->fd, #ifdef ENABLE_HTTPS @@ -884,10 +796,7 @@ static void rrdpush_receive(struct receiver_state *rpt) #endif ); - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); - - if(!rpt->exit.reason) - rpt->exit.reason = "PARSER EXIT"; + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, false); { char msg[100 + 1]; @@ -898,12 +807,10 @@ static void rrdpush_receive(struct receiver_state *rpt) #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // a child disconnected - if (netdata_cloud_setting) + if (netdata_cloud_enabled) aclk_host_state_update(rpt->host, 0); #endif - rrdhost_set_is_parent_label(--localhost->connected_children_count); - cleanup: ; } @@ -914,13 +821,15 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { rrdhost_clear_receiver(rpt); - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "receive thread ended (task id %d)" , rpt->hostname ? rpt->hostname : "-" , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-" , gettid()); receiver_state_free(rpt); + + rrdhost_set_is_parent_label(); } void *rrdpush_receiver_thread(void *ptr) { @@ -933,7 +842,7 @@ void *rrdpush_receiver_thread(void *ptr) { struct receiver_state *rpt = (struct receiver_state *)ptr; rpt->tid = gettid(); - info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); + netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); rrdpush_receive(rpt); diff --git a/streaming/replication.c b/streaming/replication.c index c6fafc35..0e5a0b40 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -40,9 +40,9 @@ static struct replication_query_statistics replication_queries = { }; struct replication_query_statistics replication_get_query_statistics(void) { - netdata_spinlock_lock(&replication_queries.spinlock); + spinlock_lock(&replication_queries.spinlock); struct replication_query_statistics ret = replication_queries; - netdata_spinlock_unlock(&replication_queries.spinlock); + spinlock_unlock(&replication_queries.spinlock); return ret; } @@ -144,7 +144,7 @@ static struct replication_query *replication_query_prepare( } if(q->query.enable_streaming) { - netdata_spinlock_lock(&st->data_collection_lock); + spinlock_lock(&st->data_collection_lock); q->query.locked_data_collection = true; if (st->last_updated.tv_sec > q->query.before) { @@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare( size_t count = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) { - if (unlikely(!rd || !rd_dfe.item || !rd->exposed)) + if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd))) continue; if (unlikely(rd_dfe.counter >= q->dimensions)) { @@ -198,7 +198,7 @@ static struct replication_query *replication_query_prepare( q->query.execute = false; if(q->query.locked_data_collection) { - netdata_spinlock_unlock(&st->data_collection_lock); + spinlock_unlock(&st->data_collection_lock); q->query.locked_data_collection = false; } @@ -216,20 +216,20 @@ static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STRE NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; RRDDIM *rd; rrddim_foreach_read(rd, st){ - if (!rd->exposed) continue; + if (!rrddim_check_exposed(rd)) continue; buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); - buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC + - (usec_t) rd->last_collected_time.tv_usec); + buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + + (usec_t) rd->collector.last_collected_time.tv_usec); buffer_fast_strcat(wb, " ", 1); - buffer_print_int64_encoded(wb, encoding, rd->last_collected_value); + buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value); buffer_fast_strcat(wb, " ", 1); - buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value); + buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value); buffer_fast_strcat(wb, " ", 1); - buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value); + buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value); buffer_fast_strcat(wb, "\n", 1); } rrddim_foreach_done(rd); @@ -248,7 +248,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, replication_send_chart_collection_state(wb, q->st, q->query.capabilities); if(q->query.locked_data_collection) { - netdata_spinlock_unlock(&q->st->data_collection_lock); + spinlock_unlock(&q->st->data_collection_lock); q->query.locked_data_collection = false; } @@ -269,7 +269,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, } if(executed) { - netdata_spinlock_lock(&replication_queries.spinlock); + spinlock_lock(&replication_queries.spinlock); replication_queries.queries_started += queries; replication_queries.queries_finished += queries; replication_queries.points_read += q->points_read; @@ -280,7 +280,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, s->replication.latest_completed_before_t = q->query.before; } - netdata_spinlock_unlock(&replication_queries.spinlock); + spinlock_unlock(&replication_queries.spinlock); } __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); @@ -678,7 +678,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size } if(locked_data_collection) - netdata_spinlock_unlock(&st->data_collection_lock); + spinlock_unlock(&st->data_collection_lock); return enable_streaming; } @@ -797,9 +797,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c rrdset_id(st), r->wanted.start_streaming ? "true" : "false", (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before); - int ret = r->caller.callback(buffer, r->caller.data); + ssize_t ret = r->caller.callback(buffer, r->caller.data); if (ret < 0) { - error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)", + netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)", rrdhost_hostname(r->host), rrdset_id(r->st), ret); return false; } @@ -1056,11 +1056,11 @@ static inline bool replication_recursive_lock_mode(char mode) { if(mode == 'L') { // (L)ock if(++recursions == 1) - netdata_spinlock_lock(&replication_globals.spinlock); + spinlock_lock(&replication_globals.spinlock); } else if(mode == 'U') { // (U)nlock if(--recursions == 0) - netdata_spinlock_unlock(&replication_globals.spinlock); + spinlock_unlock(&replication_globals.spinlock); } else if(mode == 'C') { // (C)heck if(recursions > 0) @@ -1096,7 +1096,7 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) { // ---------------------------------------------------------------------------- // replication sort entry management -static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { +static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse); __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); @@ -1120,7 +1120,7 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { } static void replication_sort_entry_add(struct replication_request *rq) { - if(rrdpush_sender_replication_buffer_full_get(rq->sender)) { + if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) { rq->indexed_in_judy = false; rq->not_indexed_buffer_full = true; rq->not_indexed_preprocessing = false; @@ -1606,7 +1606,7 @@ static void verify_all_hosts_charts_are_streaming_now(void) { dfe_done(host); size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); - info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", + netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.main_thread.last_executed, errors); replication_globals.main_thread.last_executed = executed; } @@ -1860,7 +1860,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1); if(threads < 1 || threads > MAX_REPLICATION_THREADS) { - error("replication threads given %d is invalid, resetting to 1", threads); + netdata_log_error("replication threads given %d is invalid, resetting to 1", threads); threads = 1; } diff --git a/streaming/replication.h b/streaming/replication.h index f5b64706..507b7c32 100644 --- a/streaming/replication.h +++ b/streaming/replication.h @@ -17,7 +17,7 @@ struct replication_query_statistics replication_get_query_statistics(void); bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before); -typedef int (*send_command)(const char *txt, void *data); +typedef ssize_t (*send_command)(const char *txt, void *data); bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *rh, RRDSET *rs, diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index c481871c..67c43e41 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -39,8 +39,8 @@ struct config stream_config = { }; unsigned int default_rrdpush_enabled = 0; -#ifdef ENABLE_COMPRESSION -unsigned int default_compression_enabled = 1; +#ifdef ENABLE_RRDPUSH_COMPRESSION +unsigned int default_rrdpush_compression_enabled = 1; #endif char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; @@ -57,30 +57,47 @@ static void load_stream_conf() { errno = 0; char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) { - info("CONFIG: cannot load user config '%s'. Will try stock config.", filename); + netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename); freez(filename); filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) - info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); + netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); } freez(filename); } -STREAM_CAPABILITIES stream_our_capabilities() { - return STREAM_CAP_V1 | - STREAM_CAP_V2 | - STREAM_CAP_VN | - STREAM_CAP_VCAPS | - STREAM_CAP_HLABELS | - STREAM_CAP_CLAIM | - STREAM_CAP_CLABELS | - STREAM_CAP_FUNCTIONS | - STREAM_CAP_REPLICATION | - STREAM_CAP_BINARY | - STREAM_CAP_INTERPOLATED | - STREAM_HAS_COMPRESSION | +STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { + + // we can have DATA_WITH_ML when INTERPOLATED is available + bool ml_capability = true; + + if(host && sender) { + // we have DATA_WITH_ML capability + // we should remove the DATA_WITH_ML capability if our database does not have anomaly info + // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML + netdata_mutex_lock(&host->receiver_lock); + + if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) + ml_capability = false; + + netdata_mutex_unlock(&host->receiver_lock); + } + + return STREAM_CAP_V1 | + STREAM_CAP_V2 | + STREAM_CAP_VN | + STREAM_CAP_VCAPS | + STREAM_CAP_HLABELS | + STREAM_CAP_CLAIM | + STREAM_CAP_CLABELS | + STREAM_CAP_FUNCTIONS | + STREAM_CAP_REPLICATION | + STREAM_CAP_BINARY | + STREAM_CAP_INTERPOLATED | + STREAM_HAS_COMPRESSION | (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | + (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) | 0; } @@ -125,13 +142,13 @@ int rrdpush_init() { rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s); -#ifdef ENABLE_COMPRESSION - default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, - "enable compression", default_compression_enabled); +#ifdef ENABLE_RRDPUSH_COMPRESSION + default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, + "enable compression", default_rrdpush_compression_enabled); #endif if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { - error("STREAM [send]: cannot enable sending thread - information is missing."); + netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing."); default_rrdpush_enabled = 0; } @@ -139,7 +156,7 @@ int rrdpush_init() { netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate); if(!netdata_ssl_validate_certificate_sender) - info("SSL: streaming senders will skip SSL certificates verification."); + netdata_log_info("SSL: streaming senders will skip SSL certificates verification."); netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL); netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL); @@ -247,7 +264,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { // send the chart buffer_sprintf( wb - , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n" + , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n" , rrdset_id(st) , name , rrdset_title(st) @@ -274,7 +291,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { rrddim_foreach_read(rd, st) { buffer_sprintf( wb - , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n" + , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n" , rrddim_id(rd) , rrddim_name(rd) , rrd_algorithm_name(rd->algorithm) @@ -284,7 +301,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" ); - rd->exposed = 1; + rrddim_set_exposed(rd); } rrddim_foreach_done(rd); @@ -338,14 +355,14 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta RRDDIM *rd; rrddim_foreach_read(rd, st) { - if(unlikely(!rd->updated)) + if(unlikely(!rrddim_check_updated(rd))) continue; - if(likely(rd->exposed)) { + if(likely(rrddim_check_exposed(rd))) { buffer_fast_strcat(wb, "SET \"", 5); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "\" = ", 4); - buffer_print_int64(wb, rd->collected_value); + buffer_print_int64(wb, rd->collector.collected_value); buffer_fast_strcat(wb, "\n", 1); } else { @@ -419,10 +436,10 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); - buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value); + buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); buffer_fast_strcat(wb, " ", 1); - if((NETDATA_DOUBLE)rd->last_collected_value == n) + if((NETDATA_DOUBLE)rd->collector.last_collected_value == n) buffer_fast_strcat(wb, "#", 1); else buffer_print_netdata_double_encoded(wb, doubles_encoding, n); @@ -462,13 +479,13 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) { rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); - error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); + netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); } return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; } else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) { - info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); + netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } @@ -504,6 +521,7 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value); return 1; } + void rrdpush_send_host_labels(RRDHOST *host) { if(unlikely(!rrdhost_can_send_definitions_to_parent(host) || !stream_has_capability(host->sender, STREAM_CAP_HLABELS))) @@ -519,8 +537,23 @@ void rrdpush_send_host_labels(RRDHOST *host) { sender_thread_buffer_free(); } -void rrdpush_claimed_id(RRDHOST *host) -{ +void rrdpush_send_global_functions(RRDHOST *host) { + if(!stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) + return; + + if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) + return; + + BUFFER *wb = sender_start(host->sender); + + rrd_functions_expose_global_rrdpush(host, wb); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_claimed_id(RRDHOST *host) { if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; @@ -555,7 +588,7 @@ int connect_to_one_of_destinations( if(d->postpone_reconnection_until > now) continue; - info( + netdata_log_info( "STREAM %s: connecting to '%s' (default port: %d)...", rrdhost_hostname(host), string2str(d->destination), @@ -564,7 +597,8 @@ int connect_to_one_of_destinations( if (reconnects_counter) *reconnects_counter += 1; - d->last_attempt = now; + d->since = now; + d->attempts++; sock = connect_to_this(string2str(d->destination), default_port, timeout); if (sock != -1) { @@ -611,7 +645,7 @@ bool destinations_init_add_one(char *entry, void *data) { DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next); t->count++; - info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); + netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); return false; // we return false, so that we will get all defined destinations } @@ -649,11 +683,11 @@ void rrdpush_destinations_free(RRDHOST *host) { // Either the receiver lost the connection or the host is being destroyed. // The sender mutex guards thread creation, any spurious data is wiped on reconnection. -void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) { +void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait) { if (!host->sender) return; - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { @@ -664,42 +698,41 @@ void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) { netdata_thread_cancel(host->rrdpush_sender_thread); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); if(wait) { - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); while(host->sender->tid) { - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); sleep_usec(10 * USEC_PER_MS); - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } } - // ---------------------------------------------------------------------------- // rrdpush receiver thread void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) { - log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid); + netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid); } static void rrdpush_sender_thread_spawn(RRDHOST *host) { - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host)); if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender)) - error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); + netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); else rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } int rrdpush_receiver_permission_denied(struct web_client *w) { @@ -750,7 +783,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri return rrdpush_receiver_too_busy_now(w); struct receiver_state *rpt = callocz(1, sizeof(*rpt)); - rpt->last_msg_t = now_realtime_sec(); + rpt->last_msg_t = now_monotonic_sec(); rpt->capabilities = STREAM_CAP_INVALID; rpt->hops = 1; @@ -823,7 +856,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rpt->tags = strdupz(value); else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID)) - rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0)); + rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false); else { // An old Netdata child does not have a compatible streaming protocol, map to something sane. @@ -846,10 +879,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri name = "NETDATA_HOST_OS_DETECTION"; else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID)) - rpt->capabilities = convert_stream_version_to_capabilities(1); + rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false); if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) { - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "request has parameter '%s' = '%s', which is not used." , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-" , rpt->client_ip, rpt->client_port @@ -860,7 +893,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (rpt->capabilities & STREAM_CAP_INVALID) // no version is supplied, assume version 0; - rpt->capabilities = convert_stream_version_to_capabilities(0); + rpt->capabilities = convert_stream_version_to_capabilities(0, NULL, false); // find the program name and version if(w->user_agent && w->user_agent[0]) { @@ -1042,7 +1075,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri #endif rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { - error("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " "failed to reply." , rpt->hostname , rpt->client_ip, rpt->client_port @@ -1058,13 +1091,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri static time_t last_stream_accepted_t = 0; time_t now = now_realtime_sec(); - netdata_spinlock_lock(&spinlock); + spinlock_lock(&spinlock); if(unlikely(last_stream_accepted_t == 0)) last_stream_accepted_t = now; if(now - last_stream_accepted_t < web_client_streaming_rate_t) { - netdata_spinlock_unlock(&spinlock); + spinlock_unlock(&spinlock); char msg[100 + 1]; snprintfz(msg, 100, @@ -1081,7 +1114,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } last_stream_accepted_t = now; - netdata_spinlock_unlock(&spinlock); + spinlock_unlock(&spinlock); } /* @@ -1106,7 +1139,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (host) { netdata_mutex_lock(&host->receiver_lock); if (host->receiver) { - age = now_realtime_sec() - host->receiver->last_msg_t; + age = now_monotonic_sec() - host->receiver->last_msg_t; if (age < 30) receiver_working = true; @@ -1117,12 +1150,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } rrd_unlock(); - if (receiver_stale && stop_streaming_receiver(host, "STALE RECEIVER")) { + if (receiver_stale && stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER)) { // we stopped the receiver // we can proceed with this connection receiver_stale = false; - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "stopped previous stale receiver to accept this one." , rpt->hostname , rpt->client_ip, rpt->client_port @@ -1152,7 +1185,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } } - debug(D_SYSTEM, "starting STREAM receive thread."); + netdata_log_debug(D_SYSTEM, "starting STREAM receive thread."); rrdpush_receiver_takeover_web_connection(w, rpt); @@ -1177,20 +1210,20 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } void rrdpush_reset_destinations_postpone_time(RRDHOST *host) { - struct rrdpush_destinations *d; - for (d = host->destinations; d; d = d->next) - d->postpone_reconnection_until = 0; + uint32_t wait = (host->sender) ? host->sender->reconnect_delay : 5; + time_t now = now_realtime_sec(); + for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) + d->postpone_reconnection_until = now + wait; } static struct { STREAM_HANDSHAKE err; const char *str; } handshake_errors[] = { - { STREAM_HANDSHAKE_OK_V5, "OK_V5" }, - { STREAM_HANDSHAKE_OK_V4, "OK_V4" }, - { STREAM_HANDSHAKE_OK_V3, "OK_V3" }, - { STREAM_HANDSHAKE_OK_V2, "OK_V2" }, - { STREAM_HANDSHAKE_OK_V1, "OK_V1" }, + { STREAM_HANDSHAKE_OK_V3, "CONNECTED" }, + { STREAM_HANDSHAKE_OK_V2, "CONNECTED" }, + { STREAM_HANDSHAKE_OK_V1, "CONNECTED" }, + { STREAM_HANDSHAKE_NEVER, "" }, { STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" }, { STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" }, { STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" }, @@ -1202,17 +1235,31 @@ static struct { { STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" }, { STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" }, { STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" }, - { STREAM_HANDSHAKE_INITIALIZATION, "INITIALIZING" }, + { STREAM_HANDSHAKE_INITIALIZATION, "REMOTE IS INITIALIZING" }, + { STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, "DISCONNECTED HOST CLEANUP" }, + { STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER, "DISCONNECTED STALE RECEIVER" }, + { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" }, + { STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" }, + { STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" }, + { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" }, + { STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" }, + { STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" }, + { STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" }, + { STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" }, { 0, NULL }, }; const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) { + if(handshake_error >= STREAM_HANDSHAKE_OK_V1) + // handshake_error is the whole version / capabilities number + return "CONNECTED"; + for(size_t i = 0; handshake_errors[i].str ; i++) { if(handshake_error == handshake_errors[i].err) return handshake_errors[i].str; } - return ""; + return "UNKNOWN"; } static struct { @@ -1232,6 +1279,7 @@ static struct { { STREAM_CAP_BINARY, "BINARY" }, { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, { STREAM_CAP_IEEE754, "IEEE754" }, + { STREAM_CAP_DATA_WITH_ML, "ML" }, { 0 , NULL }, }; @@ -1245,7 +1293,10 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) } void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) { - buffer_json_member_add_array(wb, key); + if(key) + buffer_json_member_add_array(wb, key); + else + buffer_json_add_array_item_array(wb); for(size_t i = 0; capability_names[i].str ; i++) { if(caps & capability_names[i].cap) @@ -1259,7 +1310,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, rpt->capabilities); - info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", + netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); buffer_free(wb); @@ -1269,13 +1320,13 @@ void log_sender_capabilities(struct sender_state *s) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, s->capabilities); - info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", + netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); buffer_free(wb); } -STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) { +STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) { STREAM_CAPABILITIES caps = 0; if(version <= 1) caps = STREAM_CAP_V1; @@ -1294,7 +1345,13 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) { if(caps & STREAM_CAP_V2) caps &= ~(STREAM_CAP_V1); - return caps & stream_our_capabilities(); + STREAM_CAPABILITIES common_caps = caps & stream_our_capabilities(host, sender); + + if(!(common_caps & STREAM_CAP_INTERPOLATED)) + // DATA WITH ML requires INTERPOLATED + common_caps &= ~STREAM_CAP_DATA_WITH_ML; + + return common_caps; } int32_t stream_capabilities_to_vn(uint32_t caps) { diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index f97c8ddf..73bd438c 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -43,19 +43,20 @@ typedef enum { STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values + STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit // needed for negotiating errors between parent and child } STREAM_CAPABILITIES; -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION #define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION #else #define STREAM_HAS_COMPRESSION 0 -#endif // ENABLE_COMPRESSION +#endif // ENABLE_RRDPUSH_COMPRESSION -STREAM_CAPABILITIES stream_our_capabilities(); +STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender); #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability)) @@ -77,11 +78,10 @@ STREAM_CAPABILITIES stream_our_capabilities(); #define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later." typedef enum { - STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION - STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS - STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM - STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS - STREAM_HANDSHAKE_OK_V1 = 1, + STREAM_HANDSHAKE_OK_V3 = 3, // v3+ + STREAM_HANDSHAKE_OK_V2 = 2, // v2 + STREAM_HANDSHAKE_OK_V1 = 1, // v1 + STREAM_HANDSHAKE_NEVER = 0, // never tried to connect STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1, STREAM_HANDSHAKE_ERROR_LOCALHOST = -2, STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3, @@ -94,20 +94,19 @@ typedef enum { STREAM_HANDSHAKE_BUSY_TRY_LATER = -10, STREAM_HANDSHAKE_INTERNAL_ERROR = -11, STREAM_HANDSHAKE_INITIALIZATION = -12, -} STREAM_HANDSHAKE; - - -// ---------------------------------------------------------------------------- + STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP = -13, + STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER = -14, + STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15, + STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16, + STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18, + STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19, + STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20, + STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21, + STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22, -typedef enum __attribute__((packed)) { - STREAM_TRAFFIC_TYPE_REPLICATION, - STREAM_TRAFFIC_TYPE_FUNCTIONS, - STREAM_TRAFFIC_TYPE_METADATA, - STREAM_TRAFFIC_TYPE_DATA, +} STREAM_HANDSHAKE; - // terminator - STREAM_TRAFFIC_TYPE_MAX, -} STREAM_TRAFFIC_TYPE; // ---------------------------------------------------------------------------- @@ -119,35 +118,115 @@ typedef struct { char *kernel_version; } stream_encoded_t; -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION +// signature MUST end with a newline +#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) +#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) +#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4 + struct compressor_state { + bool initialized; char *compression_result_buffer; size_t compression_result_buffer_size; - struct compressor_data *data; // Compression API specific data - void (*reset)(struct compressor_state *state); + struct { + void *lz4_stream; + char *input_ring_buffer; + size_t input_ring_buffer_size; + size_t input_ring_buffer_pos; + } stream; size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer); void (*destroy)(struct compressor_state **state); }; +void rrdpush_compressor_reset(struct compressor_state *state); +void rrdpush_compressor_destroy(struct compressor_state *state); +size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out); + struct decompressor_state { + bool initialized; size_t signature_size; size_t total_compressed; size_t total_uncompressed; size_t packet_count; - struct decompressor_stream *stream; // Decompression API specific data - void (*reset)(struct decompressor_state *state); - size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size); - size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); - size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state); - size_t (*get)(struct decompressor_state *state, char *data, size_t size); - void (*destroy)(struct decompressor_state **state); + struct { + void *lz4_stream; + char *buffer; + size_t size; + size_t write_at; + size_t read_at; + } stream; }; + +void rrdpush_decompressor_destroy(struct decompressor_state *state); +void rrdpush_decompressor_reset(struct decompressor_state *state); +size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); + +static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) { + if (unlikely(!data || !data_size)) + return 0; + + if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE)) + return 0; + + uint32_t sign = *(uint32_t *)data; + if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE)) + return 0; + + size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7)); + return length; +} + +static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) { + if(unlikely(state->stream.read_at != state->stream.write_at)) + fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); + + return rrdpush_decompress_decode_header(header, header_size); +} + +static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) { + if(unlikely(state->stream.read_at > state->stream.write_at)) + fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); + + return state->stream.write_at - state->stream.read_at; +} + +static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) { + if (unlikely(!state || !size || !dst)) + return 0; + + size_t remaining = rrdpush_decompressed_bytes_in_buffer(state); + + if(unlikely(!remaining)) + return 0; + + size_t bytes_to_return = size; + if(bytes_to_return > remaining) + bytes_to_return = remaining; + + memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return); + state->stream.read_at += bytes_to_return; + + if(unlikely(state->stream.read_at > state->stream.write_at)) + fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); + + return bytes_to_return; +} #endif // Thread-local storage - // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. +// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. -typedef enum { +typedef enum __attribute__((packed)) { + STREAM_TRAFFIC_TYPE_REPLICATION = 0, + STREAM_TRAFFIC_TYPE_FUNCTIONS, + STREAM_TRAFFIC_TYPE_METADATA, + STREAM_TRAFFIC_TYPE_DATA, + + // terminator + STREAM_TRAFFIC_TYPE_MAX, +} STREAM_TRAFFIC_TYPE; + +typedef enum __attribute__((packed)) { SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; @@ -158,7 +237,7 @@ struct sender_state { SENDER_FLAGS flags; int timeout; int default_port; - usec_t reconnect_delay; + uint32_t reconnect_delay; char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c size_t begin; size_t reconnects_counter; @@ -170,10 +249,10 @@ struct sender_state { size_t not_connected_loops; // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here. - netdata_mutex_t mutex; + SPINLOCK spinlock; struct circular_buffer *buffer; char read_buffer[PLUGINSD_LINE_MAX + 1]; - int read_len; + ssize_t read_len; STREAM_CAPABILITIES capabilities; size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX]; @@ -183,16 +262,17 @@ struct sender_state { uint16_t hops; -#ifdef ENABLE_COMPRESSION - struct compressor_state *compressor; -#endif +#ifdef ENABLE_RRDPUSH_COMPRESSION + struct compressor_state compressor; +#endif // ENABLE_RRDPUSH_COMPRESSION + #ifdef ENABLE_HTTPS NETDATA_SSL ssl; // structure used to encrypt the connection #endif struct { bool shutdown; - const char *reason; + STREAM_HANDSHAKE reason; } exit; struct { @@ -216,6 +296,9 @@ struct sender_state { } atomic; }; +#define sender_lock(sender) spinlock_lock(&(sender)->spinlock) +#define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock) + #define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED) #define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED) #define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED) @@ -242,6 +325,44 @@ struct sender_state { #define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED) #define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED) +/* +typedef enum { + STREAM_NODE_INSTANCE_FEATURE_CLOUD_ONLINE = (1 << 0), + STREAM_NODE_INSTANCE_FEATURE_VIRTUAL_HOST = (1 << 1), + STREAM_NODE_INSTANCE_FEATURE_HEALTH_ENABLED = (1 << 2), + STREAM_NODE_INSTANCE_FEATURE_ML_SELF = (1 << 3), + STREAM_NODE_INSTANCE_FEATURE_ML_RECEIVED = (1 << 4), + STREAM_NODE_INSTANCE_FEATURE_SSL = (1 << 5), +} STREAM_NODE_INSTANCE_FEATURES; + +typedef struct stream_node_instance { + uuid_t uuid; + STRING *agent; + STREAM_NODE_INSTANCE_FEATURES features; + uint32_t hops; + + // receiver information on that agent + int32_t capabilities; + uint32_t local_port; + uint32_t remote_port; + STRING *local_ip; + STRING *remote_ip; +} STREAM_NODE_INSTANCE; +*/ + +struct buffered_reader { + ssize_t read_len; + ssize_t pos; + char read_buffer[PLUGINSD_LINE_MAX + 1]; +}; + +char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size); +static inline void buffered_reader_init(struct buffered_reader *reader) { + reader->read_buffer[0] = '\0'; + reader->read_len = 0; + reader->pos = 0; +} + struct receiver_state { RRDHOST *host; pid_t tid; @@ -263,14 +384,14 @@ struct receiver_state { struct rrdhost_system_info *system_info; STREAM_CAPABILITIES capabilities; time_t last_msg_t; - char read_buffer[PLUGINSD_LINE_MAX + 1]; - int read_len; + + struct buffered_reader reader; uint16_t hops; struct { bool shutdown; // signal the streaming parser to exit - const char *reason; // the reason of disconnection to log + STREAM_HANDSHAKE reason; } exit; struct { @@ -279,6 +400,7 @@ struct receiver_state { int update_every; int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO time_t alarms_delay; + uint32_t alarms_history; int rrdpush_enabled; char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig @@ -292,31 +414,36 @@ struct receiver_state { #ifdef ENABLE_HTTPS NETDATA_SSL ssl; #endif -#ifdef ENABLE_COMPRESSION - unsigned int rrdpush_compression; - struct decompressor_state *decompressor; -#endif time_t replication_first_time_t; + +#ifdef ENABLE_RRDPUSH_COMPRESSION + struct decompressor_state decompressor; +#endif // ENABLE_RRDPUSH_COMPRESSION +/* + struct { + uint32_t count; + STREAM_NODE_INSTANCE *array; + } instances; +*/ }; struct rrdpush_destinations { STRING *destination; bool ssl; - - const char *last_error; - time_t last_attempt; + uint32_t attempts; + time_t since; time_t postpone_reconnection_until; - STREAM_HANDSHAKE last_handshake; + STREAM_HANDSHAKE reason; struct rrdpush_destinations *prev; struct rrdpush_destinations *next; }; extern unsigned int default_rrdpush_enabled; -#ifdef ENABLE_COMPRESSION -extern unsigned int default_compression_enabled; -#endif +#ifdef ENABLE_RRDPUSH_COMPRESSION +extern unsigned int default_rrdpush_compression_enabled; +#endif // ENABLE_RRDPUSH_COMPRESSION extern char *default_rrdpush_destination; extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; @@ -352,13 +479,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ bool rrdset_push_chart_definition_now(RRDSET *st); void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); -void rrdpush_claimed_id(RRDHOST *host); +void rrdpush_send_claimed_id(RRDHOST *host); +void rrdpush_send_global_functions(RRDHOST *host); #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string); -void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait); +void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait); void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva); void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg); @@ -373,27 +501,266 @@ int connect_to_one_of_destinations( void rrdpush_signal_sender_to_wake_up(struct sender_state *s); -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION struct compressor_state *create_compressor(); -struct decompressor_state *create_decompressor(); -#endif +#endif // ENABLE_RRDPUSH_COMPRESSION + void rrdpush_reset_destinations_postpone_time(RRDHOST *host); const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error); void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key); void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status); void log_receiver_capabilities(struct receiver_state *rpt); void log_sender_capabilities(struct sender_state *s); -STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version); +STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender); int32_t stream_capabilities_to_vn(uint32_t caps); void receiver_state_free(struct receiver_state *rpt); -bool stop_streaming_receiver(RRDHOST *host, const char *reason); +bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason); void sender_thread_buffer_free(void); -void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused); -void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused); - #include "replication.h" +typedef enum __attribute__((packed)) { + RRDHOST_DB_STATUS_INITIALIZING = 0, + RRDHOST_DB_STATUS_QUERYABLE, +} RRDHOST_DB_STATUS; + +static inline const char *rrdhost_db_status_to_string(RRDHOST_DB_STATUS status) { + switch(status) { + default: + case RRDHOST_DB_STATUS_INITIALIZING: + return "initializing"; + + case RRDHOST_DB_STATUS_QUERYABLE: + return "online"; + } +} + +typedef enum __attribute__((packed)) { + RRDHOST_DB_LIVENESS_STALE = 0, + RRDHOST_DB_LIVENESS_LIVE, +} RRDHOST_DB_LIVENESS; + +static inline const char *rrdhost_db_liveness_to_string(RRDHOST_DB_LIVENESS status) { + switch(status) { + default: + case RRDHOST_DB_LIVENESS_STALE: + return "stale"; + + case RRDHOST_DB_LIVENESS_LIVE: + return "live"; + } +} + +typedef enum __attribute__((packed)) { + RRDHOST_INGEST_STATUS_ARCHIVED = 0, + RRDHOST_INGEST_STATUS_INITIALIZING, + RRDHOST_INGEST_STATUS_REPLICATING, + RRDHOST_INGEST_STATUS_ONLINE, + RRDHOST_INGEST_STATUS_OFFLINE, +} RRDHOST_INGEST_STATUS; + +static inline const char *rrdhost_ingest_status_to_string(RRDHOST_INGEST_STATUS status) { + switch(status) { + case RRDHOST_INGEST_STATUS_ARCHIVED: + return "archived"; + + case RRDHOST_INGEST_STATUS_INITIALIZING: + return "initializing"; + + case RRDHOST_INGEST_STATUS_REPLICATING: + return "replicating"; + + case RRDHOST_INGEST_STATUS_ONLINE: + return "online"; + + default: + case RRDHOST_INGEST_STATUS_OFFLINE: + return "offline"; + } +} + +typedef enum __attribute__((packed)) { + RRDHOST_INGEST_TYPE_LOCALHOST = 0, + RRDHOST_INGEST_TYPE_VIRTUAL, + RRDHOST_INGEST_TYPE_CHILD, + RRDHOST_INGEST_TYPE_ARCHIVED, +} RRDHOST_INGEST_TYPE; + +static inline const char *rrdhost_ingest_type_to_string(RRDHOST_INGEST_TYPE type) { + switch(type) { + case RRDHOST_INGEST_TYPE_LOCALHOST: + return "localhost"; + + case RRDHOST_INGEST_TYPE_VIRTUAL: + return "virtual"; + + case RRDHOST_INGEST_TYPE_CHILD: + return "child"; + + default: + case RRDHOST_INGEST_TYPE_ARCHIVED: + return "archived"; + } +} + +typedef enum __attribute__((packed)) { + RRDHOST_STREAM_STATUS_DISABLED = 0, + RRDHOST_STREAM_STATUS_REPLICATING, + RRDHOST_STREAM_STATUS_ONLINE, + RRDHOST_STREAM_STATUS_OFFLINE, +} RRDHOST_STREAMING_STATUS; + +static inline const char *rrdhost_streaming_status_to_string(RRDHOST_STREAMING_STATUS status) { + switch(status) { + case RRDHOST_STREAM_STATUS_DISABLED: + return "disabled"; + + case RRDHOST_STREAM_STATUS_REPLICATING: + return "replicating"; + + case RRDHOST_STREAM_STATUS_ONLINE: + return "online"; + + default: + case RRDHOST_STREAM_STATUS_OFFLINE: + return "offline"; + } +} + +typedef enum __attribute__((packed)) { + RRDHOST_ML_STATUS_DISABLED = 0, + RRDHOST_ML_STATUS_OFFLINE, + RRDHOST_ML_STATUS_RUNNING, +} RRDHOST_ML_STATUS; + +static inline const char *rrdhost_ml_status_to_string(RRDHOST_ML_STATUS status) { + switch(status) { + case RRDHOST_ML_STATUS_RUNNING: + return "online"; + + case RRDHOST_ML_STATUS_OFFLINE: + return "offline"; + + default: + case RRDHOST_ML_STATUS_DISABLED: + return "disabled"; + } +} + +typedef enum __attribute__((packed)) { + RRDHOST_ML_TYPE_DISABLED = 0, + RRDHOST_ML_TYPE_SELF, + RRDHOST_ML_TYPE_RECEIVED, +} RRDHOST_ML_TYPE; + +static inline const char *rrdhost_ml_type_to_string(RRDHOST_ML_TYPE type) { + switch(type) { + case RRDHOST_ML_TYPE_SELF: + return "self"; + + case RRDHOST_ML_TYPE_RECEIVED: + return "received"; + + default: + case RRDHOST_ML_TYPE_DISABLED: + return "disabled"; + } +} + +typedef enum __attribute__((packed)) { + RRDHOST_HEALTH_STATUS_DISABLED = 0, + RRDHOST_HEALTH_STATUS_INITIALIZING, + RRDHOST_HEALTH_STATUS_RUNNING, +} RRDHOST_HEALTH_STATUS; + +static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS status) { + switch(status) { + default: + case RRDHOST_HEALTH_STATUS_DISABLED: + return "disabled"; + + case RRDHOST_HEALTH_STATUS_INITIALIZING: + return "initializing"; + + case RRDHOST_HEALTH_STATUS_RUNNING: + return "online"; + } +} + +typedef struct rrdhost_status { + RRDHOST *host; + time_t now; + + struct { + RRDHOST_DB_STATUS status; + RRDHOST_DB_LIVENESS liveness; + RRD_MEMORY_MODE mode; + time_t first_time_s; + time_t last_time_s; + size_t metrics; + size_t instances; + size_t contexts; + } db; + + struct { + RRDHOST_ML_STATUS status; + RRDHOST_ML_TYPE type; + struct ml_metrics_statistics metrics; + } ml; + + struct { + size_t hops; + RRDHOST_INGEST_TYPE type; + RRDHOST_INGEST_STATUS status; + SOCKET_PEERS peers; + bool ssl; + STREAM_CAPABILITIES capabilities; + uint32_t id; + time_t since; + STREAM_HANDSHAKE reason; + + struct { + bool in_progress; + NETDATA_DOUBLE completion; + size_t instances; + } replication; + } ingest; + + struct { + size_t hops; + RRDHOST_STREAMING_STATUS status; + SOCKET_PEERS peers; + bool ssl; + bool compression; + STREAM_CAPABILITIES capabilities; + uint32_t id; + time_t since; + STREAM_HANDSHAKE reason; + + struct { + bool in_progress; + NETDATA_DOUBLE completion; + size_t instances; + } replication; + + size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX]; + } stream; + + struct { + RRDHOST_HEALTH_STATUS status; + struct { + uint32_t undefined; + uint32_t uninitialized; + uint32_t clear; + uint32_t warning; + uint32_t critical; + } alerts; + } health; +} RRDHOST_STATUS; + +void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s); +bool rrdhost_state_cloud_emulation(RRDHOST *host); + #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 6e58d9a2..76843518 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -11,7 +11,7 @@ #define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6 #define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7 #define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8 -#define WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR 9 +#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9 #define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10 #define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11 #define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 @@ -66,7 +66,7 @@ BUFFER *sender_start(struct sender_state *s) { static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION /* * In case of stream compression buffer overflow * Inform the user through the error log file and @@ -74,9 +74,9 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); */ static inline void deactivate_compression(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION); - error("STREAM_COMPRESSION: Compression returned error, disabling it."); + netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it."); s->flags &= ~SENDER_FLAG_COMPRESSION; - error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to); rrdpush_sender_thread_close_socket(s->host); } #endif @@ -100,7 +100,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) if(unlikely(!src || !src_len)) return; - netdata_mutex_lock(&s->mutex); + sender_lock(s); // FILE *fp = fopen("/tmp/stream.txt", "a"); // fprintf(fp, @@ -111,14 +111,14 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) // fclose(fp); if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { - info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", + netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE); s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; } -#ifdef ENABLE_COMPRESSION - if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) { +#ifdef ENABLE_RRDPUSH_COMPRESSION + if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) { while(src_len) { size_t size_to_compress = src_len; @@ -144,19 +144,19 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) } char *dst; - size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst); + size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if (!dst_len) { - error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying", + netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying", rrdhost_hostname(s->host), s->connected_to); - s->compressor->reset(s->compressor); - dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst); + rrdpush_compressor_reset(&s->compressor); + dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if(!dst_len) { - error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression", + netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression", rrdhost_hostname(s->host), s->connected_to); deactivate_compression(s); - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); return; } } @@ -189,7 +189,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) signal_sender = true; } - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); if(signal_sender) rrdpush_signal_sender_to_wake_up(s); @@ -203,7 +203,7 @@ static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const , rrdvar2number(rva) ); - debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva)); + netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva)); } void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) { @@ -242,7 +242,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); - debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); + netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); } } @@ -258,7 +258,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { RRDDIM *rd; rrddim_foreach_read(rd, st) - rd->exposed = 0; + rrddim_clear_exposed(rd); rrddim_foreach_done(rd); } rrdset_foreach_done(st); @@ -273,7 +273,7 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t return; if(!have_mutex) - netdata_mutex_lock(&s->mutex); + sender_lock(s); rrdpush_sender_last_buffer_recreate_set(s, now_s); last_reset_time_s = now_s; @@ -287,20 +287,20 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t sender_thread_buffer_free(); if(!have_mutex) - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); } static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { rrdpush_sender_set_flush_time(host->sender); - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); // flush the output buffer from any data it may have cbuffer_flush(host->sender->buffer); rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true); replication_recalculate_buffer_used_ratio_unsafe(host->sender); - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) { @@ -490,27 +490,26 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender break; } } - const char *error = stream_responses[i].error; - int worker_job_id = stream_responses[i].worker_job_id; - time_t delay = stream_responses[i].postpone_reconnect_seconds; if(version >= STREAM_HANDSHAKE_OK_V1) { - host->destination->last_error = NULL; - host->destination->last_handshake = version; - host->destination->postpone_reconnection_until = 0; - s->capabilities = convert_stream_version_to_capabilities(version); + host->destination->reason = version; + host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay; + s->capabilities = convert_stream_version_to_capabilities(version, host, true); return true; } + const char *error = stream_responses[i].error; + int worker_job_id = stream_responses[i].worker_job_id; + time_t delay = stream_responses[i].postpone_reconnect_seconds; + worker_is_busy(worker_job_id); rrdpush_sender_thread_close_socket(host); - host->destination->last_error = error; - host->destination->last_handshake = version; + host->destination->reason = version; host->destination->postpone_reconnection_until = now_realtime_sec() + delay; char buf[LOG_DATE_LENGTH]; log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); - error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", + netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", rrdhost_hostname(host), s->connected_to, error, delay, buf); return false; @@ -532,8 +531,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); rrdpush_sender_thread_close_socket(host); - host->destination->last_error = "SSL error"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR; + host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR; host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; return false; } @@ -543,10 +541,9 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) { // certificate is not valid worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); - error("SSL: closing the stream connection, because the server SSL certificate is not valid."); + netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid."); rrdpush_sender_thread_close_socket(host); - host->destination->last_error = "invalid SSL certificate"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; + host->destination->reason = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE; host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60; return false; } @@ -554,7 +551,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) { return true; } - error("SSL: failed to establish connection."); + netdata_log_error("SSL: failed to establish connection."); return false; #else @@ -584,20 +581,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p ); if(unlikely(s->rrdpush_sender_socket == -1)) { - // error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); + // netdata_log_error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination); return false; } - // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); + // netdata_log_info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to); // reset our capabilities to default - s->capabilities = stream_our_capabilities(); + s->capabilities = stream_our_capabilities(host, true); -#ifdef ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION // If we don't want compression, remove it from our capabilities if(!(s->flags & SENDER_FLAG_COMPRESSION)) s->capabilities &= ~STREAM_CAP_COMPRESSION; -#endif // ENABLE_COMPRESSION +#endif // ENABLE_RRDPUSH_COMPRESSION /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the version negotiation resulted in a high enough version. @@ -708,7 +705,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(!rrdpush_sender_connect_ssl(s)) return false; - ssize_t bytes, len = strlen(http); + ssize_t bytes, len = (ssize_t)strlen(http); bytes = send_timeout( #ifdef ENABLE_HTTPS @@ -723,9 +720,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(bytes <= 0) { // timeout is 0 worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to); - host->destination->last_error = "timeout while sending request"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT; + netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; return false; } @@ -743,36 +739,33 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(bytes <= 0) { // timeout is 0 worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to); - host->destination->last_error = "timeout while expecting first response"; - host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT; + netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 30; return false; } if(sock_setnonblock(s->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); http[bytes] = '\0'; - debug(D_STREAM, "Response to sender from far end: %s", http); + netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http); if(!rrdpush_sender_validate_response(host, s, http, bytes)) return false; -#ifdef ENABLE_COMPRESSION - if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) { - if(!s->compressor) - s->compressor = create_compressor(); - else - s->compressor->reset(s->compressor); - } -#endif //ENABLE_COMPRESSION +#ifdef ENABLE_RRDPUSH_COMPRESSION + if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) + rrdpush_compressor_reset(&s->compressor); + else + rrdpush_compressor_destroy(&s->compressor); +#endif // ENABLE_RRDPUSH_COMPRESSION log_sender_capabilities(s); - debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); + netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); return true; } @@ -820,18 +813,18 @@ static bool attempt_to_connect(struct sender_state *state) return false; } -// TCP window is open and we have data to transmit. +// TCP window is open, and we have data to transmit. static ssize_t attempt_to_send(struct sender_state *s) { - ssize_t ret = 0; + ssize_t ret; #ifdef NETDATA_INTERNAL_CHECKS struct circular_buffer *cb = s->buffer; #endif - netdata_mutex_lock(&s->mutex); + sender_lock(s); char *chunk; size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk); - debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); + netdata_log_debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); #ifdef ENABLE_HTTPS if(SSL_connection(&s->ssl)) @@ -846,21 +839,21 @@ static ssize_t attempt_to_send(struct sender_state *s) { cbuffer_remove_unsafe(s->buffer, ret); s->sent_bytes_on_this_connection += ret; s->sent_bytes += ret; - debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret); + netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret); } else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)) - debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to); + netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to); else if (ret == -1) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR); - debug(D_STREAM, "STREAM: Send failed - closing socket..."); - error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection); + netdata_log_debug(D_STREAM, "STREAM: Send failed - closing socket..."); + netdata_log_error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } else - debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); + netdata_log_debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); replication_recalculate_buffer_used_ratio_unsafe(s); - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); return ret; } @@ -893,11 +886,11 @@ static ssize_t attempt_read(struct sender_state *s) { if (ret == 0 || errno == ECONNRESET) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED); - error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to); } else { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR); - error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret); + netdata_log_error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret); } rrdpush_sender_thread_close_socket(s->host); @@ -951,13 +944,13 @@ void execute_commands(struct sender_state *s) { while( start < end && (newline = strchr(start, '\n')) ) { *newline = '\0'; - log_access("STREAM: %d from '%s' for host '%s': %s", + netdata_log_access("STREAM: %d from '%s' for host '%s': %s", gettid(), s->connected_to, rrdhost_hostname(s->host), start); // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS); + size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS); const char *keyword = get_word(words, num_words, 0); @@ -969,7 +962,7 @@ void execute_commands(struct sender_state *s) { char *function = get_word(words, num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, keyword, transaction?transaction:"(unset)", @@ -1002,7 +995,7 @@ void execute_commands(struct sender_state *s) { const char *before = get_word(words, num_words, 4); if (!chart_id || !start_streaming || !after || !before) { - error("STREAM %s [send to %s] %s command is incomplete" + netdata_log_error("STREAM %s [send to %s] %s command is incomplete" " (chart=%s, start_streaming=%s, after=%s, before=%s)", rrdhost_hostname(s->host), s->connected_to, keyword, @@ -1020,7 +1013,7 @@ void execute_commands(struct sender_state *s) { } } else { - error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); + netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); } worker_is_busy(WORKER_SENDER_JOB_EXECUTE); @@ -1051,7 +1044,7 @@ static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) int new_pipe_fds[2]; if(reopen) { if(pipe(new_pipe_fds) != 0) { - error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host)); + netdata_log_error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host)); new_pipe_fds[PIPE_READ] = -1; new_pipe_fds[PIPE_WRITE] = -1; ret = false; @@ -1091,138 +1084,26 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) { // signal the sender there are more data if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) { - error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host)); + netdata_log_error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host)); rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true); } } -static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) { - size_t charts = rrdhost_sender_replicating_charts(host); - NETDATA_DOUBLE completion; - if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t) - completion = 100.0; - else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t) - completion = 0.0; - else { - time_t total = now - host->sender->replication.oldest_request_after_t; - time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t; - completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total; - } - - *instances = charts; - - return completion; -} - -void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) { - bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); - buffer_json_member_add_object(wb, key); - - if(host->sender) - buffer_json_member_add_uint64(wb, "hops", host->sender->hops); - - buffer_json_member_add_boolean(wb, "online", online); - - if(host->sender && host->sender->last_state_since_t) { - buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t); - buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t); - } - - if(!online && host->sender && host->sender->exit.reason) - buffer_json_member_add_string(wb, "reason", host->sender->exit.reason); - - buffer_json_member_add_object(wb, "replication"); - { - size_t instances; - NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances); - buffer_json_member_add_boolean(wb, "in_progress", instances); - buffer_json_member_add_double(wb, "completion", completion); - buffer_json_member_add_uint64(wb, "instances", instances); - } - buffer_json_object_close(wb); - - if(host->sender) { - netdata_mutex_lock(&host->sender->mutex); - - buffer_json_member_add_object(wb, "destination"); - { - char buf[1024 + 1]; - if(online && host->sender->rrdpush_sender_socket != -1) { - SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket); - bool ssl = SSL_connection(&host->sender->ssl); - - snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : ""); - buffer_json_member_add_string(wb, "local", buf); - - snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : ""); - buffer_json_member_add_string(wb, "remote", buf); - - stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities"); - - buffer_json_member_add_object(wb, "traffic"); - { - bool compression = false; -#ifdef ENABLE_COMPRESSION - compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor); -#endif - buffer_json_member_add_boolean(wb, "compression", compression); - buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]); - buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]); - buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]); - buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]); - } - buffer_json_object_close(wb); // traffic - } - - buffer_json_member_add_array(wb, "candidates"); - struct rrdpush_destinations *d; - for (d = host->destinations; d; d = d->next) { - buffer_json_add_array_item_object(wb); - { - - if (d->ssl) { - snprintfz(buf, 1024, "%s:SSL", string2str(d->destination)); - buffer_json_member_add_string(wb, "destination", buf); - } - else - buffer_json_member_add_string(wb, "destination", string2str(d->destination)); - - buffer_json_member_add_time_t(wb, "last_check", d->last_attempt); - buffer_json_member_add_time_t(wb, "age", now - d->last_attempt); - buffer_json_member_add_string(wb, "last_error", d->last_error); - buffer_json_member_add_string(wb, "last_handshake", - stream_handshake_error_to_string(d->last_handshake)); - buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until); - buffer_json_member_add_time_t(wb, "next_in", - (d->postpone_reconnection_until > now) ? - d->postpone_reconnection_until - now : 0); - } - buffer_json_object_close(wb); // each candidate - } - buffer_json_array_close(wb); // candidates - } - buffer_json_object_close(wb); // destination - - netdata_mutex_unlock(&host->sender->mutex); - } - - buffer_json_object_close(wb); // streaming -} - static bool rrdhost_set_sender(RRDHOST *host) { if(unlikely(!host->sender)) return false; bool ret = false; - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(!host->sender->tid) { rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); + host->rrdpush_sender_connection_counter++; host->sender->tid = gettid(); host->sender->last_state_since_t = now_realtime_sec(); - host->sender->exit.reason = NULL; + host->sender->exit.reason = STREAM_HANDSHAKE_NEVER; ret = true; } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); rrdpush_reset_destinations_postpone_time(host); @@ -1237,6 +1118,10 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) { host->sender->exit.shutdown = false; rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); host->sender->last_state_since_t = now_realtime_sec(); + if(host->destination) { + host->destination->since = host->sender->last_state_since_t; + host->destination->reason = host->sender->exit.reason; + } } rrdpush_reset_destinations_postpone_time(host); @@ -1248,25 +1133,25 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) { if(unlikely(!service_running(SERVICE_STREAMING))) { if(!s->exit.reason) - s->exit.reason = "NETDATA EXIT"; + s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT; return true; } if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) { if(!s->exit.reason) - s->exit.reason = "NON STREAMABLE HOST"; + s->exit.reason = STREAM_HANDSHAKE_NON_STREAMABLE_HOST; return true; } if(unlikely(s->exit.shutdown)) { if(!s->exit.reason) - s->exit.reason = "SENDER SHUTDOWN REQUESTED"; + s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN; return true; } if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) { if(!s->exit.reason) - s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)"; + s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST; return true; } @@ -1279,16 +1164,16 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { RRDHOST *host = s->host; - netdata_mutex_lock(&host->sender->mutex); - info("STREAM %s [send]: sending thread exits %s", + sender_lock(host->sender); + netdata_log_info("STREAM %s [send]: sending thread exits %s", rrdhost_hostname(host), - host->sender->exit.reason ? host->sender->exit.reason : ""); + host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : ""); rrdpush_sender_thread_close_socket(host); rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); rrdhost_clear_sender___while_having_sender_mutex(host); - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); freez(s->pipe_buffer); freez(s); @@ -1297,10 +1182,10 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { void rrdpush_initialize_ssl_ctx(RRDHOST *host) { #ifdef ENABLE_HTTPS static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; - netdata_spinlock_lock(&sp); + spinlock_lock(&sp); if(netdata_ssl_streaming_sender_ctx || !host) { - netdata_spinlock_unlock(&sp); + spinlock_unlock(&sp); return; } @@ -1316,7 +1201,7 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host) { } } - netdata_spinlock_unlock(&sp); + spinlock_unlock(&sp); #endif } @@ -1331,7 +1216,7 @@ void *rrdpush_sender_thread(void *ptr) { // disconnection reasons worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error"); - worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR, "disconnect socket error"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed"); @@ -1353,20 +1238,20 @@ void *rrdpush_sender_thread(void *ptr) { if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination || !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key || !*s->host->rrdpush_send_api_key) { - error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", + netdata_log_error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", rrdhost_hostname(s->host), gettid()); return NULL; } if(!rrdhost_set_sender(s->host)) { - error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.", + netdata_log_error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.", rrdhost_hostname(s->host), gettid()); return NULL; } rrdpush_initialize_ssl_ctx(s->host); - info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid()); + netdata_log_info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid()); s->timeout = (int)appconfig_get_number( &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600); @@ -1397,7 +1282,7 @@ void *rrdpush_sender_thread(void *ptr) { pipe_buffer_size = 10 * 1024; if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) { - error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", + netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", rrdhost_hostname(s->host)); return NULL; } @@ -1433,12 +1318,13 @@ void *rrdpush_sender_thread(void *ptr) { break; now_s = s->last_traffic_seen_t = now_monotonic_sec(); - rrdpush_claimed_id(s->host); + rrdpush_send_claimed_id(s->host); rrdpush_send_host_labels(s->host); + rrdpush_send_global_functions(s->host); s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); + netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); continue; } @@ -1452,19 +1338,19 @@ void *rrdpush_sender_thread(void *ptr) { !rrdpush_sender_replicating_charts(s) )) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); - error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts); + netdata_log_error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts); rrdpush_sender_thread_close_socket(s->host); continue; } - netdata_mutex_lock(&s->mutex); + sender_lock(s); size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL); size_t available = cbuffer_available_size_unsafe(s->buffer); if (unlikely(!outstanding)) { rrdpush_sender_pipe_clear_pending_data(s); rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); } - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); @@ -1473,7 +1359,7 @@ void *rrdpush_sender_thread(void *ptr) { if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) { if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) { - error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", + netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.", rrdhost_hostname(s->host)); rrdpush_sender_thread_close_socket(s->host); break; @@ -1502,7 +1388,7 @@ void *rrdpush_sender_thread(void *ptr) { int poll_rc = poll(fds, 2, 1000); - debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", + netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...", fds[Collector].revents, fds[Socket].revents, outstanding); if(unlikely(rrdhost_sender_should_exit(s))) @@ -1517,7 +1403,7 @@ void *rrdpush_sender_thread(void *ptr) { // Spurious wake-ups without error - loop again if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) { netdata_thread_testcancel(); - debug(D_STREAM, "Spurious wakeup"); + netdata_log_debug(D_STREAM, "Spurious wakeup"); now_s = now_monotonic_sec(); continue; } @@ -1525,7 +1411,7 @@ void *rrdpush_sender_thread(void *ptr) { // Only errors from poll() are internal, but try restarting the connection if(unlikely(poll_rc == -1)) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR); - error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to); rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true); rrdpush_sender_thread_close_socket(s->host); continue; @@ -1544,10 +1430,10 @@ void *rrdpush_sender_thread(void *ptr) { // If the collector woke us up then empty the pipe to remove the signal if (fds[Collector].revents & (POLLIN|POLLPRI)) { worker_is_busy(WORKER_SENDER_JOB_PIPE_READ); - debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding); + netdata_log_debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding); if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1) - error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to); + netdata_log_error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to); } // Read as much as possible to fill the buffer, split into full lines for execution. @@ -1575,7 +1461,7 @@ void *rrdpush_sender_thread(void *ptr) { if(error) { rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true); - error("STREAM %s [send to %s]: restarting internal pipe: %s.", + netdata_log_error("STREAM %s [send to %s]: restarting internal pipe: %s.", rrdhost_hostname(s->host), s->connected_to, error); } } @@ -1591,8 +1477,8 @@ void *rrdpush_sender_thread(void *ptr) { error = "connection is invalid (POLLNVAL)"; if(unlikely(error)) { - worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR); - error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.", + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR); + netdata_log_error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.", rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } @@ -1602,7 +1488,7 @@ void *rrdpush_sender_thread(void *ptr) { if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW); errno = 0; - error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection", + netdata_log_error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection", rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection); rrdpush_sender_thread_close_socket(s->host); } diff --git a/streaming/stream.conf b/streaming/stream.conf index 7c9ccc9b..94e94cab 100644 --- a/streaming/stream.conf +++ b/streaming/stream.conf @@ -146,8 +146,8 @@ # 3 possible values: # yes enable alarms # no do not enable alarms - # auto enable alarms, only when the sending netdata is connected. For ephemeral child nodes or child system restarts, - # ensure that the netdata process on the child is gracefully stopped, to prevent invalid last_collected alarms + # auto enable alarms, only when the sending netdata is connected. + # Health monitoring will be disabled as soon as the connection is closed. # You can also set it per host, below. # The default is taken from [health].enabled of netdata.conf #health enabled by default = auto @@ -155,6 +155,9 @@ # postpone alarms for a short period after the sender is connected default postpone alarms on connect seconds = 60 + # seconds of health log events to keep + #default health log history = 432000 + # need to route metrics differently? set these. # the defaults are the ones at the [stream] section (above) #default proxy enabled = yes | no @@ -223,6 +226,9 @@ # postpone alarms when the sender connects postpone alarms on connect seconds = 60 + # seconds of health log events to keep + #health log history = 432000 + # need to route metrics differently? # the defaults are the ones at the [API KEY] section #proxy enabled = yes | no |