summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/README.md226
-rw-r--r--streaming/compression.c345
-rw-r--r--streaming/receiver.c215
-rw-r--r--streaming/rrdpush.c32
-rw-r--r--streaming/rrdpush.h69
-rw-r--r--streaming/sender.c120
-rw-r--r--streaming/stream.conf19
7 files changed, 912 insertions, 114 deletions
diff --git a/streaming/README.md b/streaming/README.md
index 7f74fb31f..71c1dc289 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -1,89 +1,90 @@
-<!--
+---
title: "Streaming and replication"
description: "Replicate and mirror Netdata's metrics through real-time streaming from child to parent nodes. Then combine, correlate, and export."
custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README.md
--->
+---
-# Streaming and replication
-Each Netdata is able to replicate/mirror its database to another Netdata, by streaming the collected
-metrics in real-time to it. This is quite different to [data archiving to third party time-series
+Each Netdata node is able to replicate/mirror its database to another Netdata node, by streaming the collected
+metrics in real-time. This is quite different to [data archiving to third party time-series
databases](/exporting/README.md).
+The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
+
+There are also **proxy** nodes, which collect metrics from a child and sends it to a parent.
-When Netdata streams metrics to another Netdata, the receiving one is able to perform everything a Netdata instance is
-capable of. This includes the following:
+When one Netdata node streams metrics another, the receiving instance can use the data for all features of a typical Netdata node, for example:
- Visualize metrics with a dashboard
- Run health checks that trigger alarms and send alarm notifications
- Export metrics to an external time-series database
-The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
-There are also **proxy** nodes, which collects metrics from a child and sends it to a parent.
+
+
## Supported configurations
### Netdata without a database or web API (headless collector)
-Local Netdata (child), **without any database or alarms**, collects metrics and sends them to another Netdata
+A local Netdata Agent (child), **without any database or alarms**, collects metrics and sends them to another Netdata node
(parent).
+The same parent can collect data for any number of child nodes and serves alerts for each child.
The node menu shows a list of all "databases streamed to" the parent. Clicking one of those links allows the user to
view the full dashboard of the child node. The URL has the form
`http://parent-host:parent-port/host/child-host/`.
-Alarms for the child are served by the parent.
-In this mode the child is just a plain data collector. It spawns all external plugins, but instead of maintaining a
-local database and accepting dashboard requests, it streams all metrics to the parent. The memory footprint is reduced
-significantly, to between 6 MiB and 40 MiB, depending on the enabled plugins. To reduce the memory usage as much as
+In a headless setup, the child acts as a plain data collector. It spawns all external plugins, but instead of maintaining a
+local database and accepting dashboard requests, it streams all metrics to the parent.
+
+This setup works great to reduce the memory footprint. Depending on the enabled plugins, memory usage is between 6 MiB and 40 MiB. To reduce the memory usage as much as
possible, refer to the [performance optimization guide](/docs/guides/configure/performance.md).
-The same parent can collect data for any number of child nodes.
### Database Replication
-Local Netdata (child), **with a local database (and possibly alarms)**, collects metrics and
-sends them to another Netdata (parent).
+The local Netdata Agent (child), **with a local database (and possibly alarms)**, collects metrics and
+sends them to another Netdata node (parent).
The user can use all the functions **at both** `http://child-ip:child-port/` and
`http://parent-host:parent-port/host/child-host/`.
The child and the parent may have different data retention policies for the same metrics.
-Alarms for the child are triggered by **both** the child and the parent (and actually
-each can have different alarms configurations or have alarms disabled).
+Alerts for the child are triggered by **both** the child and the parent.
+It is possible to enable different alert configurations on the parent and the child.
-Take a note, that custom chart names, configured on the child, should be in the form `type.name` to work correctly. The parent will truncate the `type` part and substitute the original chart `type` to store the name in the database.
+In order for custom chart names on the child to work correctly, follow the form `type.name`. The parent will truncate the `type` part and substitute the original chart `type` to store the name in the database.
### Netdata proxies
-Local Netdata (child), with or without a database, collects metrics and sends them to another
-Netdata (**proxy**), which may or may not maintain a database, which forwards them to another
+The local Netdata Agent(child), with or without a database, collects metrics and sends them to another
+Netdata node(**proxy**), which may or may not maintain a database, which forwards them to another
Netdata (parent).
-Alarms for the child can be triggered by any of the involved hosts that maintains a database.
+Alerts for the child can be triggered by any of the involved hosts that maintains a database.
-Any number of daisy chaining Netdata servers are supported, each with or without a database and
-with or without alarms for the child metrics.
+You can daisy-chain any number of Netdata, each with or without a database and
+with or without alerts for the child metrics.
-### mix and match with backends
+### Mix and match with backends
All nodes that maintain a database can also send their data to a backend database.
This allows quite complex setups.
Example:
-1. Netdata `A`, `B` do not maintain a database and stream metrics to Netdata `C`(live streaming functionality, i.e. this PR)
-2. Netdata `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (backends functionality)
-3. Netdata `C` also streams data for `A`, `B`, `C` to Netdata `D`, which also collects data from `E`, `F` and `G` from another DMZ (live streaming functionality, i.e. this PR)
-4. Netdata `D` is just a proxy, without a database, that streams all data to a remote site at Netdata `H`
-5. Netdata `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (backends functionality)
-6. alarms are triggered by `H` for all hosts
-7. users can use all the Netdata that maintain a database to view metrics (i.e. at `H` all hosts can be viewed).
+1. Netdata nodes `A` and `B` do not maintain a database and stream metrics to Netdata node `C`(live streaming functionality).
+2. Netdata node `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (backends functionality).
+3. Netdata node `C` also streams data for `A`, `B`, `C` to Netdata `D`, which also collects data from `E`, `F` and `G` from another DMZ (live streaming functionality).
+4. Netdata node `D` is just a proxy, without a database, that streams all data to a remote site at Netdata `H`.
+5. Netdata node `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (backends functionality)
+6. Alerts are triggered by `H` for all hosts.
+7. Users can use all Netdata nodes that maintain a database to view metrics (i.e. at `H` all hosts can be viewed).
## Configuration
-These are options that affect the operation of Netdata in this area:
+The following options affect how Netdata streams:
```
[global]
@@ -91,7 +92,7 @@ These are options that affect the operation of Netdata in this area:
```
`[global].memory mode = none` disables the database at this host. This also disables health
-monitoring (there cannot be health monitoring without a database).
+monitoring because a node can't have health monitoring without a database.
```
[web]
@@ -117,22 +118,27 @@ entries "... too busy to accept new streaming request. Will be allowed in X secs
`[backend]` configures data archiving to a backend (it archives all databases maintained on
this host).
-### streaming configuration
+### Streaming configuration
+
+The new file `stream.conf` contains streaming configuration for a sending and a receiving Netdata node.
+
+To configure streaming on your system:
+1. Generate an API key using `uuidgen`. Note: API keys are just random GUIDs. You can use the same API key on all your Netdata, or use a different API key for any pair of sending-receiving Netdata nodes.
-A new file is introduced: `stream.conf` (to edit it on your system run
-`/etc/netdata/edit-config stream.conf`). This file holds streaming configuration for both the
-sending and the receiving Netdata.
+2. Authorize the communication between a pair of sending-receiving Netdata nodes using the generated API key.
+Once the communication is authorized, the sending Netdata node can push metrics for any number of hosts.
-API keys are used to authorize the communication of a pair of sending-receiving Netdata.
-Once the communication is authorized, the sending Netdata can push metrics for any number of hosts.
+3. To edit `stream.conf`, run `/etc/netdata/edit-config stream.conf`
-You can generate an API key with the command `uuidgen`. API keys are just random GUIDs.
-You can use the same API key on all your Netdata, or use a different API key for any pair of
-sending-receiving Netdata.
+The following sections describe how you can configure sending and receiving Netdata nodes.
-##### options for the sending node
-This is the section for the sending Netdata. On the receiving node, `[stream].enabled` can be `no`.
+
+
+
+##### Options for the sending node
+
+This is the section for the sending Netdata node. On the receiving node, `[stream].enabled` can be `no`.
If it is `yes`, the receiving node will also stream the metrics to another node (i.e. it will be
a proxy).
@@ -141,8 +147,13 @@ a proxy).
enabled = yes | no
destination = IP:PORT[:SSL] ...
api key = XXXXXXXXXXX
-```
+[API_KEY]
+ enabled = yes | no
+
+[MACHINE_GUID]
+ enabled = yes | no
+```
This is an overview of how these options can be combined:
| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|backend|alarms|dashboard|
@@ -154,9 +165,10 @@ This is an overview of how these options can be combined:
For the options to encrypt the data stream between the child and the parent, refer to [securing the communication](#securing-streaming-communications)
-##### options for the receiving node
-`stream.conf` looks like this:
+##### Options for the receiving node
+
+For a receiving Netdata node, the `stream.conf` looks like this:
```sh
# replace API_KEY with your uuidgen generated GUID
@@ -192,7 +204,7 @@ You can also use `default memory mode = dbengine` for an API key or `memory mode
a single host. The additional `page cache size` and `dbengine multihost disk space` configuration options
are inherited from the global Netdata configuration.
-##### allow from
+##### Allow from
`allow from` settings are [Netdata simple patterns](/libnetdata/simple_pattern/README.md): string matches
that use `*` as wildcard (any number of times) and a `!` prefix for a negative match.
@@ -201,7 +213,7 @@ important: left to right, the first positive or negative match is used.
`allow from` is available in Netdata v1.9+
-##### tracing
+##### Tracing
When a child is trying to push metrics to a parent or proxy, it logs entries like these:
@@ -229,7 +241,10 @@ For Netdata v1.9+, streaming can also be monitored via `access.log`.
### Securing streaming communications
-Netdata does not activate TLS encryption by default. To encrypt streaming connections, you first need to [enable TLS support](/web/server/README.md#enabling-tls-support) on the parent. With encryption enabled on the receiving side, you need to instruct the child to use TLS/SSL as well. On the child's `stream.conf`, configure the destination as follows:
+Netdata does not activate TLS encryption by default. To encrypt streaming connections:
+1. On the parent node (receiving node), [enable TLS support](/web/server/README.md#enabling-tls-support).
+2. On the child node (sending node), [enable TLS support](/web/server/README.md#enabling-tls-support).
+3. On the child's `stream.conf`, configure the destination as follows:
```
[stream]
@@ -316,6 +331,87 @@ With the introduction of TLS/SSL, the parent-child communication behaves as show
| Yes|-/force/optional|Yes|no|The parent-child stream is encrypted, provided that the parent has a valid TLS/SSL certificate. Otherwise, the child refuses to connect.|
| Yes|-/force/optional|Yes|yes|The parent-child stream is encrypted.|
+### Streaming compression
+
+[![Supported version Netdata Agent release](https://img.shields.io/badge/Supported%20Netdata%20Agent-v1.33%2B-brightgreen)](https://github.com/netdata/netdata/releases/latest)
+
+[![Supported version Netdata Agent release](https://img.shields.io/badge/Supported%20Netdata%20stream%20version-v5%2B-blue)](https://github.com/netdata/netdata/releases/latest)
+
+#### OS dependencies
+* Streaming compression is based on [lz4 v1.9.0+](https://github.com/lz4/lz4). The [lz4 v1.9.0+](https://github.com/lz4/lz4) library must be installed in your OS in order to enable streaming compression. Any lower version will disable Netdata streaming compression for compatibility purposes between the older versions of Netdata agents.
+
+To check if your Netdata Agent supports stream compression run the following GET request in your browser or terminal:
+
+```
+curl -X GET http://localhost:19999/api/v1/info | grep 'Stream Compression'
+```
+
+**Output**
+```
+"buildinfo": "dbengine|Native HTTPS|Netdata Cloud|ACLK Next Generation|New Cloud Protocol Support|ACLK Legacy|TLS Host Verification|Machine Learning|Stream Compression|protobuf|JSON-C|libcrypto|libm|LWS v3.2.2|mosquitto|zlib|apps|cgroup Network Tracking|EBPF|perf|slabinfo",
+```
+> Note: If your OS doesn't support Netdata compression the `buildinfo` will not contain the `Stream Compression` statement.
+
+To check if your Netdata Agent has stream compression enabled, run the following GET request in your browser or terminal:
+
+```
+ curl -X GET http://localhost:19999/api/v1/info | grep 'stream-compression'
+```
+**Output**
+```
+"stream-compression": "enabled"
+```
+Note: The `stream-compression` status can be `"enabled" | "disabled" | "N/A"`.
+
+A compressed data packet is determined and decompressed on the fly.
+
+#### Limitations
+ This limitation will be withdrawn asap and is work-in-progress.
+
+The current implementation of streaming data compression can support only a few number of dimensions in a chart with names that cannot exceed the size of 16384 bytes. In case you experience stream connection problems or gaps in the charts please disable stream compresssion in the `stream.conf` file. This limitation can be seen in the error.log file with the sequence of the following messages:
+```
+Compression error - data discarded
+Message size above limit:
+```
+
+#### How to enable stream compression
+Netdata Agents are shipped with data compression disabled by default. You can also configure which streams will use compression.
+
+With enabled stream compression, a Netdata Agent can negotiate streaming compression with other Netdata Agents. During the negotiation of streaming compression both Netdata Agents should support and enable compression in order to communicate over a compressed stream. The negotiation will result into an uncompressed stream, if one of the Netdata Agents doesn't support **or** has compression disabled.
+
+To enable stream compression:
+
+1. Edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`.
+
+2. In the `[stream]` section, set `enable compression` to `yes`.
+```
+# This is the default stream compression flag for an agent.
+
+[stream]
+ enable compression = yes | no
+```
+
+
+| Parent | Stream compression | Child |
+|----------------------|--------------------|----------------------|
+| Supported & Enabled | compressed | Supported & Enabled |
+| (Supported & Disabled)/Not supported | uncompressed | Supported & Enabled |
+| Supported & Enabled | uncompressed | (Supported & Disabled)/Not supported |
+| (Supported & Disabled)/Not supported | uncompressed | (Supported & Disabled)/Not supported |
+
+In case of parents with multiple children you can select which streams will be compressed by using the same configuration under the `[API_KEY]`, `[MACHINE_GUID]` section.
+
+This configuration uses AND logic with the default stream compression configuration under the `[stream]` section. This means the stream compression from child to parent will be enabled only if the outcome of the AND logic operation is true (`default compression enabled` && `api key compression enabled`). So both should be enabled to get stream compression otherwise stream compression is disabled.
+```
+[API_KEY]
+ enable compression = yes | no
+```
+Same thing applies with the `[MACHINE_GUID]` configuration.
+```
+[MACHINE_GUID]
+ enable compression = yes | no
+```
## Viewing remote host dashboards, using mirrored databases
On any receiving Netdata, that maintains remote databases and has its web server enabled,
@@ -385,7 +481,12 @@ I will use this API_KEY: `11111111-2222-3333-4444-555555555555`. Replace it with
#### Configuring the parent
-On the parent, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
+To configure the parent node:
+
+1. On the parent node, edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`
+
+2. Set the following parameters:
```bash
[11111111-2222-3333-4444-555555555555]
@@ -414,7 +515,12 @@ the `netdata` process, but a system power issue would leave the connection open
#### Configuring the child nodes
-On each of the child nodes, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
+To configure the child node:
+
+1. On the child node, edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`
+
+2. Set the following parameters:
```bash
[stream]
@@ -446,9 +552,9 @@ _`netdata.conf` configuration on child nodes, to disable the local database and
Keep in mind that setting `memory mode = none` will also force `[health].enabled = no` (health checks require access to a local database). But you can keep the database and disable health checks if you need to. You are however sending all the metrics to the parent node, which can handle the health checking (`[health].enabled = yes`)
-#### Netdata unique id
+#### Netdata unique ID
-The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random GUID that **uniquely identifies each Netdata**. This file is automatically generated, by Netdata, the first time it is started and remains unaltered forever.
+The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random GUID that **uniquely identifies each Netdata Agent**. This file is automatically generated, by Netdata, the first time it is started and remains unaltered forever.
> If you are building an image to be used for automated provisioning of autoscaled VMs, it important to delete that file from the image, so that each instance of your image will generate its own.
@@ -456,7 +562,7 @@ The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random
Both parent and child nodes log information at `/var/log/netdata/error.log`.
-Run the following on both the parent and child nodes:
+To obtain the error logs, run the following on both the parent and child nodes:
```
tail -f /var/log/netdata/error.log | grep STREAM
@@ -511,7 +617,7 @@ This means a setup like the following is also possible:
## Proxies
-A proxy is a Netdata instance that is receiving metrics from a Netdata, and streams them to another Netdata.
+A proxy is a Netdata node that is receiving metrics from a Netdata node, and streams them to another Netdata node.
Netdata proxies may or may not maintain a database for the metrics passing through them.
When they maintain a database, they can also run health checks (alarms and notifications)
@@ -571,11 +677,11 @@ down), you will see the following in the child's `error.log`.
ERROR : STREAM_SENDER[HOSTNAME] : Failed to connect to 'PARENT IP', port 'PARENT PORT' (errno 113, No route to host)
```
-### 'Is this a Netdata?'
+### 'Is this a Netdata node?'
This question can appear when Netdata starts the stream and receives an unexpected response. This error can appear when
the parent is using SSL and the child tries to connect using plain text. You will also see this message when
-Netdata connects to another server that isn't Netdata. The complete error message will look like this:
+Netdata connects to another server that isn't a Netdata node. The complete error message will look like this:
```
ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM child HOSTNAME [send to PARENT HOSTNAME:PARENT PORT]: server is not replying properly (is it a netdata?).
diff --git a/streaming/compression.c b/streaming/compression.c
new file mode 100644
index 000000000..917f05bd6
--- /dev/null
+++ b/streaming/compression.c
@@ -0,0 +1,345 @@
+#include "rrdpush.h"
+#include "lz4.h"
+
+#ifdef ENABLE_COMPRESSION
+
+#define LZ4_MAX_MSG_SIZE 0x4000
+#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE)
+
+#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
+#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
+#define SIGNATURE_SIZE 4
+
+
+/*
+ * LZ4 streaming API compressor specific data
+ */
+struct compressor_data {
+ LZ4_stream_t *stream;
+ char *stream_buffer;
+ size_t stream_buffer_pos;
+};
+
+
+/*
+ * Reset compressor state for a new stream
+ */
+static void lz4_compressor_reset(struct compressor_state *state)
+{
+ if (state->data) {
+ if (state->data->stream) {
+ LZ4_resetStream_fast(state->data->stream);
+ info("STREAM_COMPRESSION: Compressor resets stream fast!");
+ }
+ state->data->stream_buffer_pos = 0;
+ }
+}
+
+/*
+ * Destroy compressor state and all related data
+ */
+static void lz4_compressor_destroy(struct compressor_state **state)
+{
+ if (state && *state) {
+ struct compressor_state *s = *state;
+ if (s->data) {
+ if (s->data->stream)
+ LZ4_freeStream(s->data->stream);
+ freez(s->data->stream_buffer);
+ }
+ freez(s->buffer);
+ freez(s);
+ *state = NULL;
+ debug(D_STREAM, "STREAM_COMPRESSION: Compressor destroyed!");
+ }
+}
+
+/*
+ * Compress the given block of data
+ * Comprecced data will remain in the internal buffer until the next invokation
+ * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
+ * or 0 in case of error
+ */
+static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
+{
+ if (!state || !size || !out)
+ return 0;
+ if (size > LZ4_MAX_MSG_SIZE) {
+ error("Message size above limit: %lu", size);
+ return 0;
+ }
+ size_t max_dst_size = LZ4_COMPRESSBOUND(size);
+ size_t data_size = max_dst_size + SIGNATURE_SIZE;
+
+ if (!state->buffer) {
+ state->buffer = mallocz(data_size);
+ state->buffer_size = data_size;
+ } else if (state->buffer_size < data_size) {
+ state->buffer = reallocz(state->buffer, data_size);
+ state->buffer_size = data_size;
+ }
+
+ memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size);
+ long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream,
+ state->data->stream_buffer + state->data->stream_buffer_pos,
+ state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1);
+ if (compressed_data_size < 0) {
+ error("Date compression error: %ld", compressed_data_size);
+ return 0;
+ }
+ state->data->stream_buffer_pos += size;
+ if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE)
+ state->data->stream_buffer_pos = 0;
+ uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
+ *(uint32_t *)state->buffer = len | SIGNATURE;
+ *out = state->buffer;
+ debug(D_STREAM, "STREAM: Compressed data header: %ld", compressed_data_size);
+ return compressed_data_size + SIGNATURE_SIZE;
+}
+
+/*
+ * Create and initalize compressor state
+ * Return the pointer to compressor_state structure created
+ */
+struct compressor_state *create_compressor()
+{
+ struct compressor_state *state = callocz(1, sizeof(struct compressor_state));
+
+ state->reset = lz4_compressor_reset;
+ state->compress = lz4_compressor_compress;
+ state->destroy = lz4_compressor_destroy;
+
+ state->data = callocz(1, sizeof(struct compressor_data));
+ state->data->stream = LZ4_createStream();
+ state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE));
+ state->buffer_size = LZ4_STREAM_BUFFER_SIZE;
+ state->reset(state);
+ debug(D_STREAM, "STREAM_COMPRESSION: Initialize streaming compression!");
+ return state;
+}
+
+/*
+ * LZ4 streaming API decompressor specific data
+ */
+struct decompressor_data {
+ LZ4_streamDecode_t *stream;
+ char *stream_buffer;
+ size_t stream_buffer_size;
+ size_t stream_buffer_pos;
+};
+
+/*
+ * Reset decompressor state for a new stream
+ */
+static void lz4_decompressor_reset(struct decompressor_state *state)
+{
+ if (state->data) {
+ if (state->data->stream)
+ LZ4_setStreamDecode(state->data->stream, NULL, 0);
+ state->data->stream_buffer_pos = 0;
+ state->buffer_len = 0;
+ state->out_buffer_len = 0;
+ }
+}
+
+/*
+ * Destroy decompressor state and all related data
+ */
+static void lz4_decompressor_destroy(struct decompressor_state **state)
+{
+ if (state && *state) {
+ struct decompressor_state *s = *state;
+ if (s->data) {
+ debug(D_STREAM, "STREAM_COMPRESSION: Destroying decompressor.");
+ if (s->data->stream)
+ LZ4_freeStreamDecode(s->data->stream);
+ freez(s->data->stream_buffer);
+ }
+ freez(s->buffer);
+ freez(s);
+ *state = NULL;
+ }
+}
+
+static size_t decode_compress_header(const char *data, size_t data_size)
+{
+ if (!data || !data_size)
+ return 0;
+ if (data_size < SIGNATURE_SIZE)
+ return 0;
+ uint32_t sign = *(uint32_t *)data;
+ if ((sign & SIGNATURE_MASK) != SIGNATURE)
+ return 0;
+ size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
+ return length;
+}
+
+/*
+ * Check input data for the compression header
+ * Return the size of compressed data or 0 for uncompressed data
+ */
+size_t is_compressed_data(const char *data, size_t data_size)
+{
+ return decode_compress_header(data, data_size);
+}
+
+/*
+ * Start the collection of compressed data in an internal buffer
+ * Return the size of compressed data or 0 for uncompressed data
+ */
+static size_t lz4_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size)
+{
+ size_t length = decode_compress_header(header, header_size);
+ if (!length)
+ return 0;
+
+ if (!state->buffer) {
+ state->buffer = mallocz(length);
+ state->buffer_size = length;
+ } else if (state->buffer_size < length) {
+ state->buffer = reallocz(state->buffer, length);
+ state->buffer_size = length;
+ }
+ state->buffer_len = length;
+ state->buffer_pos = 0;
+ state->out_buffer_pos = 0;
+ state->out_buffer_len = 0;
+ return length;
+}
+
+/*
+ * Add a chunk of compressed data to the internal buffer
+ * Return the current size of compressed data or 0 for error
+ */
+static size_t lz4_decompressor_put(struct decompressor_state *state, const char *data, size_t size)
+{
+ if (!state || !size || !data)
+ return 0;
+ if (!state->buffer)
+ fatal("STREAM: No decompressor buffer allocated");
+
+ if (state->buffer_pos + size > state->buffer_len) {
+ error("STREAM: Decompressor buffer overflow %lu + %lu > %lu",
+ state->buffer_pos, size, state->buffer_len);
+ size = state->buffer_len - state->buffer_pos;
+ }
+ memcpy(state->buffer + state->buffer_pos, data, size);
+ state->buffer_pos += size;
+ return state->buffer_pos;
+}
+
+static size_t saving_percent(size_t comp_len, size_t src_len)
+{
+ if (comp_len > src_len)
+ comp_len = src_len;
+ if (!src_len)
+ return 0;
+ return 100 - comp_len * 100 / src_len;
+}
+
+/*
+ * Decompress the compressed data in the internal buffer
+ * Return the size of uncompressed data or 0 for error
+ */
+static size_t lz4_decompressor_decompress(struct decompressor_state *state)
+{
+ if (!state)
+ return 0;
+ if (!state->buffer) {
+ error("STREAM: No decompressor buffer allocated");
+ return 0;
+ }
+
+ long int decompressed_size = LZ4_decompress_safe_continue(state->data->stream, state->buffer,
+ state->data->stream_buffer + state->data->stream_buffer_pos,
+ state->buffer_len, state->data->stream_buffer_size - state->data->stream_buffer_pos);
+ if (decompressed_size < 0) {
+ error("STREAM: Decompressor error %ld", decompressed_size);
+ return 0;
+ }
+
+ state->out_buffer = state->data->stream_buffer + state->data->stream_buffer_pos;
+ state->data->stream_buffer_pos += decompressed_size;
+ if (state->data->stream_buffer_pos >= state->data->stream_buffer_size - LZ4_MAX_MSG_SIZE)
+ state->data->stream_buffer_pos = 0;
+ state->out_buffer_len = decompressed_size;
+ state->out_buffer_pos = 0;
+
+ // Some compression statistics
+ size_t old_avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
+ size_t old_avg_size = state->packet_count ? state->total_uncompressed / state->packet_count : 0;
+
+ state->total_compressed += state->buffer_len + SIGNATURE_SIZE;
+ state->total_uncompressed += decompressed_size;
+ state->packet_count++;
+
+ size_t saving = saving_percent(state->buffer_len, decompressed_size);
+ size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
+ size_t avg_size = state->total_uncompressed / state->packet_count;
+
+ if (old_avg_saving != avg_saving || old_avg_size != avg_size){
+ debug(D_STREAM, "STREAM: Saving: %lu%% (avg. %lu%%), avg.size: %lu", saving, avg_saving, avg_size);
+ }
+ return decompressed_size;
+}
+
+/*
+ * Return the size of uncompressed data left in the internal buffer or 0 for error
+ */
+static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state)
+{
+ return state->out_buffer_len ?
+ state->out_buffer_len - state->out_buffer_pos : 0;
+}
+
+/*
+ * Fill the buffer provided with uncompressed data from the internal buffer
+ * Return the size of uncompressed data copied or 0 for error
+ */
+static size_t lz4_decompressor_get(struct decompressor_state *state, char *data, size_t size)
+{
+ if (!state || !size || !data)
+ return 0;
+ if (!state->out_buffer)
+ fatal("STREAM: No decompressor output buffer allocated");
+ if (state->out_buffer_pos + size > state->out_buffer_len)
+ size = state->out_buffer_len - state->out_buffer_pos;
+
+ char *p = state->out_buffer + state->out_buffer_pos, *endp = p + size, *last_lf = NULL;
+ for (; p < endp; ++p)
+ if (*p == '\n' || *p == 0)
+ last_lf = p;
+ if (last_lf)
+ size = last_lf + 1 - (state->out_buffer + state->out_buffer_pos);
+
+ memcpy(data, state->out_buffer + state->out_buffer_pos, size);
+ state->out_buffer_pos += size;
+ return size;
+}
+
+/*
+ * Create and initalize decompressor state
+ * Return the pointer to decompressor_state structure created
+ */
+struct decompressor_state *create_decompressor()
+{
+ struct decompressor_state *state = callocz(1, sizeof(struct decompressor_state));
+ state->reset = lz4_decompressor_reset;
+ state->start = lz4_decompressor_start;
+ state->put = lz4_decompressor_put;
+ state->decompress = lz4_decompressor_decompress;
+ state->get = lz4_decompressor_get;
+ state->decompressed_bytes_in_buffer = lz4_decompressor_decompressed_bytes_in_buffer;
+ state->destroy = lz4_decompressor_destroy;
+
+ state->data = callocz(1, sizeof(struct decompressor_data));
+ fatal_assert(state->data);
+ state->data->stream = LZ4_createStreamDecode();
+ state->data->stream_buffer_size = LZ4_decoderRingBufferSize(LZ4_MAX_MSG_SIZE);
+ state->data->stream_buffer = mallocz(state->data->stream_buffer_size);
+ fatal_assert(state->data->stream_buffer);
+ state->reset(state);
+ debug(D_STREAM, "STREAM_COMPRESSION: Initialize streaming decompression!");
+ return state;
+}
+#endif
diff --git a/streaming/receiver.c b/streaming/receiver.c
index e8f8528a7..b083766dd 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -22,6 +22,10 @@ void destroy_receiver_state(struct receiver_state *rpt) {
SSL_free(rpt->ssl.conn);
}
#endif
+#ifdef ENABLE_COMPRESSION
+ if (rpt->decompressor)
+ rpt->decompressor->destroy(&rpt->decompressor);
+#endif
freez(rpt);
}
@@ -69,15 +73,33 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host);
time_t gap = 0;
if (prev == 0)
- info("STREAM %s from %s: Initial connection (no gap to check), remote=%ld local=%ld slew=%ld",
- host->hostname, cd->cmd, remote_time, now, now-remote_time);
+ info(
+ "STREAM %s from %s: Initial connection (no gap to check), "
+ "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"",
+ host->hostname,
+ cd->cmd,
+ (int64_t)remote_time,
+ (int64_t)now,
+ (int64_t)now - remote_time);
else {
gap = now - prev;
- info("STREAM %s from %s: Checking for gaps... remote=%ld local=%ld..%ld slew=%ld %ld-sec gap",
- host->hostname, cd->cmd, remote_time, prev, now, remote_time - now, gap);
+ info(
+ "STREAM %s from %s: Checking for gaps... "
+ "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap",
+ host->hostname,
+ cd->cmd,
+ (int64_t)remote_time,
+ (int64_t)prev,
+ (int64_t)now,
+ (int64_t)(remote_time - now),
+ (int64_t)gap);
}
char message[128];
- sprintf(message,"REPLICATE %ld %ld\n", remote_time - gap, remote_time);
+ sprintf(
+ message,
+ "REPLICATE %"PRId64" %"PRId64"\n",
+ (int64_t)(remote_time - gap),
+ (int64_t)remote_time);
int ret;
#ifdef ENABLE_HTTPS
SSL *conn = host->stream_ssl.conn ;
@@ -141,6 +163,8 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin
return PARSER_RC_OK;
}
+
+#ifndef ENABLE_COMPRESSION
/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
*/
static int receiver_read(struct receiver_state *r, FILE *fp) {
@@ -168,6 +192,130 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
r->read_len = strlen(r->read_buffer);
return 0;
}
+#else
+/*
+ * The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
+ * if SSL encryption is on, then use SSL API for reading stream data.
+ * Use line oriented fgets() in buffer from receiver_state is provided.
+ * In other cases use fread to read binary data from socket.
+ * Return zero on success and the number of bytes were read using pointer in the last argument.
+ */
+static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t size, int* ret) {
+ if (!ret)
+ return 1;
+ *ret = 0;
+#ifdef ENABLE_HTTPS
+ if (r->ssl.conn && !r->ssl.flags) {
+ ERR_clear_error();
+ if (buffer != r->read_buffer + r->read_len) {
+ *ret = SSL_read(r->ssl.conn, buffer, size);
+ if (*ret > 0 )
+ return 0;
+ } else {
+ // we need to receive data with LF to parse compression header
+ size_t ofs = 0;
+ int res = 0;
+ while (ofs < size) {
+ do {
+ res = SSL_read(r->ssl.conn, buffer + ofs, 1);
+ } while (res == 0);
+
+ if (res < 0)
+ break;
+ if (buffer[ofs] == '\n')
+ break;
+ ofs += res;
+ }
+ if (res > 0) {
+ ofs += res;
+ *ret = ofs;
+ buffer[ofs] = 0;
+ return 0;
+ }
+ }
+ // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
+ u_long err;
+ char buf[256];
+ while ((err = ERR_get_error()) != 0) {
+ ERR_error_string_n(err, buf, sizeof(buf));
+ error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf);
+ }
+ return 1;
+ }
+#endif
+ if (buffer != r->read_buffer + r->read_len) {
+ // read to external buffer
+ *ret = fread(buffer, 1, size, fp);
+ if (!*ret)
+ return 1;
+ } else {
+ if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
+ return 1;
+ *ret = strlen(r->read_buffer);
+ }
+ return 0;
+}
+
+/*
+ * Get the next line of data for parsing.
+ * Return data from the decompressor buffer if available.
+ * Otherwise read next line from the socket and check for compression header.
+ * Return the line was read If no compression header was found.
+ * Otherwise read the entire block of compressed data, decompress it
+ * and return it in receiver_state buffer.
+ * Return zero on success.
+ */
+static int receiver_read(struct receiver_state *r, FILE *fp) {
+ // check any decompressed data present
+ if (r->decompressor &&
+ r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
+ size_t available = sizeof(r->read_buffer) - r->read_len;
+ if (available) {
+ size_t len = r->decompressor->get(r->decompressor,
+ r->read_buffer + r->read_len, available);
+ if (!len)
+ return 1;
+ r->read_len += len;
+ }
+ return 0;
+ }
+ int ret = 0;
+ if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
+ return 1;
+
+ if (!is_compressed_data(r->read_buffer, ret)) {
+ r->read_len += ret;
+ return 0;
+ }
+
+ if (unlikely(!r->decompressor))
+ r->decompressor = create_decompressor();
+
+ size_t bytes_to_read = r->decompressor->start(r->decompressor,
+ r->read_buffer, ret);
+
+ // Read the entire block of compressed data because
+ // we're unable to decompress incomplete block
+ char compressed[bytes_to_read];
+ do {
+ if (read_stream(r, fp, compressed, bytes_to_read, &ret))
+ return 1;
+ // Send input data to decompressor
+ if (ret)
+ r->decompressor->put(r->decompressor, compressed, ret);
+ bytes_to_read -= ret;
+ } while (bytes_to_read > 0);
+ // Decompress
+ size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
+ if (!bytes_to_parse)
+ return 1;
+ // Fill read buffer with decompressed data
+ r->read_len = r->decompressor->get(r->decompressor,
+ r->read_buffer, sizeof(r->read_buffer));
+ return 0;
+}
+
+#endif
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
@@ -190,7 +338,6 @@ static char *receiver_next_line(struct receiver_state *r, int *pos) {
return NULL;
}
-
size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
size_t result;
PARSER_USER_OBJECT *user = callocz(1, sizeof(*user));
@@ -226,7 +373,11 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
user->parser = parser;
- do {
+#ifdef ENABLE_COMPRESSION
+ if (rpt->decompressor)
+ rpt->decompressor->reset(rpt->decompressor);
+#endif
+ do{
if (receiver_read(rpt, fp))
break;
int pos = 0;
@@ -293,6 +444,13 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression = default_compression_enabled;
+ rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression);
+ rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression);
+ rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled);
+#endif //ENABLE_COMPRESSION
+
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
@@ -345,6 +503,31 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
netdata_mutex_unlock(&rpt->host->receiver_lock);
}
+ else {
+ rrd_wrlock();
+ rrdhost_update(
+ rpt->host,
+ rpt->hostname,
+ rpt->registry_hostname,
+ rpt->machine_guid,
+ rpt->os,
+ rpt->timezone,
+ rpt->abbrev_timezone,
+ rpt->utc_offset,
+ rpt->tags,
+ rpt->program_name,
+ rpt->program_version,
+ rpt->update_every,
+ history,
+ mode,
+ (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO),
+ (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key),
+ rrdpush_destination,
+ rrdpush_api_key,
+ rrdpush_send_charts_matching,
+ rpt->system_info);
+ rrd_unlock();
+ }
#ifdef NETDATA_INTERNAL_CHECKS
int ssl = 0;
@@ -389,6 +572,16 @@ static int rrdpush_receive(struct receiver_state *rpt)
info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
char initial_response[HTTP_HEADER_SIZE];
if (rpt->stream_version > 1) {
+ if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
+#ifdef ENABLE_COMPRESSION
+ if(!rpt->rrdpush_compression)
+ rpt->stream_version = STREAM_VERSION_CLABELS;
+#else
+ if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) {
+ rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
+ }
+#endif
+ }
info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
} else if (rpt->stream_version == 1) {
@@ -441,10 +634,10 @@ static int rrdpush_receive(struct receiver_state *rpt)
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
- info("Postponing health checks for %ld seconds, on host '%s', because it was just connected."
- , alarms_delay
- , rpt->host->hostname
- );
+ info(
+ "Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.",
+ (int64_t)alarms_delay,
+ rpt->host->hostname);
}
}
rrdhost_unlock(rpt->host);
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 53a897699..ebd8327f0 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -40,6 +40,9 @@ struct config stream_config = {
};
unsigned int default_rrdpush_enabled = 0;
+#ifdef ENABLE_COMPRESSION
+unsigned int default_compression_enabled = 0;
+#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
char *default_rrdpush_send_charts_matching = NULL;
@@ -73,7 +76,10 @@ int rrdpush_init() {
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
-
+#ifdef ENABLE_COMPRESSION
+ default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
+ "enable compression", default_compression_enabled);
+#endif
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
@@ -287,7 +293,7 @@ static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_s
RRDHOST *host = st->rrdhost;
buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
if (s->version >= VERSION_GAP_FILLING)
- buffer_sprintf(host->sender->build, " %ld\n", st->last_collected_time.tv_sec);
+ buffer_sprintf(host->sender->build, " %"PRId64"\n", (int64_t)st->last_collected_time.tv_sec);
else
buffer_strcat(host->sender->build, "\n");
@@ -522,6 +528,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
utc_offset = (int32_t)strtol(value, NULL, 0);
else if(!strcmp(name, "hops"))
system_info->hops = (uint16_t) strtoul(value, NULL, 0);
+ else if(!strcmp(name, "ml_capable"))
+ system_info->ml_capable = strtoul(value, NULL, 0);
+ else if(!strcmp(name, "ml_enabled"))
+ system_info->ml_enabled = strtoul(value, NULL, 0);
else if(!strcmp(name, "tags"))
tags = value;
else if(!strcmp(name, "ver"))
@@ -677,7 +687,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
host->receiver->shutdown = 1;
shutdown(host->receiver->fd, SHUT_RDWR);
host->receiver = NULL; // Thread holds reference to structure
- info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is dead (%ld sec), accepting new connection.", host->hostname, w->client_ip, w->client_port, age);
+ info(
+ "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
+ "existing connection is dead (%"PRId64" sec), accepting new connection.",
+ host->hostname,
+ w->client_ip,
+ w->client_port,
+ (int64_t)age);
}
else {
netdata_mutex_unlock(&host->receiver_lock);
@@ -685,7 +701,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
rrd_unlock();
log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname,
"REJECTED - ALREADY CONNECTED");
- info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is active (within last %ld sec), rejecting new connection.", host->hostname, w->client_ip, w->client_port, age);
+ info(
+ "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
+ "existing connection is active (within last %"PRId64" sec), rejecting new connection.",
+ host->hostname,
+ w->client_ip,
+ w->client_port,
+ (int64_t)age);
// Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
buffer_flush(w->response.data);
buffer_strcat(w->response.data, "This GUID is already streaming to this server");
@@ -759,4 +781,4 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
buffer_flush(w->response.data);
return 200;
-}
+} \ No newline at end of file
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 027ccd102..937ead6fa 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,10 +10,16 @@
#define CONNECTED_TO_SIZE 100
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4
#define STREAM_VERSION_CLAIM 3
#define STREAM_VERSION_CLABELS 4
-#define VERSION_GAP_FILLING 5
+#define STREAM_VERSION_COMPRESSION 5
+#define VERSION_GAP_FILLING 6
+
+#ifdef ENABLE_COMPRESSION
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_COMPRESSION)
+#else
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_CLABELS)
+#endif //ENABLE_COMPRESSION
#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
@@ -35,6 +41,38 @@ typedef struct {
char *kernel_version;
} stream_encoded_t;
+#ifdef ENABLE_COMPRESSION
+struct compressor_state {
+ char *buffer;
+ size_t buffer_size;
+ struct compressor_data *data; // Compression API specific data
+ void (*reset)(struct compressor_state *state);
+ size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
+ void (*destroy)(struct compressor_state **state);
+};
+
+struct decompressor_state {
+ char *buffer;
+ size_t buffer_size;
+ size_t buffer_len;
+ size_t buffer_pos;
+ char *out_buffer;
+ size_t out_buffer_len;
+ size_t out_buffer_pos;
+ size_t total_compressed;
+ size_t total_uncompressed;
+ size_t packet_count;
+ struct decompressor_data *data; // Deompression API specific data
+ void (*reset)(struct decompressor_state *state);
+ size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
+ size_t (*put)(struct decompressor_state *state, const char *data, size_t size);
+ size_t (*decompress)(struct decompressor_state *state);
+ size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
+ size_t (*get)(struct decompressor_state *state, char *data, size_t size);
+ void (*destroy)(struct decompressor_state **state);
+};
+#endif
+
// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
@@ -60,6 +98,10 @@ struct sender_state {
char read_buffer[512];
int read_len;
int32_t version;
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression;
+ struct compressor_state *compressor;
+#endif
};
struct receiver_state {
@@ -75,9 +117,9 @@ struct receiver_state {
char *abbrev_timezone;
int32_t utc_offset;
char *tags;
- char *client_ip; // Duplicated in pluginsd
- char *client_port; // Duplicated in pluginsd
- char *program_name; // Duplicated in pluginsd
+ char *client_ip; // Duplicated in pluginsd
+ char *client_port; // Duplicated in pluginsd
+ char *program_name; // Duplicated in pluginsd
char *program_version;
struct rrdhost_system_info *system_info;
int update_every;
@@ -85,15 +127,22 @@ struct receiver_state {
time_t last_msg_t;
char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
int read_len;
+ unsigned int shutdown:1; // Tell the thread to exit
+ unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
- unsigned int shutdown:1; // Tell the thread to exit
- unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression;
+ struct decompressor_state *decompressor;
+#endif
};
extern unsigned int default_rrdpush_enabled;
+#ifdef ENABLE_COMPRESSION
+extern unsigned int default_compression_enabled;
+#endif
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
@@ -116,4 +165,10 @@ extern void rrdpush_sender_thread_stop(RRDHOST *host);
extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
+#ifdef ENABLE_COMPRESSION
+struct compressor_state *create_compressor();
+struct decompressor_state *create_decompressor();
+size_t is_compressed_data(const char *data, size_t data_size);
+#endif
+
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 0abfac180..916d809a9 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -15,9 +15,26 @@ void sender_start(struct sender_state *s) {
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s) {
- if(cbuffer_add_unsafe(s->host->sender->buffer, buffer_tostring(s->host->sender->build),
- s->host->sender->build->len))
+ char *src = (char *)buffer_tostring(s->host->sender->build);
+ size_t src_len = s->host->sender->build->len;
+#ifdef ENABLE_COMPRESSION
+ do {
+ if (src && src_len) {
+ if (s->compressor && s->rrdpush_compression) {
+ src_len = s->compressor->compress(s->compressor, src, src_len, &src);
+ if (!src_len) {
+ error("Compression error - data discarded");
+ break;
+ }
+ }
+ if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->overflow = 1;
+ }
+ } while (0);
+#else
+ if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
s->overflow = 1;
+#endif
buffer_flush(s->build);
netdata_mutex_unlock(&s->mutex);
}
@@ -147,6 +164,35 @@ void rrdpush_clean_encoded(stream_encoded_t *se)
freez(se->kernel_version);
}
+static inline long int parse_stream_version(RRDHOST *host, char *http)
+{
+ long int stream_version = -1;
+ int answer = -1;
+ char *stream_version_start = strchr(http, '=');
+ if (stream_version_start) {
+ stream_version_start++;
+ stream_version = strtol(stream_version_start, NULL, 10);
+ answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(stream_version_start - http));
+ if (!answer) {
+ rrdpush_set_flags_to_newest_stream(host);
+ }
+ } else {
+ answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
+ if (!answer) {
+ stream_version = 1;
+ rrdpush_set_flags_to_newest_stream(host);
+ } else {
+ answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
+ if (!answer) {
+ stream_version = 0;
+ host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
+ host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ }
+ }
+ }
+ return stream_version;
+}
+
static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout,
struct sender_state *s) {
@@ -214,7 +260,21 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
char http[HTTP_HEADER_SIZE + 1];
int eol = snprintfz(http, HTTP_HEADER_SIZE,
- "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&abbrev_timezone=%s&utc_offset=%d&hops=%d&tags=%s&ver=%u"
+ "STREAM "
+ "key=%s"
+ "&hostname=%s"
+ "&registry_hostname=%s"
+ "&machine_guid=%s"
+ "&update_every=%d"
+ "&os=%s"
+ "&timezone=%s"
+ "&abbrev_timezone=%s"
+ "&utc_offset=%d"
+ "&hops=%d"
+ "&ml_capable=%d"
+ "&ml_enabled=%d"
+ "&tags=%s"
+ "&ver=%u"
"&NETDATA_SYSTEM_OS_NAME=%s"
"&NETDATA_SYSTEM_OS_ID=%s"
"&NETDATA_SYSTEM_OS_ID_LIKE=%s"
@@ -253,6 +313,8 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, host->abbrev_timezone
, host->utc_offset
, host->system_info->hops + 1
+ , host->system_info->ml_capable
+ , host->system_info->ml_enabled
, (host->tags) ? host->tags : ""
, STREAMING_PROTOCOL_CURRENT_VERSION
, se.os_name
@@ -339,32 +401,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
http[received] = '\0';
debug(D_STREAM, "Response to sender from far end: %s", http);
- int answer = -1;
- char *version_start = strchr(http, '=');
- int32_t version = -1;
- if(version_start) {
- version_start++;
- version = (int32_t)strtol(version_start, NULL, 10);
- answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(version_start - http));
- if(!answer) {
- rrdpush_set_flags_to_newest_stream(host);
- }
- } else {
- answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
- if(!answer) {
- version = 1;
- rrdpush_set_flags_to_newest_stream(host);
- }
- else {
- answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
- if(!answer) {
- version = 0;
- host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
- }
- }
- }
-
+ int32_t version = (int32_t)parse_stream_version(host, http);
if(version == -1) {
error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to);
rrdpush_sender_thread_close_socket(host);
@@ -372,10 +409,26 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
}
s->version = version;
+#ifdef ENABLE_COMPRESSION
+ s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
+ if(s->rrdpush_compression)
+ {
+ // parent supports compression
+ if(s->compressor)
+ s->compressor->reset(s->compressor);
+ }
+ else {
+ //parent does not support compression or has compression disabled
+ debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
+ s->version = STREAM_VERSION_CLABELS;
+ }
+#endif //ENABLE_COMPRESSION
+
+
info("STREAM %s [send to %s]: established communication with a parent using protocol version %d - ready to send metrics..."
, host->hostname
, s->connected_to
- , version);
+ , s->version);
if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, s->connected_to);
@@ -565,6 +618,11 @@ void sender_init(struct sender_state *s, RRDHOST *parent) {
s->host = parent;
s->buffer = cbuffer_new(1024, 1024*1024);
s->build = buffer_create(1);
+#ifdef ENABLE_COMPRESSION
+ s->rrdpush_compression = default_compression_enabled;
+ if (default_compression_enabled)
+ s->compressor = create_compressor();
+#endif
netdata_mutex_init(&s->mutex);
}
@@ -630,7 +688,7 @@ void *rrdpush_sender_thread(void *ptr) {
if (s->version >= VERSION_GAP_FILLING) {
time_t now = now_realtime_sec();
sender_start(s);
- buffer_sprintf(s->build, "TIMESTAMP %ld", now);
+ buffer_sprintf(s->build, "TIMESTAMP %"PRId64"", (int64_t)now);
sender_commit(s);
}
rrdpush_claimed_id(s->host);
diff --git a/streaming/stream.conf b/streaming/stream.conf
index b5142632d..3c363fad6 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -60,6 +60,12 @@
# The API_KEY to use (as the sender)
api key =
+ # Stream Compresssion
+ #
+ # The netdata child is configurated to enable stream compression by default.
+ # You can control stream compression in this agent with options: yes | no
+ #enable compression = yes
+
# The timeout to connect and send metrics
timeout seconds = 60
@@ -156,6 +162,12 @@
#default proxy api key = API_KEY
#default proxy send charts matching = *
+ # Stream Compresssion
+ #
+ # The stream with the child can be configurated to enable stream compression.
+ # You can control stream compression in this parent agent stream with options: yes | no
+ #enable compression = yes
+
# -----------------------------------------------------------------------------
# 3. PER SENDING HOST SETTINGS, ON PARENT NETDATA
@@ -203,3 +215,10 @@
#proxy destination = IP:PORT IP:PORT ...
#proxy api key = API_KEY
#proxy send charts matching = *
+
+ # Stream Compresssion
+ #
+ # The stream with the child can be configurated to enable stream compression.
+ # You can control stream compression in this parent agent stream with options: yes | no
+ #enable compression = yes
+ \ No newline at end of file