summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/README.md51
-rw-r--r--streaming/compression.c36
-rw-r--r--streaming/rrdpush.c11
-rw-r--r--streaming/rrdpush.h2
-rw-r--r--streaming/sender.c62
-rw-r--r--streaming/stream.conf6
6 files changed, 109 insertions, 59 deletions
diff --git a/streaming/README.md b/streaming/README.md
index 71c1dc28..57c392f4 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -67,18 +67,18 @@ Alerts for the child can be triggered by any of the involved hosts that maintain
You can daisy-chain any number of Netdata, each with or without a database and
with or without alerts for the child metrics.
-### Mix and match with backends
+### Mix and match with exporting engine
-All nodes that maintain a database can also send their data to a backend database.
+All nodes that maintain a database can also send their data to an external database.
This allows quite complex setups.
Example:
1. Netdata nodes `A` and `B` do not maintain a database and stream metrics to Netdata node `C`(live streaming functionality).
-2. Netdata node `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (backends functionality).
+2. Netdata node `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (exporting functionality).
3. Netdata node `C` also streams data for `A`, `B`, `C` to Netdata `D`, which also collects data from `E`, `F` and `G` from another DMZ (live streaming functionality).
4. Netdata node `D` is just a proxy, without a database, that streams all data to a remote site at Netdata `H`.
-5. Netdata node `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (backends functionality)
+5. Netdata node `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (exporting functionality)
6. Alerts are triggered by `H` for all hosts.
7. Users can use all Netdata nodes that maintain a database to view metrics (i.e. at `H` all hosts can be viewed).
@@ -107,15 +107,7 @@ This also disables the registry (there cannot be a registry without an API).
requests from its child nodes. 0 sets no limit, 1 means maximum once every second. If this is set, you may see error log
entries "... too busy to accept new streaming request. Will be allowed in X secs".
-```
-[backend]
- enabled = yes | no
- type = graphite | opentsdb
- destination = IP:PORT ...
- update every = 10
-```
-
-`[backend]` configures data archiving to a backend (it archives all databases maintained on
+You can [use](/exporting/README.md#configuration) the exporting engine to configure data archiving to an external database (it archives all databases maintained on
this host).
### Streaming configuration
@@ -156,7 +148,7 @@ a proxy).
```
This is an overview of how these options can be combined:
-| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|backend|alarms|dashboard|
+| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|exporting|alarms|dashboard|
|------|:-------------:|:----------:|:----------------:|:-----:|:----:|:-------:|
| headless collector|`none`|`none`|`yes`|only for `data source = as collected`|not possible|no|
| headless proxy|`none`|not `none`|`yes`|only for `data source = as collected`|not possible|no|
@@ -211,7 +203,6 @@ that use `*` as wildcard (any number of times) and a `!` prefix for a negative m
So: `allow from = !10.1.2.3 10.*` will allow all IPs in `10.*` except `10.1.2.3`. The order is
important: left to right, the first positive or negative match is used.
-`allow from` is available in Netdata v1.9+
##### Tracing
@@ -366,16 +357,30 @@ Note: The `stream-compression` status can be `"enabled" | "disabled" | "N/A"`.
A compressed data packet is determined and decompressed on the fly.
#### Limitations
- This limitation will be withdrawn asap and is work-in-progress.
-
-The current implementation of streaming data compression can support only a few number of dimensions in a chart with names that cannot exceed the size of 16384 bytes. In case you experience stream connection problems or gaps in the charts please disable stream compresssion in the `stream.conf` file. This limitation can be seen in the error.log file with the sequence of the following messages:
-```
-Compression error - data discarded
-Message size above limit:
+This limitation will be withdrawn asap and is work-in-progress.
+
+The current implementation of streaming data compression can support only a few number of dimensions in a chart with names that cannot exceed the size of 16384 bytes. In case your instance hit this limitation, the agent will deactivate compression during runtime to avoid stream corruption. This limitation can be seen in the error.log file with the sequence of the following messages:
+```
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: connecting...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: initializing communication...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: waiting response from remote netdata...
+netdata INFO : STREAM_SENDER[child01] : STREAM_COMPRESSION: Compressor Reset
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: established communication with a parent using protocol version 5 - ready to send metrics...
+...
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION: Compression Failed - Message size 27847 above compression buffer limit: 16384 (errno 9, Bad file descriptor)
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION: Deactivating compression to avoid stream corruption
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION child01 [send to my.parent.IP]: Restarting connection without compression
+...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: connecting...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: initializing communication...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: waiting response from remote netdata...
+netdata INFO : STREAM_SENDER[child01] : Stream is uncompressed! One of the agents (my.parent.IP <-> child01) does not support compression OR compression is disabled.
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: established communication with a parent using protocol version 4 - ready to send metrics...
+netdata INFO : WEB_SERVER[static4] : STREAM child01 [send]: sending metrics...
```
#### How to enable stream compression
-Netdata Agents are shipped with data compression disabled by default. You can also configure which streams will use compression.
+Netdata Agents are shipped with data compression enabled by default. You can also configure which streams will use compression.
With enabled stream compression, a Netdata Agent can negotiate streaming compression with other Netdata Agents. During the negotiation of streaming compression both Netdata Agents should support and enable compression in order to communicate over a compressed stream. The negotiation will result into an uncompressed stream, if one of the Netdata Agents doesn't support **or** has compression disabled.
@@ -727,4 +732,4 @@ file descriptor given is not a valid stream
After logging this error, Netdata will close the stream.
-[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fstreaming%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
+
diff --git a/streaming/compression.c b/streaming/compression.c
index 93810aae..d6178d6c 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -3,6 +3,8 @@
#ifdef ENABLE_COMPRESSION
#include "lz4.h"
+#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
+
#define LZ4_MAX_MSG_SIZE 0x4000
#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE)
@@ -29,7 +31,7 @@ static void lz4_compressor_reset(struct compressor_state *state)
if (state->data) {
if (state->data->stream) {
LZ4_resetStream_fast(state->data->stream);
- info("STREAM_COMPRESSION: Compressor resets stream fast!");
+ info("%s: Compressor Reset", STREAM_COMPRESSION_MSG);
}
state->data->stream_buffer_pos = 0;
}
@@ -46,18 +48,19 @@ static void lz4_compressor_destroy(struct compressor_state **state)
if (s->data->stream)
LZ4_freeStream(s->data->stream);
freez(s->data->stream_buffer);
+ freez(s->data);
}
freez(s->buffer);
freez(s);
*state = NULL;
- debug(D_STREAM, "STREAM_COMPRESSION: Compressor destroyed!");
+ debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG);
}
}
/*
* Compress the given block of data
- * Comprecced data will remain in the internal buffer until the next invokation
- * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
+ * Compressed data will remain in the internal buffer until the next invocation
+ * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
* or 0 in case of error
*/
static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
@@ -65,7 +68,7 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
if (!state || !size || !out)
return 0;
if (size > LZ4_MAX_MSG_SIZE) {
- error("Message size above limit: %lu", size);
+ error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE);
return 0;
}
size_t max_dst_size = LZ4_COMPRESSBOUND(size);
@@ -84,7 +87,7 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
state->data->stream_buffer + state->data->stream_buffer_pos,
state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1);
if (compressed_data_size < 0) {
- error("Date compression error: %ld", compressed_data_size);
+ error("Data compression error: %ld", compressed_data_size);
return 0;
}
state->data->stream_buffer_pos += size;
@@ -93,12 +96,12 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
*(uint32_t *)state->buffer = len | SIGNATURE;
*out = state->buffer;
- debug(D_STREAM, "STREAM: Compressed data header: %ld", compressed_data_size);
+ debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
return compressed_data_size + SIGNATURE_SIZE;
}
/*
- * Create and initalize compressor state
+ * Create and initialize compressor state
* Return the pointer to compressor_state structure created
*/
struct compressor_state *create_compressor()
@@ -114,7 +117,7 @@ struct compressor_state *create_compressor()
state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE));
state->buffer_size = LZ4_STREAM_BUFFER_SIZE;
state->reset(state);
- debug(D_STREAM, "STREAM_COMPRESSION: Initialize streaming compression!");
+ debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG);
return state;
}
@@ -150,10 +153,11 @@ static void lz4_decompressor_destroy(struct decompressor_state **state)
if (state && *state) {
struct decompressor_state *s = *state;
if (s->data) {
- debug(D_STREAM, "STREAM_COMPRESSION: Destroying decompressor.");
+ debug(D_STREAM, "%s: Destroying decompressor.", STREAM_COMPRESSION_MSG);
if (s->data->stream)
LZ4_freeStreamDecode(s->data->stream);
freez(s->data->stream_buffer);
+ freez(s->data);
}
freez(s->buffer);
freez(s);
@@ -246,7 +250,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
if (!state)
return 0;
if (!state->buffer) {
- error("STREAM: No decompressor buffer allocated");
+ error("%s: No decompressor buffer allocated", STREAM_COMPRESSION_MSG);
return 0;
}
@@ -254,7 +258,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
state->data->stream_buffer + state->data->stream_buffer_pos,
state->buffer_len, state->data->stream_buffer_size - state->data->stream_buffer_pos);
if (decompressed_size < 0) {
- error("STREAM: Decompressor error %ld", decompressed_size);
+ error("%s: Decompressor error %ld", STREAM_COMPRESSION_MSG, decompressed_size);
return 0;
}
@@ -278,7 +282,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
size_t avg_size = state->total_uncompressed / state->packet_count;
if (old_avg_saving != avg_saving || old_avg_size != avg_size){
- debug(D_STREAM, "STREAM: Saving: %lu%% (avg. %lu%%), avg.size: %lu", saving, avg_saving, avg_size);
+ debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size);
}
return decompressed_size;
}
@@ -301,7 +305,7 @@ static size_t lz4_decompressor_get(struct decompressor_state *state, char *data,
if (!state || !size || !data)
return 0;
if (!state->out_buffer)
- fatal("STREAM: No decompressor output buffer allocated");
+ fatal("%s: No decompressor output buffer allocated", STREAM_COMPRESSION_MSG);
if (state->out_buffer_pos + size > state->out_buffer_len)
size = state->out_buffer_len - state->out_buffer_pos;
@@ -318,7 +322,7 @@ static size_t lz4_decompressor_get(struct decompressor_state *state, char *data,
}
/*
- * Create and initalize decompressor state
+ * Create and initialize decompressor state
* Return the pointer to decompressor_state structure created
*/
struct decompressor_state *create_decompressor()
@@ -339,7 +343,7 @@ struct decompressor_state *create_decompressor()
state->data->stream_buffer = mallocz(state->data->stream_buffer_size);
fatal_assert(state->data->stream_buffer);
state->reset(state);
- debug(D_STREAM, "STREAM_COMPRESSION: Initialize streaming decompression!");
+ debug(D_STREAM, "%s: Initialize streaming decompression!", STREAM_COMPRESSION_MSG);
return state;
}
#endif
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index ebd8327f..8829d1ee 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -41,7 +41,7 @@ struct config stream_config = {
unsigned int default_rrdpush_enabled = 0;
#ifdef ENABLE_COMPRESSION
-unsigned int default_compression_enabled = 0;
+unsigned int default_compression_enabled = 1;
#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
@@ -129,6 +129,13 @@ unsigned int remote_clock_resync_iterations = 60;
static inline int should_send_chart_matching(RRDSET *st) {
+ // Do not stream anomaly rates charts.
+ if (unlikely(st->state->is_ar_chart))
+ return false;
+
+ if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION))
+ return ml_streaming_enabled();
+
if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) {
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
@@ -781,4 +788,4 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
buffer_flush(w->response.data);
return 200;
-} \ No newline at end of file
+}
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 937ead6f..7eb2c6e5 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -62,7 +62,7 @@ struct decompressor_state {
size_t total_compressed;
size_t total_uncompressed;
size_t packet_count;
- struct decompressor_data *data; // Deompression API specific data
+ struct decompressor_data *data; // Decompression API specific data
void (*reset)(struct decompressor_state *state);
size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
size_t (*put)(struct decompressor_state *state, const char *data, size_t size);
diff --git a/streaming/sender.c b/streaming/sender.c
index 916d809a..72259c3a 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -13,24 +13,43 @@ void sender_start(struct sender_state *s) {
buffer_flush(s->build);
}
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
+
+#ifdef ENABLE_COMPRESSION
+/*
+* In case of stream compression buffer oveflow
+* Inform the user through the error log file and
+* deactivate compression by downgrading the stream protocol.
+*/
+static inline void deactivate_compression(struct sender_state *s)
+{
+ error("STREAM_COMPRESSION: Deactivating compression to avoid stream corruption");
+ default_compression_enabled = 0;
+ s->rrdpush_compression = 0;
+ s->version = STREAM_VERSION_CLABELS;
+ error("STREAM_COMPRESSION %s [send to %s]: Restarting connection without compression", s->host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(s->host);
+}
+#endif
+
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s) {
char *src = (char *)buffer_tostring(s->host->sender->build);
size_t src_len = s->host->sender->build->len;
#ifdef ENABLE_COMPRESSION
- do {
- if (src && src_len) {
- if (s->compressor && s->rrdpush_compression) {
- src_len = s->compressor->compress(s->compressor, src, src_len, &src);
- if (!src_len) {
- error("Compression error - data discarded");
- break;
- }
+ if (src && src_len) {
+ if (s->compressor && s->rrdpush_compression) {
+ src_len = s->compressor->compress(s->compressor, src, src_len, &src);
+ if (!src_len) {
+ deactivate_compression(s);
+ buffer_flush(s->build);
+ netdata_mutex_unlock(&s->mutex);
+ return;
}
- if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
- s->overflow = 1;
}
- } while (0);
+ if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->overflow = 1;
+ }
#else
if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
s->overflow = 1;
@@ -252,6 +271,13 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
}
#endif
+#ifdef ENABLE_COMPRESSION
+// Negotiate stream VERSION_CLABELS if stream compression is not supported
+s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
+if(!s->rrdpush_compression)
+ s->version = STREAM_VERSION_CLABELS;
+#endif //ENABLE_COMPRESSION
+
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
*/
@@ -274,7 +300,10 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
"&ml_capable=%d"
"&ml_enabled=%d"
"&tags=%s"
- "&ver=%u"
+ "&ver=%d"
+ "&NETDATA_INSTANCE_CLOUD_TYPE=%s"
+ "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
+ "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
"&NETDATA_SYSTEM_OS_NAME=%s"
"&NETDATA_SYSTEM_OS_ID=%s"
"&NETDATA_SYSTEM_OS_ID_LIKE=%s"
@@ -316,7 +345,10 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, host->system_info->ml_capable
, host->system_info->ml_enabled
, (host->tags) ? host->tags : ""
- , STREAMING_PROTOCOL_CURRENT_VERSION
+ , s->version
+ , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
+ , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
+ , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
, se.os_name
, se.os_id
, (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
@@ -410,7 +442,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
s->version = version;
#ifdef ENABLE_COMPRESSION
- s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
+ s->rrdpush_compression = (s->rrdpush_compression && (s->version >= STREAM_VERSION_COMPRESSION));
if(s->rrdpush_compression)
{
// parent supports compression
@@ -420,6 +452,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
else {
//parent does not support compression or has compression disabled
debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
+ infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
s->version = STREAM_VERSION_CLABELS;
}
#endif //ENABLE_COMPRESSION
@@ -664,6 +697,7 @@ void *rrdpush_sender_thread(void *ptr) {
error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
return NULL;
}
+ s->version = STREAMING_PROTOCOL_CURRENT_VERSION;
enum {
Collector,
diff --git a/streaming/stream.conf b/streaming/stream.conf
index 3c363fad..e65e76fa 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -60,7 +60,7 @@
# The API_KEY to use (as the sender)
api key =
- # Stream Compresssion
+ # Stream Compression
#
# The netdata child is configurated to enable stream compression by default.
# You can control stream compression in this agent with options: yes | no
@@ -162,7 +162,7 @@
#default proxy api key = API_KEY
#default proxy send charts matching = *
- # Stream Compresssion
+ # Stream Compression
#
# The stream with the child can be configurated to enable stream compression.
# You can control stream compression in this parent agent stream with options: yes | no
@@ -216,7 +216,7 @@
#proxy api key = API_KEY
#proxy send charts matching = *
- # Stream Compresssion
+ # Stream Compression
#
# The stream with the child can be configurated to enable stream compression.
# You can control stream compression in this parent agent stream with options: yes | no