summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/README.md1
-rw-r--r--streaming/compression.c305
-rw-r--r--streaming/receiver.c449
-rw-r--r--streaming/replication.c46
-rw-r--r--streaming/replication.h2
-rw-r--r--streaming/rrdpush.c211
-rw-r--r--streaming/rrdpush.h493
-rw-r--r--streaming/sender.c342
-rw-r--r--streaming/stream.conf10
9 files changed, 972 insertions, 887 deletions
diff --git a/streaming/README.md b/streaming/README.md
index bf11f32e..370186ac 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -55,6 +55,7 @@ node**. This file is automatically generated by Netdata the first time it is sta
| [`default memory mode`](#default-memory-mode) | `ram` | The [database](https://github.com/netdata/netdata/blob/master/database/README.md) to use for all nodes using this `API_KEY`. Valid settings are `dbengine`, `map`, `save`, `ram`, or `none`. [Read more →](#default-memory-mode) |
| `health enabled by default` | `auto` | Whether alarms and notifications should be enabled for nodes using this `API_KEY`. `auto` enables alarms when the child is connected. `yes` enables alarms always, and `no` disables alarms. |
| `default postpone alarms on connect seconds` | `60` | Postpone alarms and notifications for a period of time after the child connects. |
+| `default health log history` | `432000` | History of health log events (in seconds) kept in the database. |
| `default proxy enabled` | ` ` | Route metrics through a proxy. |
| `default proxy destination` | ` ` | Space-separated list of `IP:PORT` for proxies. |
| `default proxy api key` | ` ` | The `API_KEY` of the proxy. |
diff --git a/streaming/compression.c b/streaming/compression.c
index 8f2517a8..6d4a128b 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -1,59 +1,44 @@
#include "rrdpush.h"
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
#include "lz4.h"
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
-// signature MUST end with a newline
-#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
-#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
-#define SIGNATURE_SIZE 4
-
-
-/*
- * LZ4 streaming API compressor specific data
- */
-struct compressor_data {
- LZ4_stream_t *stream;
- char *input_ring_buffer;
- size_t input_ring_buffer_size;
- size_t input_ring_buffer_pos;
-};
-
-
/*
* Reset compressor state for a new stream
*/
-static void lz4_compressor_reset(struct compressor_state *state)
-{
- if (state->data) {
- if (state->data->stream) {
- LZ4_resetStream_fast(state->data->stream);
- internal_error(true, "%s: compressor reset", STREAM_COMPRESSION_MSG);
- }
- state->data->input_ring_buffer_pos = 0;
+void rrdpush_compressor_reset(struct compressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+
+ state->stream.lz4_stream = LZ4_createStream();
+ state->stream.input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2);
+ state->stream.input_ring_buffer = callocz(1, state->stream.input_ring_buffer_size);
+ state->compression_result_buffer_size = 0;
}
+
+ LZ4_resetStream_fast(state->stream.lz4_stream);
+
+ state->stream.input_ring_buffer_pos = 0;
}
/*
* Destroy compressor state and all related data
*/
-static void lz4_compressor_destroy(struct compressor_state **state)
-{
- if (state && *state) {
- struct compressor_state *s = *state;
- if (s->data) {
- if (s->data->stream)
- LZ4_freeStream(s->data->stream);
- freez(s->data->input_ring_buffer);
- freez(s->data);
- }
- freez(s->compression_result_buffer);
- freez(s);
- *state = NULL;
- debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG);
+void rrdpush_compressor_destroy(struct compressor_state *state) {
+ if (state->stream.lz4_stream) {
+ LZ4_freeStream(state->stream.lz4_stream);
+ state->stream.lz4_stream = NULL;
}
+
+ freez(state->stream.input_ring_buffer);
+ state->stream.input_ring_buffer = NULL;
+
+ freez(state->compression_result_buffer);
+ state->compression_result_buffer = NULL;
+
+ state->initialized = false;
}
/*
@@ -62,18 +47,18 @@ static void lz4_compressor_destroy(struct compressor_state **state)
* Return the size of compressed data block as result and the pointer to internal buffer using the last argument
* or 0 in case of error
*/
-static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
-{
+size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out) {
if(unlikely(!state || !size || !out))
return 0;
if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) {
- error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE);
+ netdata_log_error("RRDPUSH COMPRESS: Compression Failed - Message size %lu above compression buffer limit: %d",
+ (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE);
return 0;
}
size_t max_dst_size = LZ4_COMPRESSBOUND(size);
- size_t data_size = max_dst_size + SIGNATURE_SIZE;
+ size_t data_size = max_dst_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
if (!state->compression_result_buffer) {
state->compression_result_buffer = mallocz(data_size);
@@ -85,238 +70,112 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
}
// the ring buffer always has space for LZ4_MAX_MSG_SIZE
- memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size);
+ memcpy(state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, data, size);
// this call needs the last 64K of our previous data
// they are available in the ring buffer
long int compressed_data_size = LZ4_compress_fast_continue(
- state->data->stream,
- state->data->input_ring_buffer + state->data->input_ring_buffer_pos,
- state->compression_result_buffer + SIGNATURE_SIZE,
- size,
- max_dst_size,
+ state->stream.lz4_stream,
+ state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos,
+ state->compression_result_buffer + RRDPUSH_COMPRESSION_SIGNATURE_SIZE,
+ (int)size,
+ (int)max_dst_size,
1);
if (compressed_data_size < 0) {
- error("Data compression error: %ld", compressed_data_size);
+ netdata_log_error("Data compression error: %ld", compressed_data_size);
return 0;
}
// update the next writing position of the ring buffer
- state->data->input_ring_buffer_pos += size;
- if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE))
- state->data->input_ring_buffer_pos = 0;
+ state->stream.input_ring_buffer_pos += size;
+ if(unlikely(state->stream.input_ring_buffer_pos >= state->stream.input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE))
+ state->stream.input_ring_buffer_pos = 0;
// update the signature header
uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
- *(uint32_t *)state->compression_result_buffer = len | SIGNATURE;
+ *(uint32_t *)state->compression_result_buffer = len | RRDPUSH_COMPRESSION_SIGNATURE;
*out = state->compression_result_buffer;
- debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
- return compressed_data_size + SIGNATURE_SIZE;
-}
-
-/*
- * Create and initialize compressor state
- * Return the pointer to compressor_state structure created
- */
-struct compressor_state *create_compressor()
-{
- struct compressor_state *state = callocz(1, sizeof(struct compressor_state));
-
- state->reset = lz4_compressor_reset;
- state->compress = lz4_compressor_compress;
- state->destroy = lz4_compressor_destroy;
-
- state->data = callocz(1, sizeof(struct compressor_data));
- state->data->stream = LZ4_createStream();
- state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2);
- state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size);
- state->compression_result_buffer_size = 0;
- state->reset(state);
- debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG);
- return state;
-}
-
-/*
- * LZ4 streaming API decompressor specific data
- */
-struct decompressor_stream {
- LZ4_streamDecode_t *lz4_stream;
- char *buffer;
- size_t size;
- size_t write_at;
- size_t read_at;
-};
-
-/*
- * Reset decompressor state for a new stream
- */
-static void lz4_decompressor_reset(struct decompressor_state *state)
-{
- if (state->stream) {
- if (state->stream->lz4_stream)
- LZ4_setStreamDecode(state->stream->lz4_stream, NULL, 0);
-
- state->stream->write_at = 0;
- state->stream->read_at = 0;
- }
-}
-
-/*
- * Destroy decompressor state and all related data
- */
-static void lz4_decompressor_destroy(struct decompressor_state **state)
-{
- if (state && *state) {
- struct decompressor_state *s = *state;
- if (s->stream) {
- debug(D_STREAM, "%s: Destroying decompressor.", STREAM_COMPRESSION_MSG);
- if (s->stream->lz4_stream)
- LZ4_freeStreamDecode(s->stream->lz4_stream);
- freez(s->stream->buffer);
- freez(s->stream);
- }
- freez(s);
- *state = NULL;
- }
-}
-
-static size_t decode_compress_header(const char *data, size_t data_size) {
- if (unlikely(!data || !data_size))
- return 0;
-
- if (unlikely(data_size != SIGNATURE_SIZE))
- return 0;
-
- uint32_t sign = *(uint32_t *)data;
- if (unlikely((sign & SIGNATURE_MASK) != SIGNATURE))
- return 0;
-
- size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
- return length;
-}
-
-/*
- * Start the collection of compressed data in an internal buffer
- * Return the size of compressed data or 0 for uncompressed data
- */
-static size_t lz4_decompressor_start(struct decompressor_state *state __maybe_unused, const char *header, size_t header_size) {
- if(unlikely(state->stream->read_at != state->stream->write_at))
- fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!"
- , STREAM_COMPRESSION_MSG);
-
- return decode_compress_header(header, header_size);
+ netdata_log_debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
+ return compressed_data_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
}
/*
* Decompress the compressed data in the internal buffer
* Return the size of uncompressed data or 0 for error
*/
-static size_t lz4_decompressor_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
if (unlikely(!state || !compressed_data || !compressed_size))
return 0;
- if(unlikely(state->stream->read_at != state->stream->write_at))
- fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!"
- , STREAM_COMPRESSION_MSG);
+ if(unlikely(state->stream.read_at != state->stream.write_at))
+ fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
- if (unlikely(state->stream->write_at >= state->stream->size / 2)) {
- state->stream->write_at = 0;
- state->stream->read_at = 0;
+ if (unlikely(state->stream.write_at >= state->stream.size / 2)) {
+ state->stream.write_at = 0;
+ state->stream.read_at = 0;
}
long int decompressed_size = LZ4_decompress_safe_continue(
- state->stream->lz4_stream
+ state->stream.lz4_stream
, compressed_data
- , state->stream->buffer + state->stream->write_at
+ , state->stream.buffer + state->stream.write_at
, (int)compressed_size
- , (int)(state->stream->size - state->stream->write_at)
+ , (int)(state->stream.size - state->stream.write_at)
);
if (unlikely(decompressed_size < 0)) {
- error("%s: decompressor returned negative decompressed bytes: %ld", STREAM_COMPRESSION_MSG, decompressed_size);
+ netdata_log_error("RRDPUSH DECOMPRESS: decompressor returned negative decompressed bytes: %ld", decompressed_size);
return 0;
}
- if(unlikely(decompressed_size + state->stream->write_at > state->stream->size))
- fatal("%s: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu"
- , STREAM_COMPRESSION_MSG
- , state->stream->size
- , state->stream->write_at
+ if(unlikely(decompressed_size + state->stream.write_at > state->stream.size))
+ fatal("RRDPUSH DECOMPRESS: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, "
+ "exceeding the buffer by %zu"
+ , state->stream.size
+ , state->stream.write_at
, decompressed_size
- , (size_t)(state->stream->write_at + decompressed_size - state->stream->size)
+ , (size_t)(state->stream.write_at + decompressed_size - state->stream.size)
);
- state->stream->write_at += decompressed_size;
+ state->stream.write_at += decompressed_size;
// statistics
- state->total_compressed += compressed_size + SIGNATURE_SIZE;
+ state->total_compressed += compressed_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
state->total_uncompressed += decompressed_size;
state->packet_count++;
return decompressed_size;
}
-/*
- * Return the size of uncompressed data left in the internal buffer or 0 for error
- */
-static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state) {
- if(unlikely(state->stream->read_at > state->stream->write_at))
- fatal("%s: invalid read/write stream positions"
- , STREAM_COMPRESSION_MSG);
-
- return state->stream->write_at - state->stream->read_at;
-}
+void rrdpush_decompressor_reset(struct decompressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream.lz4_stream = LZ4_createStreamDecode();
+ state->stream.size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2;
+ state->stream.buffer = mallocz(state->stream.size);
+ }
-/*
- * Fill the buffer provided with uncompressed data from the internal buffer
- * Return the size of uncompressed data copied or 0 for error
- */
-static size_t lz4_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
- if (unlikely(!state || !size || !dst))
- return 0;
+ LZ4_setStreamDecode(state->stream.lz4_stream, NULL, 0);
- size_t remaining = lz4_decompressor_decompressed_bytes_in_buffer(state);
- if(unlikely(!remaining))
- return 0;
+ state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
+ state->stream.write_at = 0;
+ state->stream.read_at = 0;
+}
- size_t bytes_to_return = size;
- if(bytes_to_return > remaining)
- bytes_to_return = remaining;
+void rrdpush_decompressor_destroy(struct decompressor_state *state) {
+ if(unlikely(!state->initialized))
+ return;
- memcpy(dst, state->stream->buffer + state->stream->read_at, bytes_to_return);
- state->stream->read_at += bytes_to_return;
+ if (state->stream.lz4_stream) {
+ LZ4_freeStreamDecode(state->stream.lz4_stream);
+ state->stream.lz4_stream = NULL;
+ }
- if(unlikely(state->stream->read_at > state->stream->write_at))
- fatal("%s: invalid read/write stream positions"
- , STREAM_COMPRESSION_MSG);
+ freez(state->stream.buffer);
+ state->stream.buffer = NULL;
- return bytes_to_return;
+ state->initialized = false;
}
-/*
- * Create and initialize decompressor state
- * Return the pointer to decompressor_state structure created
- */
-struct decompressor_state *create_decompressor()
-{
- struct decompressor_state *state = callocz(1, sizeof(struct decompressor_state));
- state->signature_size = SIGNATURE_SIZE;
- state->reset = lz4_decompressor_reset;
- state->start = lz4_decompressor_start;
- state->decompress = lz4_decompressor_decompress;
- state->get = lz4_decompressor_get;
- state->decompressed_bytes_in_buffer = lz4_decompressor_decompressed_bytes_in_buffer;
- state->destroy = lz4_decompressor_destroy;
-
- state->stream = callocz(1, sizeof(struct decompressor_stream));
- fatal_assert(state->stream);
- state->stream->lz4_stream = LZ4_createStreamDecode();
- state->stream->size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2;
- state->stream->buffer = mallocz(state->stream->size);
- fatal_assert(state->stream->buffer);
- state->reset(state);
- debug(D_STREAM, "%s: Initialize streaming decompression!", STREAM_COMPRESSION_MSG);
- return state;
-}
#endif
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 709f15bd..3ff022e9 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -2,17 +2,6 @@
#include "rrdpush.h"
-// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
-#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
-#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2)
-
-// this has to be the same at parser.h
-#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
-
-#if WORKER_PARSER_FIRST_JOB < 1
-#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1
-#endif
-
extern struct config stream_config;
void receiver_state_free(struct receiver_state *rpt) {
@@ -39,13 +28,12 @@ void receiver_state_free(struct receiver_state *rpt) {
close(rpt->fd);
}
-#ifdef ENABLE_COMPRESSION
- if (rpt->decompressor)
- rpt->decompressor->destroy(&rpt->decompressor);
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ rrdpush_decompressor_destroy(&rpt->decompressor);
#endif
if(rpt->system_info)
- rrdhost_system_info_free(rpt->system_info);
+ rrdhost_system_info_free(rpt->system_info);
__atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
@@ -54,125 +42,96 @@ void receiver_state_free(struct receiver_state *rpt) {
#include "collectors/plugins.d/pluginsd_parser.h"
-PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user)
-{
- const char *host_uuid_str = get_word(words, num_words, 1);
- const char *claim_id_str = get_word(words, num_words, 2);
-
- if (!host_uuid_str || !claim_id_str) {
- error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
- host_uuid_str ? host_uuid_str : "[unset]",
- claim_id_str ? claim_id_str : "[unset]");
- return PARSER_RC_ERROR;
- }
-
- uuid_t uuid;
- RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
-
- // We don't need the parsed UUID
- // just do it to check the format
- if(uuid_parse(host_uuid_str, uuid)) {
- error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
- return PARSER_RC_ERROR;
- }
- if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) {
- error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
- return PARSER_RC_ERROR;
- }
-
- if(strcmp(host_uuid_str, host->machine_guid)) {
- error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
- return PARSER_RC_OK; //the message is OK problem must be somewhere else
- }
-
- rrdhost_aclk_state_lock(host);
- if (host->aclk_state.claimed_id)
- freez(host->aclk_state.claimed_id);
- host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
- rrdhost_aclk_state_unlock(host);
-
- rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
+// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
+#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
+#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2)
- rrdpush_claimed_id(host);
+// this has to be the same at parser.h
+#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
- return PARSER_RC_OK;
-}
+#if WORKER_PARSER_FIRST_JOB < 1
+#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1
+#endif
-static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
+static inline int read_stream(struct receiver_state *r, char* buffer, size_t size) {
if(unlikely(!size)) {
internal_error(true, "%s() asked to read zero bytes", __FUNCTION__);
return 0;
}
+ int tries = 100;
ssize_t bytes_read;
+ do {
+ errno = 0;
+
#ifdef ENABLE_HTTPS
- if (SSL_connection(&r->ssl))
- bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
- else
- bytes_read = read(r->fd, buffer, size);
+ if (SSL_connection(&r->ssl))
+ bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
+ else
+ bytes_read = read(r->fd, buffer, size);
#else
- bytes_read = read(r->fd, buffer, size);
+ bytes_read = read(r->fd, buffer, size);
#endif
+ } while(bytes_read < 0 && errno == EINTR && tries--);
+
if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
- error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
+ netdata_log_error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
bytes_read = -3;
}
else if (bytes_read == 0) {
- error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__);
+ netdata_log_error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__);
bytes_read = -1;
}
else if (bytes_read < 0) {
- error("STREAM: %s() failed to read from socket!", __FUNCTION__);
+ netdata_log_error("STREAM: %s() failed to read from socket!", __FUNCTION__);
bytes_read = -2;
}
return (int)bytes_read;
}
-static bool receiver_read_uncompressed(struct receiver_state *r) {
+static inline bool receiver_read_uncompressed(struct receiver_state *r) {
#ifdef NETDATA_INTERNAL_CHECKS
- if(r->read_buffer[r->read_len] != '\0')
+ if(r->reader.read_buffer[r->reader.read_len] != '\0')
fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
#endif
- int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1);
+ int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
if(unlikely(bytes_read <= 0))
return false;
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read);
- r->read_len += bytes_read;
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_len += bytes_read;
+ r->reader.read_buffer[r->reader.read_len] = '\0';
return true;
}
-#ifdef ENABLE_COMPRESSION
-static bool receiver_read_compressed(struct receiver_state *r) {
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+static inline bool receiver_read_compressed(struct receiver_state *r) {
-#ifdef NETDATA_INTERNAL_CHECKS
- if(r->read_buffer[r->read_len] != '\0')
- fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ );
-#endif
+ internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0',
+ "%s: read_buffer does not start with zero #2", __FUNCTION__ );
// first use any available uncompressed data
- if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
- size_t available = sizeof(r->read_buffer) - r->read_len - 1;
- if (available) {
- size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available);
- if (!len) {
+ if (likely(rrdpush_decompressed_bytes_in_buffer(&r->decompressor))) {
+ size_t available = sizeof(r->reader.read_buffer) - r->reader.read_len - 1;
+ if (likely(available)) {
+ size_t len = rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, available);
+ if (unlikely(!len)) {
internal_error(true, "decompressor returned zero length #1");
return false;
}
- r->read_len += (int)len;
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_len += (int)len;
+ r->reader.read_buffer[r->reader.read_len] = '\0';
}
else
- internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len);
+ internal_fatal(true, "The line to read is too big! Already have %zd bytes in read_buffer.", r->reader.read_len);
return true;
}
@@ -180,8 +139,9 @@ static bool receiver_read_compressed(struct receiver_state *r) {
// no decompressed data available
// read the compression signature of the next block
- if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) {
- internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len);
+ if(unlikely(r->reader.read_len + r->decompressor.signature_size > sizeof(r->reader.read_buffer) - 1)) {
+ internal_error(true, "The last incomplete line does not leave enough room for the next compression header! "
+ "Already have %zd bytes in read_buffer.", r->reader.read_len);
return false;
}
@@ -189,34 +149,34 @@ static bool receiver_read_compressed(struct receiver_state *r) {
// we have to do a loop here, because read_stream() may return less than the data we need
int bytes_read = 0;
do {
- int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read);
+ int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read);
if (unlikely(ret <= 0))
return false;
bytes_read += ret;
- } while(unlikely(bytes_read < (int)r->decompressor->signature_size));
+ } while(unlikely(bytes_read < (int)r->decompressor.signature_size));
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
- if(unlikely(bytes_read != (int)r->decompressor->signature_size))
- fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size);
+ if(unlikely(bytes_read != (int)r->decompressor.signature_size))
+ fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor.signature_size);
- size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read);
+ size_t compressed_message_size = rrdpush_decompressor_start(&r->decompressor, r->reader.read_buffer + r->reader.read_len, bytes_read);
if (unlikely(!compressed_message_size)) {
internal_error(true, "multiplexed uncompressed data in compressed stream!");
- r->read_len += bytes_read;
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_len += bytes_read;
+ r->reader.read_buffer[r->reader.read_len] = '\0';
return true;
}
if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) {
- error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.",
+ netdata_log_error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.",
compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE);
return false;
}
// delete compression header from our read buffer
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_buffer[r->reader.read_len] = '\0';
// Read the entire compressed block of compressed data
char compressed[compressed_message_size];
@@ -238,8 +198,8 @@ static bool receiver_read_compressed(struct receiver_state *r) {
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read);
// decompress the compressed block
- size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read);
- if (!bytes_to_parse) {
+ size_t bytes_to_parse = rrdpush_decompress(&r->decompressor, compressed, compressed_bytes_read);
+ if (unlikely(!bytes_to_parse)) {
internal_error(true, "no bytes to parse.");
return false;
}
@@ -247,38 +207,38 @@ static bool receiver_read_compressed(struct receiver_state *r) {
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse);
// fill read buffer with decompressed data
- size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1);
- if (!len) {
+ size_t len = (int) rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
+ if (unlikely(!len)) {
internal_error(true, "decompressor returned zero length #2");
return false;
}
- r->read_len += (int)len;
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_len += (int)len;
+ r->reader.read_buffer[r->reader.read_len] = '\0';
return true;
}
-#else // !ENABLE_COMPRESSION
-static bool receiver_read_compressed(struct receiver_state *r) {
+#else // !ENABLE_RRDPUSH_COMPRESSION
+static inline bool receiver_read_compressed(struct receiver_state *r) {
return receiver_read_uncompressed(r);
}
-#endif // ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
-static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) {
- size_t start = *pos;
+inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) {
+ size_t start = reader->pos;
- char *ss = &r->read_buffer[start];
- char *se = &r->read_buffer[r->read_len];
- char *ds = buffer;
- char *de = &buffer[buffer_length - 2];
+ char *ss = &reader->read_buffer[start];
+ char *se = &reader->read_buffer[reader->read_len];
+ char *ds = dst;
+ char *de = &dst[dst_size - 2];
if(ss >= se) {
*ds = '\0';
- *pos = 0;
- r->read_len = 0;
- r->read_buffer[r->read_len] = '\0';
+ reader->pos = 0;
+ reader->read_len = 0;
+ reader->read_buffer[reader->read_len] = '\0';
return NULL;
}
@@ -293,44 +253,73 @@ static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t b
*ds++ = *ss++; // copy the newline too
*ds = '\0';
- *pos = ss - r->read_buffer;
- return buffer;
+ reader->pos = ss - reader->read_buffer;
+ return dst;
}
// if the destination is full, oops!
if(ds == de) {
- error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
+ netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
*ds = '\0';
- *pos = ss - r->read_buffer;
- return buffer;
+ reader->pos = ss - reader->read_buffer;
+ return dst;
}
// no newline found in the r->read_buffer
// move everything to the beginning
- memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
- r->read_len -= (int)start;
- r->read_buffer[r->read_len] = '\0';
+ memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start);
+ reader->read_len -= (int)start;
+ reader->read_buffer[reader->read_len] = '\0';
*ds = '\0';
- *pos = 0;
+ reader->pos = 0;
return NULL;
}
bool plugin_is_enabled(struct plugind *cd);
+static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAKE reason, bool force) {
+ if(force || !rpt->exit.reason)
+ rpt->exit.reason = reason;
+}
+
+static inline bool receiver_should_stop(struct receiver_state *rpt) {
+ static __thread size_t counter = 0;
+
+ if(unlikely(rpt->exit.shutdown)) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false);
+ return true;
+ }
+
+ if(unlikely(!service_running(SERVICE_STREAMING))) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, false);
+ return true;
+ }
+
+ if(unlikely((counter++ % 1000) == 0)) {
+ // check every 1000 lines read
+ netdata_thread_testcancel();
+ rpt->last_msg_t = now_monotonic_sec();
+ }
+
+ return false;
+}
+
static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
- size_t result;
-
- PARSER_USER_OBJECT user = {
- .enabled = plugin_is_enabled(cd),
- .host = rpt->host,
- .opaque = rpt,
- .cd = cd,
- .trust_durations = 1,
- .capabilities = rpt->capabilities,
- };
+ size_t result = 0;
- PARSER *parser = parser_init(&user, NULL, NULL, fd,
- PARSER_INPUT_SPLIT, ssl);
+ PARSER *parser = NULL;
+ {
+ PARSER_USER_OBJECT user = {
+ .enabled = plugin_is_enabled(cd),
+ .host = rpt->host,
+ .opaque = rpt,
+ .cd = cd,
+ .trust_durations = 1,
+ .capabilities = rpt->capabilities,
+ };
+
+ parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
+ }
pluginsd_keywords_init(parser, PARSER_INIT_STREAMING);
@@ -340,72 +329,41 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
-
- user.parser = parser;
-
bool compressed_connection = false;
-#ifdef ENABLE_COMPRESSION
+
+#ifdef ENABLE_RRDPUSH_COMPRESSION
if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
compressed_connection = true;
-
- if (!rpt->decompressor)
- rpt->decompressor = create_decompressor();
- else
- rpt->decompressor->reset(rpt->decompressor);
+ rrdpush_decompressor_reset(&rpt->decompressor);
}
+ else
+ rrdpush_decompressor_destroy(&rpt->decompressor);
#endif
- rpt->read_buffer[0] = '\0';
- rpt->read_len = 0;
+ buffered_reader_init(&rpt->reader);
- size_t read_buffer_start = 0;
char buffer[PLUGINSD_LINE_MAX + 2] = "";
- while(service_running(SERVICE_STREAMING)) {
- netdata_thread_testcancel();
+ while(!receiver_should_stop(rpt)) {
- if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) {
- bool have_new_data;
- if(likely(compressed_connection))
- have_new_data = receiver_read_compressed(rpt);
- else
- have_new_data = receiver_read_uncompressed(rpt);
+ if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) {
+ bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
if(unlikely(!have_new_data)) {
- if(!rpt->exit.reason)
- rpt->exit.reason = "SOCKET READ ERROR";
-
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false);
break;
}
- rpt->last_msg_t = now_realtime_sec();
continue;
}
- if(unlikely(!service_running(SERVICE_STREAMING))) {
- if(!rpt->exit.reason)
- rpt->exit.reason = "NETDATA EXIT";
- goto done;
- }
- if(unlikely(rpt->exit.shutdown)) {
- if(!rpt->exit.reason)
- rpt->exit.reason = "SHUTDOWN REQUESTED";
-
- goto done;
- }
-
if (unlikely(parser_action(parser, buffer))) {
internal_error(true, "parser_action() failed on keyword '%s'.", buffer);
-
- if(!rpt->exit.reason)
- rpt->exit.reason = "PARSER FAILED";
-
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
break;
}
}
-done:
- result = user.data_collections_count;
+ result = parser->user.data_collections_count;
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -423,67 +381,18 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) {
rrdhost_receiver_replicating_charts_zero(host);
}
-void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
- size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1;
-
- netdata_mutex_lock(&host->receiver_lock);
-
- buffer_json_member_add_object(wb, key);
- buffer_json_member_add_uint64(wb, "hops", receiver_hops);
-
- bool online = host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
- buffer_json_member_add_boolean(wb, "online", online);
-
- if(host->child_connect_time || host->child_disconnected_time) {
- time_t since = MAX(host->child_connect_time, host->child_disconnected_time);
- buffer_json_member_add_time_t(wb, "since", since);
- buffer_json_member_add_time_t(wb, "age", now - since);
- }
-
- if(!online && host->rrdpush_last_receiver_exit_reason)
- buffer_json_member_add_string(wb, "reason", host->rrdpush_last_receiver_exit_reason);
-
- if(host != localhost && host->receiver) {
- buffer_json_member_add_object(wb, "replication");
- {
- size_t instances = rrdhost_receiver_replicating_charts(host);
- buffer_json_member_add_boolean(wb, "in_progress", instances);
- buffer_json_member_add_double(wb, "completion", host->rrdpush_receiver_replication_percent);
- buffer_json_member_add_uint64(wb, "instances", instances);
- }
- buffer_json_object_close(wb); // replication
-
- buffer_json_member_add_object(wb, "source");
- {
-
- char buf[1024 + 1];
- SOCKET_PEERS peers = socket_peers(host->receiver->fd);
- bool ssl = SSL_connection(&host->receiver->ssl);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "local", buf);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "remote", buf);
-
- stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities");
- }
- buffer_json_object_close(wb); // source
- }
- buffer_json_object_close(wb); // collection
-
- netdata_mutex_unlock(&host->receiver_lock);
-}
-
static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
bool signal_rrdcontext = false;
bool set_this = false;
netdata_mutex_lock(&host->receiver_lock);
- if (!host->receiver || host->receiver == rpt) {
+ if (!host->receiver) {
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ host->rrdpush_receiver_connection_counter++;
+ __atomic_add_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
+
host->receiver = rpt;
rpt->host = host;
@@ -495,13 +404,15 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) {
if (rpt->config.alarms_delay > 0) {
host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay;
- log_health(
+ netdata_log_health(
"[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
rrdhost_hostname(host),
(int64_t) rpt->config.alarms_delay);
}
}
+ host->health_log.health_log_history = rpt->config.alarms_history;
+
// this is a test
// if(rpt->hops <= host->sender->hops)
// rrdpush_sender_thread_stop(host, "HOPS MISMATCH", false);
@@ -534,6 +445,9 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
// Make sure that we detach this thread and don't kill a freshly arriving receiver
if(host->receiver == rpt) {
+ __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
host->trigger_chart_obsoletion_check = 0;
host->child_connect_time = 0;
host->child_disconnected_time = now_realtime_sec();
@@ -541,7 +455,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO)
host->health.health_enabled = 0;
- rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false);
+ rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false);
signal_rrdcontext = true;
rrdpush_receiver_replication_reset(host);
@@ -560,7 +474,7 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
}
}
-bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
+bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) {
bool ret = false;
netdata_mutex_lock(&host->receiver_lock);
@@ -568,7 +482,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
if(host->receiver) {
if(!host->receiver->exit.shutdown) {
host->receiver->exit.shutdown = true;
- host->receiver->exit.reason = reason;
+ receiver_set_exit_reason(host->receiver, reason, true);
shutdown(host->receiver->fd, SHUT_RDWR);
}
@@ -586,7 +500,7 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
}
if(host->receiver)
- error("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
"thread %d takes too long to stop, giving up..."
, rrdhost_hostname(host)
, host->receiver->client_ip, host->receiver->client_port
@@ -619,25 +533,20 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con
(rpt->hostname && *rpt->hostname) ? rpt->hostname : "-",
status);
- info("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"%s. "
"STATUS: %s%s%s%s"
, rpt->hostname
, rpt->client_ip, rpt->client_port
, msg
, status
- , rpt->exit.reason?" (":""
- , rpt->exit.reason?rpt->exit.reason:""
- , rpt->exit.reason?")":""
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
+ , stream_handshake_error_to_string(rpt->exit.reason)
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
);
}
-static void rrdhost_reset_destinations(RRDHOST *host) {
- for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
- d->postpone_reconnection_until = 0;
-}
-
static void rrdpush_receive(struct receiver_state *rpt)
{
rpt->config.mode = default_rrd_memory_mode;
@@ -645,6 +554,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.health_enabled = (int)default_health_enabled;
rpt->config.alarms_delay = 60;
+ rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY;
rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled;
rpt->config.rrdpush_destination = default_rrdpush_destination;
@@ -666,7 +576,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode)));
if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
- error("STREAM '%s' [receive from %s:%s]: "
+ netdata_log_error("STREAM '%s' [receive from %s:%s]: "
"dbengine is not enabled, falling back to default."
, rpt->hostname
, rpt->client_ip, rpt->client_port
@@ -681,6 +591,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay);
rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay);
+ rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->key, "default health log history", rpt->config.alarms_history);
+ rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->machine_guid, "health log history", rpt->config.alarms_history);
+
rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled);
rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled);
@@ -702,12 +615,11 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
-#ifdef ENABLE_COMPRESSION
- rpt->config.rrdpush_compression = default_compression_enabled;
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ rpt->config.rrdpush_compression = default_rrdpush_compression_enabled;
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
- rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled);
-#endif //ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
@@ -763,9 +675,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
}
#ifdef NETDATA_INTERNAL_CHECKS
- info("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"client willing to stream metrics for host '%s' with machine_guid '%s': "
- "update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
+ "update every = %d, history = %d, memory mode = %s, health %s,%s tags '%s'"
, rpt->hostname
, rpt->client_ip
, rpt->client_port
@@ -801,15 +713,15 @@ static void rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
- if (!rpt->rrdpush_compression)
+ if (!rpt->config.rrdpush_compression)
rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
}
-#endif
+#endif // ENABLE_RRDPUSH_COMPRESSION
{
- // info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ // netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
char initial_response[HTTP_HEADER_SIZE];
if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
log_receiver_capabilities(rpt);
@@ -828,7 +740,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
}
- debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
+ netdata_log_debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
ssize_t bytes_sent = send_timeout(
#ifdef ENABLE_HTTPS
&rpt->ssl,
@@ -845,7 +757,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
{
// remove the non-blocking flag from the socket
if(sock_delnonblock(rpt->fd) < 0)
- error("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
"cannot remove the non-blocking flag from socket %d"
, rrdhost_hostname(rpt->host)
, rpt->client_ip, rpt->client_port
@@ -855,7 +767,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
timeout.tv_sec = 600;
timeout.tv_usec = 0;
if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
- error("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
"cannot set timeout for socket %d"
, rrdhost_hostname(rpt->host)
, rpt->client_ip, rpt->client_port
@@ -867,14 +779,14 @@ static void rrdpush_receive(struct receiver_state *rpt)
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
- if (netdata_cloud_setting)
+ if (netdata_cloud_enabled)
aclk_host_state_update(rpt->host, 1);
#endif
- rrdhost_set_is_parent_label(++localhost->connected_children_count);
+ rrdhost_set_is_parent_label();
// let it reconnect to parent immediately
- rrdhost_reset_destinations(rpt->host);
+ rrdpush_reset_destinations_postpone_time(rpt->host);
size_t count = streaming_parser(rpt, &cd, rpt->fd,
#ifdef ENABLE_HTTPS
@@ -884,10 +796,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
#endif
);
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
-
- if(!rpt->exit.reason)
- rpt->exit.reason = "PARSER EXIT";
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, false);
{
char msg[100 + 1];
@@ -898,12 +807,10 @@ static void rrdpush_receive(struct receiver_state *rpt)
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// a child disconnected
- if (netdata_cloud_setting)
+ if (netdata_cloud_enabled)
aclk_host_state_update(rpt->host, 0);
#endif
- rrdhost_set_is_parent_label(--localhost->connected_children_count);
-
cleanup:
;
}
@@ -914,13 +821,15 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
rrdhost_clear_receiver(rpt);
- info("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"receive thread ended (task id %d)"
, rpt->hostname ? rpt->hostname : "-"
, rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-"
, gettid());
receiver_state_free(rpt);
+
+ rrdhost_set_is_parent_label();
}
void *rrdpush_receiver_thread(void *ptr) {
@@ -933,7 +842,7 @@ void *rrdpush_receiver_thread(void *ptr) {
struct receiver_state *rpt = (struct receiver_state *)ptr;
rpt->tid = gettid();
- info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid);
+ netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid);
rrdpush_receive(rpt);
diff --git a/streaming/replication.c b/streaming/replication.c
index c6fafc35..0e5a0b40 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -40,9 +40,9 @@ static struct replication_query_statistics replication_queries = {
};
struct replication_query_statistics replication_get_query_statistics(void) {
- netdata_spinlock_lock(&replication_queries.spinlock);
+ spinlock_lock(&replication_queries.spinlock);
struct replication_query_statistics ret = replication_queries;
- netdata_spinlock_unlock(&replication_queries.spinlock);
+ spinlock_unlock(&replication_queries.spinlock);
return ret;
}
@@ -144,7 +144,7 @@ static struct replication_query *replication_query_prepare(
}
if(q->query.enable_streaming) {
- netdata_spinlock_lock(&st->data_collection_lock);
+ spinlock_lock(&st->data_collection_lock);
q->query.locked_data_collection = true;
if (st->last_updated.tv_sec > q->query.before) {
@@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare(
size_t count = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
+ if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd)))
continue;
if (unlikely(rd_dfe.counter >= q->dimensions)) {
@@ -198,7 +198,7 @@ static struct replication_query *replication_query_prepare(
q->query.execute = false;
if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&st->data_collection_lock);
+ spinlock_unlock(&st->data_collection_lock);
q->query.locked_data_collection = false;
}
@@ -216,20 +216,20 @@ static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STRE
NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
RRDDIM *rd;
rrddim_foreach_read(rd, st){
- if (!rd->exposed) continue;
+ if (!rrddim_check_exposed(rd)) continue;
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC +
- (usec_t) rd->last_collected_time.tv_usec);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
+ (usec_t) rd->collector.last_collected_time.tv_usec);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_int64_encoded(wb, encoding, rd->last_collected_value);
+ buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value);
buffer_fast_strcat(wb, "\n", 1);
}
rrddim_foreach_done(rd);
@@ -248,7 +248,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
replication_send_chart_collection_state(wb, q->st, q->query.capabilities);
if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&q->st->data_collection_lock);
+ spinlock_unlock(&q->st->data_collection_lock);
q->query.locked_data_collection = false;
}
@@ -269,7 +269,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
}
if(executed) {
- netdata_spinlock_lock(&replication_queries.spinlock);
+ spinlock_lock(&replication_queries.spinlock);
replication_queries.queries_started += queries;
replication_queries.queries_finished += queries;
replication_queries.points_read += q->points_read;
@@ -280,7 +280,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
s->replication.latest_completed_before_t = q->query.before;
}
- netdata_spinlock_unlock(&replication_queries.spinlock);
+ spinlock_unlock(&replication_queries.spinlock);
}
__atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
@@ -678,7 +678,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
}
if(locked_data_collection)
- netdata_spinlock_unlock(&st->data_collection_lock);
+ spinlock_unlock(&st->data_collection_lock);
return enable_streaming;
}
@@ -797,9 +797,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
(unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
- int ret = r->caller.callback(buffer, r->caller.data);
+ ssize_t ret = r->caller.callback(buffer, r->caller.data);
if (ret < 0) {
- error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
+ netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)",
rrdhost_hostname(r->host), rrdset_id(r->st), ret);
return false;
}
@@ -1056,11 +1056,11 @@ static inline bool replication_recursive_lock_mode(char mode) {
if(mode == 'L') { // (L)ock
if(++recursions == 1)
- netdata_spinlock_lock(&replication_globals.spinlock);
+ spinlock_lock(&replication_globals.spinlock);
}
else if(mode == 'U') { // (U)nlock
if(--recursions == 0)
- netdata_spinlock_unlock(&replication_globals.spinlock);
+ spinlock_unlock(&replication_globals.spinlock);
}
else if(mode == 'C') { // (C)heck
if(recursions > 0)
@@ -1096,7 +1096,7 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) {
// ----------------------------------------------------------------------------
// replication sort entry management
-static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
__atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
@@ -1120,7 +1120,7 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
}
static void replication_sort_entry_add(struct replication_request *rq) {
- if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
+ if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) {
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = true;
rq->not_indexed_preprocessing = false;
@@ -1606,7 +1606,7 @@ static void verify_all_hosts_charts_are_streaming_now(void) {
dfe_done(host);
size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
- info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
+ netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
executed - replication_globals.main_thread.last_executed, errors);
replication_globals.main_thread.last_executed = executed;
}
@@ -1860,7 +1860,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
- error("replication threads given %d is invalid, resetting to 1", threads);
+ netdata_log_error("replication threads given %d is invalid, resetting to 1", threads);
threads = 1;
}
diff --git a/streaming/replication.h b/streaming/replication.h
index f5b64706..507b7c32 100644
--- a/streaming/replication.h
+++ b/streaming/replication.h
@@ -17,7 +17,7 @@ struct replication_query_statistics replication_get_query_statistics(void);
bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before);
-typedef int (*send_command)(const char *txt, void *data);
+typedef ssize_t (*send_command)(const char *txt, void *data);
bool replicate_chart_request(send_command callback, void *callback_data,
RRDHOST *rh, RRDSET *rs,
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index c481871c..67c43e41 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -39,8 +39,8 @@ struct config stream_config = {
};
unsigned int default_rrdpush_enabled = 0;
-#ifdef ENABLE_COMPRESSION
-unsigned int default_compression_enabled = 1;
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+unsigned int default_rrdpush_compression_enabled = 1;
#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
@@ -57,30 +57,47 @@ static void load_stream_conf() {
errno = 0;
char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL)) {
- info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
+ netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
freez(filename);
filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL))
- info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
+ netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
}
freez(filename);
}
-STREAM_CAPABILITIES stream_our_capabilities() {
- return STREAM_CAP_V1 |
- STREAM_CAP_V2 |
- STREAM_CAP_VN |
- STREAM_CAP_VCAPS |
- STREAM_CAP_HLABELS |
- STREAM_CAP_CLAIM |
- STREAM_CAP_CLABELS |
- STREAM_CAP_FUNCTIONS |
- STREAM_CAP_REPLICATION |
- STREAM_CAP_BINARY |
- STREAM_CAP_INTERPOLATED |
- STREAM_HAS_COMPRESSION |
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
+
+ // we can have DATA_WITH_ML when INTERPOLATED is available
+ bool ml_capability = true;
+
+ if(host && sender) {
+ // we have DATA_WITH_ML capability
+ // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
+ // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
+ ml_capability = false;
+
+ netdata_mutex_unlock(&host->receiver_lock);
+ }
+
+ return STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_HAS_COMPRESSION |
(ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
+ (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
0;
}
@@ -125,13 +142,13 @@ int rrdpush_init() {
rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
-#ifdef ENABLE_COMPRESSION
- default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
- "enable compression", default_compression_enabled);
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
+ "enable compression", default_rrdpush_compression_enabled);
#endif
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
- error("STREAM [send]: cannot enable sending thread - information is missing.");
+ netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
}
@@ -139,7 +156,7 @@ int rrdpush_init() {
netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate);
if(!netdata_ssl_validate_certificate_sender)
- info("SSL: streaming senders will skip SSL certificates verification.");
+ netdata_log_info("SSL: streaming senders will skip SSL certificates verification.");
netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
@@ -247,7 +264,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the chart
buffer_sprintf(
wb
- , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
+ , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, rrdset_id(st)
, name
, rrdset_title(st)
@@ -274,7 +291,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
rrddim_foreach_read(rd, st) {
buffer_sprintf(
wb
- , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
+ , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
, rrddim_id(rd)
, rrddim_name(rd)
, rrd_algorithm_name(rd->algorithm)
@@ -284,7 +301,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
, rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
, rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
- rd->exposed = 1;
+ rrddim_set_exposed(rd);
}
rrddim_foreach_done(rd);
@@ -338,14 +355,14 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(unlikely(!rd->updated))
+ if(unlikely(!rrddim_check_updated(rd)))
continue;
- if(likely(rd->exposed)) {
+ if(likely(rrddim_check_exposed(rd))) {
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
- buffer_print_int64(wb, rd->collected_value);
+ buffer_print_int64(wb, rd->collector.collected_value);
buffer_fast_strcat(wb, "\n", 1);
}
else {
@@ -419,10 +436,10 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
buffer_fast_strcat(wb, " ", 1);
- if((NETDATA_DOUBLE)rd->last_collected_value == n)
+ if((NETDATA_DOUBLE)rd->collector.last_collected_value == n)
buffer_fast_strcat(wb, "#", 1);
else
buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
@@ -462,13 +479,13 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
- error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
}
return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
}
else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
- info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
@@ -504,6 +521,7 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR
buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
return 1;
}
+
void rrdpush_send_host_labels(RRDHOST *host) {
if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
|| !stream_has_capability(host->sender, STREAM_CAP_HLABELS)))
@@ -519,8 +537,23 @@ void rrdpush_send_host_labels(RRDHOST *host) {
sender_thread_buffer_free();
}
-void rrdpush_claimed_id(RRDHOST *host)
-{
+void rrdpush_send_global_functions(RRDHOST *host) {
+ if(!stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
+ return;
+
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+
+ rrd_functions_expose_global_rrdpush(host, wb);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -555,7 +588,7 @@ int connect_to_one_of_destinations(
if(d->postpone_reconnection_until > now)
continue;
- info(
+ netdata_log_info(
"STREAM %s: connecting to '%s' (default port: %d)...",
rrdhost_hostname(host),
string2str(d->destination),
@@ -564,7 +597,8 @@ int connect_to_one_of_destinations(
if (reconnects_counter)
*reconnects_counter += 1;
- d->last_attempt = now;
+ d->since = now;
+ d->attempts++;
sock = connect_to_this(string2str(d->destination), default_port, timeout);
if (sock != -1) {
@@ -611,7 +645,7 @@ bool destinations_init_add_one(char *entry, void *data) {
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
t->count++;
- info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
+ netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
return false; // we return false, so that we will get all defined destinations
}
@@ -649,11 +683,11 @@ void rrdpush_destinations_free(RRDHOST *host) {
// Either the receiver lost the connection or the host is being destroyed.
// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
-void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) {
+void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait) {
if (!host->sender)
return;
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
@@ -664,42 +698,41 @@ void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) {
netdata_thread_cancel(host->rrdpush_sender_thread);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
if(wait) {
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
while(host->sender->tid) {
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
sleep_usec(10 * USEC_PER_MS);
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
}
-
// ----------------------------------------------------------------------------
// rrdpush receiver thread
void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
- log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
+ netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
}
static void rrdpush_sender_thread_spawn(RRDHOST *host) {
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
- error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
+ netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
int rrdpush_receiver_permission_denied(struct web_client *w) {
@@ -750,7 +783,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
return rrdpush_receiver_too_busy_now(w);
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
- rpt->last_msg_t = now_realtime_sec();
+ rpt->last_msg_t = now_monotonic_sec();
rpt->capabilities = STREAM_CAP_INVALID;
rpt->hops = 1;
@@ -823,7 +856,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt->tags = strdupz(value);
else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
- rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
+ rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false);
else {
// An old Netdata child does not have a compatible streaming protocol, map to something sane.
@@ -846,10 +879,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
name = "NETDATA_HOST_OS_DETECTION";
else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID))
- rpt->capabilities = convert_stream_version_to_capabilities(1);
+ rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false);
if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
- info("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"request has parameter '%s' = '%s', which is not used."
, (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
, rpt->client_ip, rpt->client_port
@@ -860,7 +893,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (rpt->capabilities & STREAM_CAP_INVALID)
// no version is supplied, assume version 0;
- rpt->capabilities = convert_stream_version_to_capabilities(0);
+ rpt->capabilities = convert_stream_version_to_capabilities(0, NULL, false);
// find the program name and version
if(w->user_agent && w->user_agent[0]) {
@@ -1042,7 +1075,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
#endif
rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
- error("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
"failed to reply."
, rpt->hostname
, rpt->client_ip, rpt->client_port
@@ -1058,13 +1091,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
static time_t last_stream_accepted_t = 0;
time_t now = now_realtime_sec();
- netdata_spinlock_lock(&spinlock);
+ spinlock_lock(&spinlock);
if(unlikely(last_stream_accepted_t == 0))
last_stream_accepted_t = now;
if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
char msg[100 + 1];
snprintfz(msg, 100,
@@ -1081,7 +1114,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
last_stream_accepted_t = now;
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
}
/*
@@ -1106,7 +1139,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (host) {
netdata_mutex_lock(&host->receiver_lock);
if (host->receiver) {
- age = now_realtime_sec() - host->receiver->last_msg_t;
+ age = now_monotonic_sec() - host->receiver->last_msg_t;
if (age < 30)
receiver_working = true;
@@ -1117,12 +1150,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
rrd_unlock();
- if (receiver_stale && stop_streaming_receiver(host, "STALE RECEIVER")) {
+ if (receiver_stale && stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER)) {
// we stopped the receiver
// we can proceed with this connection
receiver_stale = false;
- info("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"stopped previous stale receiver to accept this one."
, rpt->hostname
, rpt->client_ip, rpt->client_port
@@ -1152,7 +1185,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
}
- debug(D_SYSTEM, "starting STREAM receive thread.");
+ netdata_log_debug(D_SYSTEM, "starting STREAM receive thread.");
rrdpush_receiver_takeover_web_connection(w, rpt);
@@ -1177,20 +1210,20 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
void rrdpush_reset_destinations_postpone_time(RRDHOST *host) {
- struct rrdpush_destinations *d;
- for (d = host->destinations; d; d = d->next)
- d->postpone_reconnection_until = 0;
+ uint32_t wait = (host->sender) ? host->sender->reconnect_delay : 5;
+ time_t now = now_realtime_sec();
+ for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
+ d->postpone_reconnection_until = now + wait;
}
static struct {
STREAM_HANDSHAKE err;
const char *str;
} handshake_errors[] = {
- { STREAM_HANDSHAKE_OK_V5, "OK_V5" },
- { STREAM_HANDSHAKE_OK_V4, "OK_V4" },
- { STREAM_HANDSHAKE_OK_V3, "OK_V3" },
- { STREAM_HANDSHAKE_OK_V2, "OK_V2" },
- { STREAM_HANDSHAKE_OK_V1, "OK_V1" },
+ { STREAM_HANDSHAKE_OK_V3, "CONNECTED" },
+ { STREAM_HANDSHAKE_OK_V2, "CONNECTED" },
+ { STREAM_HANDSHAKE_OK_V1, "CONNECTED" },
+ { STREAM_HANDSHAKE_NEVER, "" },
{ STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" },
{ STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" },
{ STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" },
@@ -1202,17 +1235,31 @@ static struct {
{ STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" },
{ STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" },
{ STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" },
- { STREAM_HANDSHAKE_INITIALIZATION, "INITIALIZING" },
+ { STREAM_HANDSHAKE_INITIALIZATION, "REMOTE IS INITIALIZING" },
+ { STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, "DISCONNECTED HOST CLEANUP" },
+ { STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER, "DISCONNECTED STALE RECEIVER" },
+ { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" },
+ { STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" },
+ { STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" },
+ { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" },
+ { STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" },
+ { STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" },
+ { STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" },
+ { STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" },
{ 0, NULL },
};
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) {
+ if(handshake_error >= STREAM_HANDSHAKE_OK_V1)
+ // handshake_error is the whole version / capabilities number
+ return "CONNECTED";
+
for(size_t i = 0; handshake_errors[i].str ; i++) {
if(handshake_error == handshake_errors[i].err)
return handshake_errors[i].str;
}
- return "";
+ return "UNKNOWN";
}
static struct {
@@ -1232,6 +1279,7 @@ static struct {
{ STREAM_CAP_BINARY, "BINARY" },
{ STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
{ STREAM_CAP_IEEE754, "IEEE754" },
+ { STREAM_CAP_DATA_WITH_ML, "ML" },
{ 0 , NULL },
};
@@ -1245,7 +1293,10 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
}
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) {
- buffer_json_member_add_array(wb, key);
+ if(key)
+ buffer_json_member_add_array(wb, key);
+ else
+ buffer_json_add_array_item_array(wb);
for(size_t i = 0; capability_names[i].str ; i++) {
if(caps & capability_names[i].cap)
@@ -1259,7 +1310,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, rpt->capabilities);
- info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
+ netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
buffer_free(wb);
@@ -1269,13 +1320,13 @@ void log_sender_capabilities(struct sender_state *s) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, s->capabilities);
- info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
+ netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
buffer_free(wb);
}
-STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) {
STREAM_CAPABILITIES caps = 0;
if(version <= 1) caps = STREAM_CAP_V1;
@@ -1294,7 +1345,13 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
if(caps & STREAM_CAP_V2)
caps &= ~(STREAM_CAP_V1);
- return caps & stream_our_capabilities();
+ STREAM_CAPABILITIES common_caps = caps & stream_our_capabilities(host, sender);
+
+ if(!(common_caps & STREAM_CAP_INTERPOLATED))
+ // DATA WITH ML requires INTERPOLATED
+ common_caps &= ~STREAM_CAP_DATA_WITH_ML;
+
+ return common_caps;
}
int32_t stream_capabilities_to_vn(uint32_t caps) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index f97c8ddf..73bd438c 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -43,19 +43,20 @@ typedef enum {
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
+ STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
// needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
#else
#define STREAM_HAS_COMPRESSION 0
-#endif // ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
-STREAM_CAPABILITIES stream_our_capabilities();
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
@@ -77,11 +78,10 @@ STREAM_CAPABILITIES stream_our_capabilities();
#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
typedef enum {
- STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
- STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
- STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
- STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
- STREAM_HANDSHAKE_OK_V1 = 1,
+ STREAM_HANDSHAKE_OK_V3 = 3, // v3+
+ STREAM_HANDSHAKE_OK_V2 = 2, // v2
+ STREAM_HANDSHAKE_OK_V1 = 1, // v1
+ STREAM_HANDSHAKE_NEVER = 0, // never tried to connect
STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
@@ -94,20 +94,19 @@ typedef enum {
STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
STREAM_HANDSHAKE_INITIALIZATION = -12,
-} STREAM_HANDSHAKE;
-
-
-// ----------------------------------------------------------------------------
+ STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP = -13,
+ STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER = -14,
+ STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
+ STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
+ STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18,
+ STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
+ STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
+ STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
+ STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,
-typedef enum __attribute__((packed)) {
- STREAM_TRAFFIC_TYPE_REPLICATION,
- STREAM_TRAFFIC_TYPE_FUNCTIONS,
- STREAM_TRAFFIC_TYPE_METADATA,
- STREAM_TRAFFIC_TYPE_DATA,
+} STREAM_HANDSHAKE;
- // terminator
- STREAM_TRAFFIC_TYPE_MAX,
-} STREAM_TRAFFIC_TYPE;
// ----------------------------------------------------------------------------
@@ -119,35 +118,115 @@ typedef struct {
char *kernel_version;
} stream_encoded_t;
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+// signature MUST end with a newline
+#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
+#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
+#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4
+
struct compressor_state {
+ bool initialized;
char *compression_result_buffer;
size_t compression_result_buffer_size;
- struct compressor_data *data; // Compression API specific data
- void (*reset)(struct compressor_state *state);
+ struct {
+ void *lz4_stream;
+ char *input_ring_buffer;
+ size_t input_ring_buffer_size;
+ size_t input_ring_buffer_pos;
+ } stream;
size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
void (*destroy)(struct compressor_state **state);
};
+void rrdpush_compressor_reset(struct compressor_state *state);
+void rrdpush_compressor_destroy(struct compressor_state *state);
+size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out);
+
struct decompressor_state {
+ bool initialized;
size_t signature_size;
size_t total_compressed;
size_t total_uncompressed;
size_t packet_count;
- struct decompressor_stream *stream; // Decompression API specific data
- void (*reset)(struct decompressor_state *state);
- size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
- size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
- size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
- size_t (*get)(struct decompressor_state *state, char *data, size_t size);
- void (*destroy)(struct decompressor_state **state);
+ struct {
+ void *lz4_stream;
+ char *buffer;
+ size_t size;
+ size_t write_at;
+ size_t read_at;
+ } stream;
};
+
+void rrdpush_decompressor_destroy(struct decompressor_state *state);
+void rrdpush_decompressor_reset(struct decompressor_state *state);
+size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+
+static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) {
+ if (unlikely(!data || !data_size))
+ return 0;
+
+ if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
+ return 0;
+
+ uint32_t sign = *(uint32_t *)data;
+ if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
+ return 0;
+
+ size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
+ return length;
+}
+
+static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
+ if(unlikely(state->stream.read_at != state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
+
+ return rrdpush_decompress_decode_header(header, header_size);
+}
+
+static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
+ if(unlikely(state->stream.read_at > state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
+
+ return state->stream.write_at - state->stream.read_at;
+}
+
+static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
+ if (unlikely(!state || !size || !dst))
+ return 0;
+
+ size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
+
+ if(unlikely(!remaining))
+ return 0;
+
+ size_t bytes_to_return = size;
+ if(bytes_to_return > remaining)
+ bytes_to_return = remaining;
+
+ memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return);
+ state->stream.read_at += bytes_to_return;
+
+ if(unlikely(state->stream.read_at > state->stream.write_at))
+ fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
+
+ return bytes_to_return;
+}
#endif
// Thread-local storage
- // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
-typedef enum {
+typedef enum __attribute__((packed)) {
+ STREAM_TRAFFIC_TYPE_REPLICATION = 0,
+ STREAM_TRAFFIC_TYPE_FUNCTIONS,
+ STREAM_TRAFFIC_TYPE_METADATA,
+ STREAM_TRAFFIC_TYPE_DATA,
+
+ // terminator
+ STREAM_TRAFFIC_TYPE_MAX,
+} STREAM_TRAFFIC_TYPE;
+
+typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
@@ -158,7 +237,7 @@ struct sender_state {
SENDER_FLAGS flags;
int timeout;
int default_port;
- usec_t reconnect_delay;
+ uint32_t reconnect_delay;
char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
size_t begin;
size_t reconnects_counter;
@@ -170,10 +249,10 @@ struct sender_state {
size_t not_connected_loops;
// Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
- netdata_mutex_t mutex;
+ SPINLOCK spinlock;
struct circular_buffer *buffer;
char read_buffer[PLUGINSD_LINE_MAX + 1];
- int read_len;
+ ssize_t read_len;
STREAM_CAPABILITIES capabilities;
size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
@@ -183,16 +262,17 @@ struct sender_state {
uint16_t hops;
-#ifdef ENABLE_COMPRESSION
- struct compressor_state *compressor;
-#endif
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ struct compressor_state compressor;
+#endif // ENABLE_RRDPUSH_COMPRESSION
+
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl; // structure used to encrypt the connection
#endif
struct {
bool shutdown;
- const char *reason;
+ STREAM_HANDSHAKE reason;
} exit;
struct {
@@ -216,6 +296,9 @@ struct sender_state {
} atomic;
};
+#define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
+#define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock)
+
#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
@@ -242,6 +325,44 @@ struct sender_state {
#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
+/*
+typedef enum {
+ STREAM_NODE_INSTANCE_FEATURE_CLOUD_ONLINE = (1 << 0),
+ STREAM_NODE_INSTANCE_FEATURE_VIRTUAL_HOST = (1 << 1),
+ STREAM_NODE_INSTANCE_FEATURE_HEALTH_ENABLED = (1 << 2),
+ STREAM_NODE_INSTANCE_FEATURE_ML_SELF = (1 << 3),
+ STREAM_NODE_INSTANCE_FEATURE_ML_RECEIVED = (1 << 4),
+ STREAM_NODE_INSTANCE_FEATURE_SSL = (1 << 5),
+} STREAM_NODE_INSTANCE_FEATURES;
+
+typedef struct stream_node_instance {
+ uuid_t uuid;
+ STRING *agent;
+ STREAM_NODE_INSTANCE_FEATURES features;
+ uint32_t hops;
+
+ // receiver information on that agent
+ int32_t capabilities;
+ uint32_t local_port;
+ uint32_t remote_port;
+ STRING *local_ip;
+ STRING *remote_ip;
+} STREAM_NODE_INSTANCE;
+*/
+
+struct buffered_reader {
+ ssize_t read_len;
+ ssize_t pos;
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
+};
+
+char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size);
+static inline void buffered_reader_init(struct buffered_reader *reader) {
+ reader->read_buffer[0] = '\0';
+ reader->read_len = 0;
+ reader->pos = 0;
+}
+
struct receiver_state {
RRDHOST *host;
pid_t tid;
@@ -263,14 +384,14 @@ struct receiver_state {
struct rrdhost_system_info *system_info;
STREAM_CAPABILITIES capabilities;
time_t last_msg_t;
- char read_buffer[PLUGINSD_LINE_MAX + 1];
- int read_len;
+
+ struct buffered_reader reader;
uint16_t hops;
struct {
bool shutdown; // signal the streaming parser to exit
- const char *reason; // the reason of disconnection to log
+ STREAM_HANDSHAKE reason;
} exit;
struct {
@@ -279,6 +400,7 @@ struct receiver_state {
int update_every;
int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
time_t alarms_delay;
+ uint32_t alarms_history;
int rrdpush_enabled;
char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
@@ -292,31 +414,36 @@ struct receiver_state {
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl;
#endif
-#ifdef ENABLE_COMPRESSION
- unsigned int rrdpush_compression;
- struct decompressor_state *decompressor;
-#endif
time_t replication_first_time_t;
+
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ struct decompressor_state decompressor;
+#endif // ENABLE_RRDPUSH_COMPRESSION
+/*
+ struct {
+ uint32_t count;
+ STREAM_NODE_INSTANCE *array;
+ } instances;
+*/
};
struct rrdpush_destinations {
STRING *destination;
bool ssl;
-
- const char *last_error;
- time_t last_attempt;
+ uint32_t attempts;
+ time_t since;
time_t postpone_reconnection_until;
- STREAM_HANDSHAKE last_handshake;
+ STREAM_HANDSHAKE reason;
struct rrdpush_destinations *prev;
struct rrdpush_destinations *next;
};
extern unsigned int default_rrdpush_enabled;
-#ifdef ENABLE_COMPRESSION
-extern unsigned int default_compression_enabled;
-#endif
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+extern unsigned int default_rrdpush_compression_enabled;
+#endif // ENABLE_RRDPUSH_COMPRESSION
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
@@ -352,13 +479,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
bool rrdset_push_chart_definition_now(RRDSET *st);
void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
-void rrdpush_claimed_id(RRDHOST *host);
+void rrdpush_send_claimed_id(RRDHOST *host);
+void rrdpush_send_global_functions(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
-void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
+void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
@@ -373,27 +501,266 @@ int connect_to_one_of_destinations(
void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
struct compressor_state *create_compressor();
-struct decompressor_state *create_decompressor();
-#endif
+#endif // ENABLE_RRDPUSH_COMPRESSION
+
void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
-STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
int32_t stream_capabilities_to_vn(uint32_t caps);
void receiver_state_free(struct receiver_state *rpt);
-bool stop_streaming_receiver(RRDHOST *host, const char *reason);
+bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason);
void sender_thread_buffer_free(void);
-void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
-void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
-
#include "replication.h"
+typedef enum __attribute__((packed)) {
+ RRDHOST_DB_STATUS_INITIALIZING = 0,
+ RRDHOST_DB_STATUS_QUERYABLE,
+} RRDHOST_DB_STATUS;
+
+static inline const char *rrdhost_db_status_to_string(RRDHOST_DB_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DB_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_DB_STATUS_QUERYABLE:
+ return "online";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_DB_LIVENESS_STALE = 0,
+ RRDHOST_DB_LIVENESS_LIVE,
+} RRDHOST_DB_LIVENESS;
+
+static inline const char *rrdhost_db_liveness_to_string(RRDHOST_DB_LIVENESS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DB_LIVENESS_STALE:
+ return "stale";
+
+ case RRDHOST_DB_LIVENESS_LIVE:
+ return "live";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_INGEST_STATUS_ARCHIVED = 0,
+ RRDHOST_INGEST_STATUS_INITIALIZING,
+ RRDHOST_INGEST_STATUS_REPLICATING,
+ RRDHOST_INGEST_STATUS_ONLINE,
+ RRDHOST_INGEST_STATUS_OFFLINE,
+} RRDHOST_INGEST_STATUS;
+
+static inline const char *rrdhost_ingest_status_to_string(RRDHOST_INGEST_STATUS status) {
+ switch(status) {
+ case RRDHOST_INGEST_STATUS_ARCHIVED:
+ return "archived";
+
+ case RRDHOST_INGEST_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_INGEST_STATUS_REPLICATING:
+ return "replicating";
+
+ case RRDHOST_INGEST_STATUS_ONLINE:
+ return "online";
+
+ default:
+ case RRDHOST_INGEST_STATUS_OFFLINE:
+ return "offline";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_INGEST_TYPE_LOCALHOST = 0,
+ RRDHOST_INGEST_TYPE_VIRTUAL,
+ RRDHOST_INGEST_TYPE_CHILD,
+ RRDHOST_INGEST_TYPE_ARCHIVED,
+} RRDHOST_INGEST_TYPE;
+
+static inline const char *rrdhost_ingest_type_to_string(RRDHOST_INGEST_TYPE type) {
+ switch(type) {
+ case RRDHOST_INGEST_TYPE_LOCALHOST:
+ return "localhost";
+
+ case RRDHOST_INGEST_TYPE_VIRTUAL:
+ return "virtual";
+
+ case RRDHOST_INGEST_TYPE_CHILD:
+ return "child";
+
+ default:
+ case RRDHOST_INGEST_TYPE_ARCHIVED:
+ return "archived";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_STREAM_STATUS_DISABLED = 0,
+ RRDHOST_STREAM_STATUS_REPLICATING,
+ RRDHOST_STREAM_STATUS_ONLINE,
+ RRDHOST_STREAM_STATUS_OFFLINE,
+} RRDHOST_STREAMING_STATUS;
+
+static inline const char *rrdhost_streaming_status_to_string(RRDHOST_STREAMING_STATUS status) {
+ switch(status) {
+ case RRDHOST_STREAM_STATUS_DISABLED:
+ return "disabled";
+
+ case RRDHOST_STREAM_STATUS_REPLICATING:
+ return "replicating";
+
+ case RRDHOST_STREAM_STATUS_ONLINE:
+ return "online";
+
+ default:
+ case RRDHOST_STREAM_STATUS_OFFLINE:
+ return "offline";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_ML_STATUS_DISABLED = 0,
+ RRDHOST_ML_STATUS_OFFLINE,
+ RRDHOST_ML_STATUS_RUNNING,
+} RRDHOST_ML_STATUS;
+
+static inline const char *rrdhost_ml_status_to_string(RRDHOST_ML_STATUS status) {
+ switch(status) {
+ case RRDHOST_ML_STATUS_RUNNING:
+ return "online";
+
+ case RRDHOST_ML_STATUS_OFFLINE:
+ return "offline";
+
+ default:
+ case RRDHOST_ML_STATUS_DISABLED:
+ return "disabled";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_ML_TYPE_DISABLED = 0,
+ RRDHOST_ML_TYPE_SELF,
+ RRDHOST_ML_TYPE_RECEIVED,
+} RRDHOST_ML_TYPE;
+
+static inline const char *rrdhost_ml_type_to_string(RRDHOST_ML_TYPE type) {
+ switch(type) {
+ case RRDHOST_ML_TYPE_SELF:
+ return "self";
+
+ case RRDHOST_ML_TYPE_RECEIVED:
+ return "received";
+
+ default:
+ case RRDHOST_ML_TYPE_DISABLED:
+ return "disabled";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_HEALTH_STATUS_DISABLED = 0,
+ RRDHOST_HEALTH_STATUS_INITIALIZING,
+ RRDHOST_HEALTH_STATUS_RUNNING,
+} RRDHOST_HEALTH_STATUS;
+
+static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_HEALTH_STATUS_DISABLED:
+ return "disabled";
+
+ case RRDHOST_HEALTH_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_HEALTH_STATUS_RUNNING:
+ return "online";
+ }
+}
+
+typedef struct rrdhost_status {
+ RRDHOST *host;
+ time_t now;
+
+ struct {
+ RRDHOST_DB_STATUS status;
+ RRDHOST_DB_LIVENESS liveness;
+ RRD_MEMORY_MODE mode;
+ time_t first_time_s;
+ time_t last_time_s;
+ size_t metrics;
+ size_t instances;
+ size_t contexts;
+ } db;
+
+ struct {
+ RRDHOST_ML_STATUS status;
+ RRDHOST_ML_TYPE type;
+ struct ml_metrics_statistics metrics;
+ } ml;
+
+ struct {
+ size_t hops;
+ RRDHOST_INGEST_TYPE type;
+ RRDHOST_INGEST_STATUS status;
+ SOCKET_PEERS peers;
+ bool ssl;
+ STREAM_CAPABILITIES capabilities;
+ uint32_t id;
+ time_t since;
+ STREAM_HANDSHAKE reason;
+
+ struct {
+ bool in_progress;
+ NETDATA_DOUBLE completion;
+ size_t instances;
+ } replication;
+ } ingest;
+
+ struct {
+ size_t hops;
+ RRDHOST_STREAMING_STATUS status;
+ SOCKET_PEERS peers;
+ bool ssl;
+ bool compression;
+ STREAM_CAPABILITIES capabilities;
+ uint32_t id;
+ time_t since;
+ STREAM_HANDSHAKE reason;
+
+ struct {
+ bool in_progress;
+ NETDATA_DOUBLE completion;
+ size_t instances;
+ } replication;
+
+ size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
+ } stream;
+
+ struct {
+ RRDHOST_HEALTH_STATUS status;
+ struct {
+ uint32_t undefined;
+ uint32_t uninitialized;
+ uint32_t clear;
+ uint32_t warning;
+ uint32_t critical;
+ } alerts;
+ } health;
+} RRDHOST_STATUS;
+
+void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
+bool rrdhost_state_cloud_emulation(RRDHOST *host);
+
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 6e58d9a2..76843518 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -11,7 +11,7 @@
#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
-#define WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR 9
+#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
@@ -66,7 +66,7 @@ BUFFER *sender_start(struct sender_state *s) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
/*
* In case of stream compression buffer overflow
* Inform the user through the error log file and
@@ -74,9 +74,9 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
*/
static inline void deactivate_compression(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
- error("STREAM_COMPRESSION: Compression returned error, disabling it.");
+ netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it.");
s->flags &= ~SENDER_FLAG_COMPRESSION;
- error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
rrdpush_sender_thread_close_socket(s->host);
}
#endif
@@ -100,7 +100,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
if(unlikely(!src || !src_len))
return;
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
// FILE *fp = fopen("/tmp/stream.txt", "a");
// fprintf(fp,
@@ -111,14 +111,14 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
// fclose(fp);
if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
- info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
+ netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
-#ifdef ENABLE_COMPRESSION
- if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -144,19 +144,19 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
}
char *dst;
- size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if (!dst_len) {
- error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
+ netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
rrdhost_hostname(s->host), s->connected_to);
- s->compressor->reset(s->compressor);
- dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ rrdpush_compressor_reset(&s->compressor);
+ dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if(!dst_len) {
- error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
+ netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
rrdhost_hostname(s->host), s->connected_to);
deactivate_compression(s);
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
return;
}
}
@@ -189,7 +189,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
signal_sender = true;
}
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
if(signal_sender)
rrdpush_signal_sender_to_wake_up(s);
@@ -203,7 +203,7 @@ static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const
, rrdvar2number(rva)
);
- debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
+ netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
}
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
@@ -242,7 +242,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
- debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
}
}
@@ -258,7 +258,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- rd->exposed = 0;
+ rrddim_clear_exposed(rd);
rrddim_foreach_done(rd);
}
rrdset_foreach_done(st);
@@ -273,7 +273,7 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t
return;
if(!have_mutex)
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
rrdpush_sender_last_buffer_recreate_set(s, now_s);
last_reset_time_s = now_s;
@@ -287,20 +287,20 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t
sender_thread_buffer_free();
if(!have_mutex)
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
}
static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
rrdpush_sender_set_flush_time(host->sender);
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
// flush the output buffer from any data it may have
cbuffer_flush(host->sender->buffer);
rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
replication_recalculate_buffer_used_ratio_unsafe(host->sender);
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
@@ -490,27 +490,26 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
break;
}
}
- const char *error = stream_responses[i].error;
- int worker_job_id = stream_responses[i].worker_job_id;
- time_t delay = stream_responses[i].postpone_reconnect_seconds;
if(version >= STREAM_HANDSHAKE_OK_V1) {
- host->destination->last_error = NULL;
- host->destination->last_handshake = version;
- host->destination->postpone_reconnection_until = 0;
- s->capabilities = convert_stream_version_to_capabilities(version);
+ host->destination->reason = version;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay;
+ s->capabilities = convert_stream_version_to_capabilities(version, host, true);
return true;
}
+ const char *error = stream_responses[i].error;
+ int worker_job_id = stream_responses[i].worker_job_id;
+ time_t delay = stream_responses[i].postpone_reconnect_seconds;
+
worker_is_busy(worker_job_id);
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = error;
- host->destination->last_handshake = version;
+ host->destination->reason = version;
host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
char buf[LOG_DATE_LENGTH];
log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
- error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
+ netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
rrdhost_hostname(host), s->connected_to, error, delay, buf);
return false;
@@ -532,8 +531,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "SSL error";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
return false;
}
@@ -543,10 +541,9 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
// certificate is not valid
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
+ netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
- host->destination->last_error = "invalid SSL certificate";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
return false;
}
@@ -554,7 +551,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
return true;
}
- error("SSL: failed to establish connection.");
+ netdata_log_error("SSL: failed to establish connection.");
return false;
#else
@@ -584,20 +581,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
);
if(unlikely(s->rrdpush_sender_socket == -1)) {
- // error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
+ // netdata_log_error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
return false;
}
- // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
+ // netdata_log_info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
// reset our capabilities to default
- s->capabilities = stream_our_capabilities();
+ s->capabilities = stream_our_capabilities(host, true);
-#ifdef ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
// If we don't want compression, remove it from our capabilities
if(!(s->flags & SENDER_FLAG_COMPRESSION))
s->capabilities &= ~STREAM_CAP_COMPRESSION;
-#endif // ENABLE_COMPRESSION
+#endif // ENABLE_RRDPUSH_COMPRESSION
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
@@ -708,7 +705,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(!rrdpush_sender_connect_ssl(s))
return false;
- ssize_t bytes, len = strlen(http);
+ ssize_t bytes, len = (ssize_t)strlen(http);
bytes = send_timeout(
#ifdef ENABLE_HTTPS
@@ -723,9 +720,8 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(bytes <= 0) { // timeout is 0
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
- host->destination->last_error = "timeout while sending request";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
+ netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
return false;
}
@@ -743,36 +739,33 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(bytes <= 0) { // timeout is 0
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
- host->destination->last_error = "timeout while expecting first response";
- host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
+ netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
return false;
}
if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
http[bytes] = '\0';
- debug(D_STREAM, "Response to sender from far end: %s", http);
+ netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http);
if(!rrdpush_sender_validate_response(host, s, http, bytes))
return false;
-#ifdef ENABLE_COMPRESSION
- if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
- if(!s->compressor)
- s->compressor = create_compressor();
- else
- s->compressor->reset(s->compressor);
- }
-#endif //ENABLE_COMPRESSION
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ rrdpush_compressor_reset(&s->compressor);
+ else
+ rrdpush_compressor_destroy(&s->compressor);
+#endif // ENABLE_RRDPUSH_COMPRESSION
log_sender_capabilities(s);
- debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
+ netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
return true;
}
@@ -820,18 +813,18 @@ static bool attempt_to_connect(struct sender_state *state)
return false;
}
-// TCP window is open and we have data to transmit.
+// TCP window is open, and we have data to transmit.
static ssize_t attempt_to_send(struct sender_state *s) {
- ssize_t ret = 0;
+ ssize_t ret;
#ifdef NETDATA_INTERNAL_CHECKS
struct circular_buffer *cb = s->buffer;
#endif
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
char *chunk;
size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
- debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
+ netdata_log_debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
#ifdef ENABLE_HTTPS
if(SSL_connection(&s->ssl))
@@ -846,21 +839,21 @@ static ssize_t attempt_to_send(struct sender_state *s) {
cbuffer_remove_unsafe(s->buffer, ret);
s->sent_bytes_on_this_connection += ret;
s->sent_bytes += ret;
- debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
+ netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
}
else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
- debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
else if (ret == -1) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
- debug(D_STREAM, "STREAM: Send failed - closing socket...");
- error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
+ netdata_log_debug(D_STREAM, "STREAM: Send failed - closing socket...");
+ netdata_log_error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
else
- debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
+ netdata_log_debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
replication_recalculate_buffer_used_ratio_unsafe(s);
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
return ret;
}
@@ -893,11 +886,11 @@ static ssize_t attempt_read(struct sender_state *s) {
if (ret == 0 || errno == ECONNRESET) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
- error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
}
else {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
- error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
+ netdata_log_error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
}
rrdpush_sender_thread_close_socket(s->host);
@@ -951,13 +944,13 @@ void execute_commands(struct sender_state *s) {
while( start < end && (newline = strchr(start, '\n')) ) {
*newline = '\0';
- log_access("STREAM: %d from '%s' for host '%s': %s",
+ netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
gettid(), s->connected_to, rrdhost_hostname(s->host), start);
// internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS);
+ size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
const char *keyword = get_word(words, num_words, 0);
@@ -969,7 +962,7 @@ void execute_commands(struct sender_state *s) {
char *function = get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
keyword,
transaction?transaction:"(unset)",
@@ -1002,7 +995,7 @@ void execute_commands(struct sender_state *s) {
const char *before = get_word(words, num_words, 4);
if (!chart_id || !start_streaming || !after || !before) {
- error("STREAM %s [send to %s] %s command is incomplete"
+ netdata_log_error("STREAM %s [send to %s] %s command is incomplete"
" (chart=%s, start_streaming=%s, after=%s, before=%s)",
rrdhost_hostname(s->host), s->connected_to,
keyword,
@@ -1020,7 +1013,7 @@ void execute_commands(struct sender_state *s) {
}
}
else {
- error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
+ netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
}
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
@@ -1051,7 +1044,7 @@ static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen)
int new_pipe_fds[2];
if(reopen) {
if(pipe(new_pipe_fds) != 0) {
- error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
+ netdata_log_error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
new_pipe_fds[PIPE_READ] = -1;
new_pipe_fds[PIPE_WRITE] = -1;
ret = false;
@@ -1091,138 +1084,26 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
// signal the sender there are more data
if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) {
- error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
+ netdata_log_error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true);
}
}
-static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) {
- size_t charts = rrdhost_sender_replicating_charts(host);
- NETDATA_DOUBLE completion;
- if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t)
- completion = 100.0;
- else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t)
- completion = 0.0;
- else {
- time_t total = now - host->sender->replication.oldest_request_after_t;
- time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t;
- completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total;
- }
-
- *instances = charts;
-
- return completion;
-}
-
-void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
- bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- buffer_json_member_add_object(wb, key);
-
- if(host->sender)
- buffer_json_member_add_uint64(wb, "hops", host->sender->hops);
-
- buffer_json_member_add_boolean(wb, "online", online);
-
- if(host->sender && host->sender->last_state_since_t) {
- buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t);
- buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t);
- }
-
- if(!online && host->sender && host->sender->exit.reason)
- buffer_json_member_add_string(wb, "reason", host->sender->exit.reason);
-
- buffer_json_member_add_object(wb, "replication");
- {
- size_t instances;
- NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances);
- buffer_json_member_add_boolean(wb, "in_progress", instances);
- buffer_json_member_add_double(wb, "completion", completion);
- buffer_json_member_add_uint64(wb, "instances", instances);
- }
- buffer_json_object_close(wb);
-
- if(host->sender) {
- netdata_mutex_lock(&host->sender->mutex);
-
- buffer_json_member_add_object(wb, "destination");
- {
- char buf[1024 + 1];
- if(online && host->sender->rrdpush_sender_socket != -1) {
- SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket);
- bool ssl = SSL_connection(&host->sender->ssl);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "local", buf);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "remote", buf);
-
- stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities");
-
- buffer_json_member_add_object(wb, "traffic");
- {
- bool compression = false;
-#ifdef ENABLE_COMPRESSION
- compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor);
-#endif
- buffer_json_member_add_boolean(wb, "compression", compression);
- buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]);
- buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]);
- buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]);
- buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]);
- }
- buffer_json_object_close(wb); // traffic
- }
-
- buffer_json_member_add_array(wb, "candidates");
- struct rrdpush_destinations *d;
- for (d = host->destinations; d; d = d->next) {
- buffer_json_add_array_item_object(wb);
- {
-
- if (d->ssl) {
- snprintfz(buf, 1024, "%s:SSL", string2str(d->destination));
- buffer_json_member_add_string(wb, "destination", buf);
- }
- else
- buffer_json_member_add_string(wb, "destination", string2str(d->destination));
-
- buffer_json_member_add_time_t(wb, "last_check", d->last_attempt);
- buffer_json_member_add_time_t(wb, "age", now - d->last_attempt);
- buffer_json_member_add_string(wb, "last_error", d->last_error);
- buffer_json_member_add_string(wb, "last_handshake",
- stream_handshake_error_to_string(d->last_handshake));
- buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until);
- buffer_json_member_add_time_t(wb, "next_in",
- (d->postpone_reconnection_until > now) ?
- d->postpone_reconnection_until - now : 0);
- }
- buffer_json_object_close(wb); // each candidate
- }
- buffer_json_array_close(wb); // candidates
- }
- buffer_json_object_close(wb); // destination
-
- netdata_mutex_unlock(&host->sender->mutex);
- }
-
- buffer_json_object_close(wb); // streaming
-}
-
static bool rrdhost_set_sender(RRDHOST *host) {
if(unlikely(!host->sender)) return false;
bool ret = false;
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(!host->sender->tid) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+ host->rrdpush_sender_connection_counter++;
host->sender->tid = gettid();
host->sender->last_state_since_t = now_realtime_sec();
- host->sender->exit.reason = NULL;
+ host->sender->exit.reason = STREAM_HANDSHAKE_NEVER;
ret = true;
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
rrdpush_reset_destinations_postpone_time(host);
@@ -1237,6 +1118,10 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
host->sender->exit.shutdown = false;
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
host->sender->last_state_since_t = now_realtime_sec();
+ if(host->destination) {
+ host->destination->since = host->sender->last_state_since_t;
+ host->destination->reason = host->sender->exit.reason;
+ }
}
rrdpush_reset_destinations_postpone_time(host);
@@ -1248,25 +1133,25 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) {
if(unlikely(!service_running(SERVICE_STREAMING))) {
if(!s->exit.reason)
- s->exit.reason = "NETDATA EXIT";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT;
return true;
}
if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) {
if(!s->exit.reason)
- s->exit.reason = "NON STREAMABLE HOST";
+ s->exit.reason = STREAM_HANDSHAKE_NON_STREAMABLE_HOST;
return true;
}
if(unlikely(s->exit.shutdown)) {
if(!s->exit.reason)
- s->exit.reason = "SENDER SHUTDOWN REQUESTED";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN;
return true;
}
if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
if(!s->exit.reason)
- s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)";
+ s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST;
return true;
}
@@ -1279,16 +1164,16 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
RRDHOST *host = s->host;
- netdata_mutex_lock(&host->sender->mutex);
- info("STREAM %s [send]: sending thread exits %s",
+ sender_lock(host->sender);
+ netdata_log_info("STREAM %s [send]: sending thread exits %s",
rrdhost_hostname(host),
- host->sender->exit.reason ? host->sender->exit.reason : "");
+ host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : "");
rrdpush_sender_thread_close_socket(host);
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
rrdhost_clear_sender___while_having_sender_mutex(host);
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
freez(s->pipe_buffer);
freez(s);
@@ -1297,10 +1182,10 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
#ifdef ENABLE_HTTPS
static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
- netdata_spinlock_lock(&sp);
+ spinlock_lock(&sp);
if(netdata_ssl_streaming_sender_ctx || !host) {
- netdata_spinlock_unlock(&sp);
+ spinlock_unlock(&sp);
return;
}
@@ -1316,7 +1201,7 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
}
}
- netdata_spinlock_unlock(&sp);
+ spinlock_unlock(&sp);
#endif
}
@@ -1331,7 +1216,7 @@ void *rrdpush_sender_thread(void *ptr) {
// disconnection reasons
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR, "disconnect socket error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
@@ -1353,20 +1238,20 @@ void *rrdpush_sender_thread(void *ptr) {
if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
!*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
!*s->host->rrdpush_send_api_key) {
- error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
+ netdata_log_error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
rrdhost_hostname(s->host), gettid());
return NULL;
}
if(!rrdhost_set_sender(s->host)) {
- error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
+ netdata_log_error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
rrdhost_hostname(s->host), gettid());
return NULL;
}
rrdpush_initialize_ssl_ctx(s->host);
- info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
+ netdata_log_info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
s->timeout = (int)appconfig_get_number(
&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
@@ -1397,7 +1282,7 @@ void *rrdpush_sender_thread(void *ptr) {
pipe_buffer_size = 10 * 1024;
if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
- error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
+ netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
rrdhost_hostname(s->host));
return NULL;
}
@@ -1433,12 +1318,13 @@ void *rrdpush_sender_thread(void *ptr) {
break;
now_s = s->last_traffic_seen_t = now_monotonic_sec();
- rrdpush_claimed_id(s->host);
+ rrdpush_send_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
+ rrdpush_send_global_functions(s->host);
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
continue;
}
@@ -1452,19 +1338,19 @@ void *rrdpush_sender_thread(void *ptr) {
!rrdpush_sender_replicating_charts(s)
)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
+ netdata_log_error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
rrdpush_sender_thread_close_socket(s->host);
continue;
}
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
size_t available = cbuffer_available_size_unsafe(s->buffer);
if (unlikely(!outstanding)) {
rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
}
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
@@ -1473,7 +1359,7 @@ void *rrdpush_sender_thread(void *ptr) {
if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
- error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
+ netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
rrdhost_hostname(s->host));
rrdpush_sender_thread_close_socket(s->host);
break;
@@ -1502,7 +1388,7 @@ void *rrdpush_sender_thread(void *ptr) {
int poll_rc = poll(fds, 2, 1000);
- debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
+ netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
fds[Collector].revents, fds[Socket].revents, outstanding);
if(unlikely(rrdhost_sender_should_exit(s)))
@@ -1517,7 +1403,7 @@ void *rrdpush_sender_thread(void *ptr) {
// Spurious wake-ups without error - loop again
if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
netdata_thread_testcancel();
- debug(D_STREAM, "Spurious wakeup");
+ netdata_log_debug(D_STREAM, "Spurious wakeup");
now_s = now_monotonic_sec();
continue;
}
@@ -1525,7 +1411,7 @@ void *rrdpush_sender_thread(void *ptr) {
// Only errors from poll() are internal, but try restarting the connection
if(unlikely(poll_rc == -1)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
- error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
rrdpush_sender_thread_close_socket(s->host);
continue;
@@ -1544,10 +1430,10 @@ void *rrdpush_sender_thread(void *ptr) {
// If the collector woke us up then empty the pipe to remove the signal
if (fds[Collector].revents & (POLLIN|POLLPRI)) {
worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
- debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
+ netdata_log_debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
- error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
+ netdata_log_error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
}
// Read as much as possible to fill the buffer, split into full lines for execution.
@@ -1575,7 +1461,7 @@ void *rrdpush_sender_thread(void *ptr) {
if(error) {
rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
- error("STREAM %s [send to %s]: restarting internal pipe: %s.",
+ netdata_log_error("STREAM %s [send to %s]: restarting internal pipe: %s.",
rrdhost_hostname(s->host), s->connected_to, error);
}
}
@@ -1591,8 +1477,8 @@ void *rrdpush_sender_thread(void *ptr) {
error = "connection is invalid (POLLNVAL)";
if(unlikely(error)) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR);
- error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR);
+ netdata_log_error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
@@ -1602,7 +1488,7 @@ void *rrdpush_sender_thread(void *ptr) {
if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
errno = 0;
- error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
+ netdata_log_error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
diff --git a/streaming/stream.conf b/streaming/stream.conf
index 7c9ccc9b..94e94cab 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -146,8 +146,8 @@
# 3 possible values:
# yes enable alarms
# no do not enable alarms
- # auto enable alarms, only when the sending netdata is connected. For ephemeral child nodes or child system restarts,
- # ensure that the netdata process on the child is gracefully stopped, to prevent invalid last_collected alarms
+ # auto enable alarms, only when the sending netdata is connected.
+ # Health monitoring will be disabled as soon as the connection is closed.
# You can also set it per host, below.
# The default is taken from [health].enabled of netdata.conf
#health enabled by default = auto
@@ -155,6 +155,9 @@
# postpone alarms for a short period after the sender is connected
default postpone alarms on connect seconds = 60
+ # seconds of health log events to keep
+ #default health log history = 432000
+
# need to route metrics differently? set these.
# the defaults are the ones at the [stream] section (above)
#default proxy enabled = yes | no
@@ -223,6 +226,9 @@
# postpone alarms when the sender connects
postpone alarms on connect seconds = 60
+ # seconds of health log events to keep
+ #health log history = 432000
+
# need to route metrics differently?
# the defaults are the ones at the [API KEY] section
#proxy enabled = yes | no