summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 14:31:17 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 14:31:17 +0000
commit8020f71afd34d7696d7933659df2d763ab05542f (patch)
tree2fdf1b5447ffd8bdd61e702ca183e814afdcb4fc /streaming
parentInitial commit. (diff)
downloadnetdata-8020f71afd34d7696d7933659df2d763ab05542f.tar.xz
netdata-8020f71afd34d7696d7933659df2d763ab05542f.zip
Adding upstream version 1.37.1.upstream/1.37.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/Makefile.am12
-rw-r--r--streaming/README.md734
-rw-r--r--streaming/compression.c322
-rw-r--r--streaming/receiver.c809
-rw-r--r--streaming/replication.c1446
-rw-r--r--streaming/replication.h33
-rw-r--r--streaming/rrdpush.c1044
-rw-r--r--streaming/rrdpush.h305
-rw-r--r--streaming/sender.c1358
-rw-r--r--streaming/stream.conf246
10 files changed, 6309 insertions, 0 deletions
diff --git a/streaming/Makefile.am b/streaming/Makefile.am
new file mode 100644
index 0000000..95c31ca
--- /dev/null
+++ b/streaming/Makefile.am
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_libconfig_DATA = \
+ stream.conf \
+ $(NULL)
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/streaming/README.md b/streaming/README.md
new file mode 100644
index 0000000..58eb2cc
--- /dev/null
+++ b/streaming/README.md
@@ -0,0 +1,734 @@
+---
+title: "Streaming and replication"
+description: "Replicate and mirror Netdata's metrics through real-time streaming from child to parent nodes. Then combine, correlate, and export."
+custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README.md
+---
+
+
+Each Netdata node is able to replicate/mirror its database to another Netdata node, by streaming the collected
+metrics in real-time. This is quite different to [data archiving to third party time-series
+databases](/exporting/README.md).
+The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
+
+There are also **proxy** nodes, which collect metrics from a child and sends it to a parent.
+
+When one Netdata node streams metrics another, the receiving instance can use the data for all features of a typical Netdata node, for example:
+
+- Visualize metrics with a dashboard
+- Run health checks that trigger alarms and send alarm notifications
+- Export metrics to an external time-series database
+
+
+
+
+## Supported configurations
+
+### Netdata without a database or web API (headless collector)
+
+A local Netdata Agent (child), **without any database or alarms**, collects metrics and sends them to another Netdata node
+(parent).
+The same parent can collect data for any number of child nodes and serves alerts for each child.
+
+The node menu shows a list of all "databases streamed to" the parent. Clicking one of those links allows the user to
+view the full dashboard of the child node. The URL has the form
+`http://parent-host:parent-port/host/child-host/`.
+
+
+In a headless setup, the child acts as a plain data collector. It spawns all external plugins, but instead of maintaining a
+local database and accepting dashboard requests, it streams all metrics to the parent.
+
+This setup works great to reduce the memory footprint. Depending on the enabled plugins, memory usage is between 6 MiB and 40 MiB. To reduce the memory usage as much as
+possible, refer to the [performance optimization guide](/docs/guides/configure/performance.md).
+
+
+### Database Replication
+
+The local Netdata Agent (child), **with a local database (and possibly alarms)**, collects metrics and
+sends them to another Netdata node (parent).
+
+The user can use all the functions **at both** `http://child-ip:child-port/` and
+`http://parent-host:parent-port/host/child-host/`.
+
+The child and the parent may have different data retention policies for the same metrics.
+
+Alerts for the child are triggered by **both** the child and the parent.
+It is possible to enable different alert configurations on the parent and the child.
+
+In order for custom chart names on the child to work correctly, follow the form `type.name`. The parent will truncate the `type` part and substitute the original chart `type` to store the name in the database.
+
+### Netdata proxies
+
+The local Netdata Agent(child), with or without a database, collects metrics and sends them to another
+Netdata node(**proxy**), which may or may not maintain a database, which forwards them to another
+Netdata (parent).
+
+Alerts for the child can be triggered by any of the involved hosts that maintains a database.
+
+You can daisy-chain any number of Netdata, each with or without a database and
+with or without alerts for the child metrics.
+
+### Mix and match with exporting engine
+
+All nodes that maintain a database can also send their data to an external database.
+This allows quite complex setups.
+
+Example:
+
+1. Netdata nodes `A` and `B` do not maintain a database and stream metrics to Netdata node `C`(live streaming functionality).
+2. Netdata node `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (exporting functionality).
+3. Netdata node `C` also streams data for `A`, `B`, `C` to Netdata `D`, which also collects data from `E`, `F` and `G` from another DMZ (live streaming functionality).
+4. Netdata node `D` is just a proxy, without a database, that streams all data to a remote site at Netdata `H`.
+5. Netdata node `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (exporting functionality)
+6. Alerts are triggered by `H` for all hosts.
+7. Users can use all Netdata nodes that maintain a database to view metrics (i.e. at `H` all hosts can be viewed).
+
+## Configuration
+
+The following options affect how Netdata streams:
+
+```
+[global]
+ memory mode = none | ram | save | map | dbengine
+```
+
+`[global].memory mode = none` disables the database at this host. This also disables health
+monitoring because a node can't have health monitoring without a database.
+
+```
+[web]
+ mode = none | static-threaded
+ accept a streaming request every seconds = 0
+```
+
+`[web].mode = none` disables the API (Netdata will not listen to any ports).
+This also disables the registry (there cannot be a registry without an API).
+
+`accept a streaming request every seconds` can be used to set a limit on how often a parent node will accept streaming
+requests from its child nodes. 0 sets no limit, 1 means maximum once every second. If this is set, you may see error log
+entries "... too busy to accept new streaming request. Will be allowed in X secs".
+
+You can [use](/exporting/README.md#configuration) the exporting engine to configure data archiving to an external database (it archives all databases maintained on
+this host).
+
+### Streaming configuration
+
+The new file `stream.conf` contains streaming configuration for a sending and a receiving Netdata node.
+
+To configure streaming on your system:
+1. Generate an API key using `uuidgen`. Note: API keys are just random GUIDs. You can use the same API key on all your Netdata, or use a different API key for any pair of sending-receiving Netdata nodes.
+
+2. Authorize the communication between a pair of sending-receiving Netdata nodes using the generated API key.
+Once the communication is authorized, the sending Netdata node can push metrics for any number of hosts.
+
+3. To edit `stream.conf`, run `/etc/netdata/edit-config stream.conf`
+
+The following sections describe how you can configure sending and receiving Netdata nodes.
+
+
+
+
+
+##### Options for the sending node
+
+This is the section for the sending Netdata node. On the receiving node, `[stream].enabled` can be `no`.
+If it is `yes`, the receiving node will also stream the metrics to another node (i.e. it will be
+a proxy).
+
+```
+[stream]
+ enabled = yes | no
+ destination = IP:PORT[:SSL] ...
+ api key = XXXXXXXXXXX
+
+[API_KEY]
+ enabled = yes | no
+
+[MACHINE_GUID]
+ enabled = yes | no
+```
+This is an overview of how these options can be combined:
+
+| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|exporting|alarms|dashboard|
+|------|:-------------:|:----------:|:----------------:|:-----:|:----:|:-------:|
+| headless collector|`none`|`none`|`yes`|only for `data source = as collected`|not possible|no|
+| headless proxy|`none`|not `none`|`yes`|only for `data source = as collected`|not possible|no|
+| proxy with db|not `none`|not `none`|`yes`|possible|possible|yes|
+| central netdata|not `none`|not `none`|`no`|possible|possible|yes|
+
+For the options to encrypt the data stream between the child and the parent, refer to [securing the communication](#securing-streaming-communications)
+
+
+##### Options for the receiving node
+
+For a receiving Netdata node, the `stream.conf` looks like this:
+
+```sh
+# replace API_KEY with your uuidgen generated GUID
+[API_KEY]
+ enabled = yes
+ default history = 3600
+ default memory mode = save
+ health enabled by default = auto
+ allow from = *
+```
+
+You can add many such sections, one for each API key. The above are used as default values for
+all hosts pushed with this API key.
+
+You can also add sections like this:
+
+```sh
+# replace MACHINE_GUID with the child /var/lib/netdata/registry/netdata.public.unique.id
+[MACHINE_GUID]
+ enabled = yes
+ history = 3600
+ memory mode = save
+ health enabled = yes
+ allow from = *
+```
+
+The above is the parent configuration of a single host, at the parent end. `MACHINE_GUID` is
+the unique id the Netdata generating the metrics (i.e. the Netdata that originally collects
+them `/var/lib/netdata/registry/netdata.unique.id`). So, metrics for Netdata `A` that pass through
+any number of other Netdata, will have the same `MACHINE_GUID`.
+
+You can also use `default memory mode = dbengine` for an API key or `memory mode = dbengine` for
+ a single host. The additional `page cache size` and `dbengine multihost disk space` configuration options
+ are inherited from the global Netdata configuration.
+
+##### Allow from
+
+`allow from` settings are [Netdata simple patterns](/libnetdata/simple_pattern/README.md): string matches
+that use `*` as wildcard (any number of times) and a `!` prefix for a negative match.
+So: `allow from = !10.1.2.3 10.*` will allow all IPs in `10.*` except `10.1.2.3`. The order is
+important: left to right, the first positive or negative match is used.
+
+
+##### Tracing
+
+When a child is trying to push metrics to a parent or proxy, it logs entries like these:
+
+```
+2017-02-25 01:57:44: netdata: ERROR: Failed to connect to '10.11.12.1', port '19999' (errno 111, Connection refused)
+2017-02-25 01:57:44: netdata: ERROR: STREAM costa-pc [send to 10.11.12.1:19999]: failed to connect
+2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: initializing communication...
+2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: waiting response from remote netdata...
+2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: established communication - sending metrics...
+2017-02-25 01:58:14: netdata: ERROR: STREAM costa-pc [send]: discarding 1900 bytes of metrics already in the buffer.
+2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [send]: ready - sending metrics...
+```
+
+The receiving end (proxy or parent) logs entries like these:
+
+```
+2017-02-25 01:58:04: netdata: INFO : STREAM [receive from [10.11.12.11]:33554]: new client connection.
+2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [10.11.12.11]:33554: receive thread created (task id 7698)
+2017-02-25 01:58:14: netdata: INFO : Host 'costa-pc' with guid '12345678-b5a6-11e6-8a50-00508db7e9c9' initialized, os: linux, update every: 1, memory mode: ram, history entries: 3600, streaming: disabled, health: enabled, cache_dir: '/var/cache/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9', varlib_dir: '/var/lib/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9', health_log: '/var/lib/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9/health/health-log.db', alarms default handler: '/usr/libexec/netdata/plugins.d/alarm-notify.sh', alarms default recipient: 'root'
+2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [receive from [10.11.12.11]:33554]: initializing communication...
+2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [receive from [10.11.12.11]:33554]: receiving metrics...
+```
+
+For Netdata v1.9+, streaming can also be monitored via `access.log`.
+
+### Securing streaming communications
+
+Netdata does not activate TLS encryption by default. To encrypt streaming connections:
+1. On the parent node (receiving node), [enable TLS support](/web/server/README.md#enabling-tls-support).
+2. On the child's `stream.conf`, configure the destination as follows:
+
+```
+[stream]
+ destination = host:port:SSL
+```
+
+The word `SSL` appended to the end of the destination tells the child that connections must be encrypted.
+
+> While Netdata uses Transport Layer Security (TLS) 1.2 to encrypt communications rather than the obsolete SSL protocol,
+> it's still common practice to refer to encrypted web connections as `SSL`. Many vendors, like Nginx and even Netdata
+> itself, use `SSL` in configuration files, whereas documentation will always refer to encrypted communications as `TLS`
+> or `TLS/SSL`.
+
+#### Certificate verification
+
+When TLS/SSL is enabled on the child, the default behavior will be to not connect with the parent unless the server's certificate can be verified via the default chain. In case you want to avoid this check, add the following to the child's `stream.conf` file:
+
+```
+[stream]
+ ssl skip certificate verification = yes
+```
+
+#### Trusted certificate
+
+If you've enabled [certificate verification](#certificate-verification), you might see errors from the OpenSSL library when there's a problem with checking the certificate chain (`X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY`). More importantly, OpenSSL will reject self-signed certificates.
+
+Given these known issues, you have two options. If you trust your certificate, you can set the options `CApath` and `CAfile` to inform Netdata where your certificates, and the certificate trusted file, are stored.
+
+For more details about these options, you can read about [verify locations](https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_load_verify_locations.html).
+
+Before you changed your streaming configuration, you need to copy your trusted certificate to your child system and add the certificate to OpenSSL's list.
+
+On most Linux distributions, the `update-ca-certificates` command searches inside the `/usr/share/ca-certificates` directory for certificates. You should double-check by reading the `update-ca-certificate` manual (`man update-ca-certificate`), and then change the directory in the below commands if needed.
+
+If you have `sudo` configured on your child system, you can use that to run the following commands. If not, you'll have to log in as `root` to complete them.
+
+```
+# mkdir /usr/share/ca-certificates/netdata
+# cp parent_cert.pem /usr/share/ca-certificates/netdata/parent_cert.crt
+# chown -R netdata.netdata /usr/share/ca-certificates/netdata/
+```
+
+First, you create a new directory to store your certificates for Netdata. Next, you need to change the extension on your certificate from `.pem` to `.crt` so it's compatible with `update-ca-certificate`. Finally, you need to change permissions so the user that runs Netdata can access the directory where you copied in your certificate.
+
+Next, edit the file `/etc/ca-certificates.conf` and add the following line:
+
+```
+netdata/parent_cert.crt
+```
+
+Now you update the list of certificates running the following, again either as `sudo` or `root`:
+
+```
+# update-ca-certificates
+```
+
+> Some Linux distributions have different methods of updating the certificate list. For more details, please read this
+> guide on [adding trusted root certificates](https://github.com/Busindre/How-to-Add-trusted-root-certificates).
+
+Once you update your certificate list, you can set the stream parameters for Netdata to trust the parent certificate. Open `stream.conf` for editing and change the following lines:
+
+```
+[stream]
+ CApath = /etc/ssl/certs/
+ CAfile = /etc/ssl/certs/parent_cert.pem
+```
+
+With this configuration, the `CApath` option tells Netdata to search for trusted certificates inside `/etc/ssl/certs`. The `CAfile` option specifies the Netdata parent certificate is located at `/etc/ssl/certs/parent_cert.pem`. With this configuration, you can skip using the system's entire list of certificates and use Netdata's parent certificate instead.
+
+#### Expected behaviors
+
+With the introduction of TLS/SSL, the parent-child communication behaves as shown in the table below, depending on the following configurations:
+
+- **Parent TLS (Yes/No)**: Whether the `[web]` section in `netdata.conf` has `ssl key` and `ssl certificate`.
+- **Parent port TLS (-/force/optional)**: Depends on whether the `[web]` section `bind to` contains a `^SSL=force` or `^SSL=optional` directive on the port(s) used for streaming.
+- **Child TLS (Yes/No)**: Whether the destination in the child's `stream.conf` has `:SSL` at the end.
+- **Child TLS Verification (yes/no)**: Value of the child's `stream.conf` `ssl skip certificate verification` parameter (default is no).
+
+| Parent TLS enabled|Parent port SSL|Child TLS|Child SSL Ver.|Behavior|
+|:----------------:|:-------------:|:-------:|:------------:|:-------|
+| No|-|No|no|Legacy behavior. The parent-child stream is unencrypted.|
+| Yes|force|No|no|The parent rejects the child connection.|
+| Yes|-/optional|No|no|The parent-child stream is unencrypted (expected situation for legacy child nodes and newer parent nodes)|
+| Yes|-/force/optional|Yes|no|The parent-child stream is encrypted, provided that the parent has a valid TLS/SSL certificate. Otherwise, the child refuses to connect.|
+| Yes|-/force/optional|Yes|yes|The parent-child stream is encrypted.|
+
+### Streaming compression
+
+[![Supported version Netdata Agent release](https://img.shields.io/badge/Supported%20Netdata%20Agent-v1.33%2B-brightgreen)](https://github.com/netdata/netdata/releases/latest)
+
+[![Supported version Netdata Agent release](https://img.shields.io/badge/Supported%20Netdata%20stream%20version-v5%2B-blue)](https://github.com/netdata/netdata/releases/latest)
+
+#### OS dependencies
+* Streaming compression is based on [lz4 v1.9.0+](https://github.com/lz4/lz4). The [lz4 v1.9.0+](https://github.com/lz4/lz4) library must be installed in your OS in order to enable streaming compression. Any lower version will disable Netdata streaming compression for compatibility purposes between the older versions of Netdata agents.
+
+To check if your Netdata Agent supports stream compression run the following GET request in your browser or terminal:
+
+```
+curl -X GET http://localhost:19999/api/v1/info | grep 'Stream Compression'
+```
+
+**Output**
+```
+"buildinfo": "dbengine|Native HTTPS|Netdata Cloud|ACLK Next Generation|New Cloud Protocol Support|ACLK Legacy|TLS Host Verification|Machine Learning|Stream Compression|protobuf|JSON-C|libcrypto|libm|LWS v3.2.2|mosquitto|zlib|apps|cgroup Network Tracking|EBPF|perf|slabinfo",
+```
+> Note: If your OS doesn't support Netdata compression the `buildinfo` will not contain the `Stream Compression` statement.
+
+To check if your Netdata Agent has stream compression enabled, run the following GET request in your browser or terminal:
+
+```
+ curl -X GET http://localhost:19999/api/v1/info | grep 'stream-compression'
+```
+**Output**
+```
+"stream-compression": "enabled"
+```
+Note: The `stream-compression` status can be `"enabled" | "disabled" | "N/A"`.
+
+A compressed data packet is determined and decompressed on the fly.
+
+#### Limitations
+This limitation will be withdrawn asap and is work-in-progress.
+
+The current implementation of streaming data compression can support only a few number of dimensions in a chart with names that cannot exceed the size of 16384 bytes. In case your instance hit this limitation, the agent will deactivate compression during runtime to avoid stream corruption. This limitation can be seen in the error.log file with the sequence of the following messages:
+```
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: connecting...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: initializing communication...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: waiting response from remote netdata...
+netdata INFO : STREAM_SENDER[child01] : STREAM_COMPRESSION: Compressor Reset
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: established communication with a parent using protocol version 5 - ready to send metrics...
+...
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION: Compression Failed - Message size 27847 above compression buffer limit: 16384 (errno 9, Bad file descriptor)
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION: Deactivating compression to avoid stream corruption
+netdata ERROR : PLUGINSD[go.d] : STREAM_COMPRESSION child01 [send to my.parent.IP]: Restarting connection without compression
+...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: connecting...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: initializing communication...
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: waiting response from remote netdata...
+netdata INFO : STREAM_SENDER[child01] : Stream is uncompressed! One of the agents (my.parent.IP <-> child01) does not support compression OR compression is disabled.
+netdata INFO : STREAM_SENDER[child01] : STREAM child01 [send to my.parent.IP]: established communication with a parent using protocol version 4 - ready to send metrics...
+netdata INFO : WEB_SERVER[static4] : STREAM child01 [send]: sending metrics...
+```
+
+#### How to enable stream compression
+Netdata Agents are shipped with data compression enabled by default. You can also configure which streams will use compression.
+
+With enabled stream compression, a Netdata Agent can negotiate streaming compression with other Netdata Agents. During the negotiation of streaming compression both Netdata Agents should support and enable compression in order to communicate over a compressed stream. The negotiation will result into an uncompressed stream, if one of the Netdata Agents doesn't support **or** has compression disabled.
+
+To enable stream compression:
+
+1. Edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`.
+
+2. In the `[stream]` section, set `enable compression` to `yes`.
+```
+# This is the default stream compression flag for an agent.
+
+[stream]
+ enable compression = yes | no
+```
+
+
+| Parent | Stream compression | Child |
+|----------------------|--------------------|----------------------|
+| Supported & Enabled | compressed | Supported & Enabled |
+| (Supported & Disabled)/Not supported | uncompressed | Supported & Enabled |
+| Supported & Enabled | uncompressed | (Supported & Disabled)/Not supported |
+| (Supported & Disabled)/Not supported | uncompressed | (Supported & Disabled)/Not supported |
+
+In case of parents with multiple children you can select which streams will be compressed by using the same configuration under the `[API_KEY]`, `[MACHINE_GUID]` section.
+
+This configuration uses AND logic with the default stream compression configuration under the `[stream]` section. This means the stream compression from child to parent will be enabled only if the outcome of the AND logic operation is true (`default compression enabled` && `api key compression enabled`). So both should be enabled to get stream compression otherwise stream compression is disabled.
+```
+[API_KEY]
+ enable compression = yes | no
+```
+Same thing applies with the `[MACHINE_GUID]` configuration.
+```
+[MACHINE_GUID]
+ enable compression = yes | no
+```
+## Viewing remote host dashboards, using mirrored databases
+
+On any receiving Netdata, that maintains remote databases and has its web server enabled,
+The node menu will include a list of the mirrored databases.
+
+![image](https://cloud.githubusercontent.com/assets/2662304/24080824/24cd2d3c-0caf-11e7-909d-a8dd1dbb95d7.png)
+
+Selecting any of these, the server will offer a dashboard using the mirrored metrics.
+
+## Monitoring ephemeral nodes
+
+Auto-scaling is probably the most trendy service deployment strategy these days.
+
+Auto-scaling detects the need for additional resources and boots VMs on demand, based on a template. Soon after they start running the applications, a load balancer starts distributing traffic to them, allowing the service to grow horizontally to the scale needed to handle the load. When demands falls, auto-scaling starts shutting down VMs that are no longer needed.
+
+![Monitoring ephemeral nodes with Netdata](https://cloud.githubusercontent.com/assets/2662304/23627426/65a9074a-02b9-11e7-9664-cd8f258a00af.png)
+
+What a fantastic feature for controlling infrastructure costs! Pay only for what you need for the time you need it!
+
+In auto-scaling, all servers are ephemeral, they live for just a few hours. Every VM is a brand new instance of the application, that was automatically created based on a template.
+
+So, how can we monitor them? How can we be sure that everything is working as expected on all of them?
+
+### The Netdata way
+
+We recently made a significant improvement at the core of Netdata to support monitoring such setups.
+
+Following the Netdata way of monitoring, we wanted:
+
+1. **real-time performance monitoring**, collecting ***thousands of metrics per server per second***, visualized in interactive, automatically created dashboards.
+2. **real-time alarms**, for all nodes.
+3. **zero configuration**, all ephemeral servers should have exactly the same configuration, and nothing should be configured at any system for each of the ephemeral nodes. We shouldn't care if 10 or 100 servers are spawned to handle the load.
+4. **self-cleanup**, so that nothing needs to be done for cleaning up the monitoring infrastructure from the hundreds of nodes that may have been monitored through time.
+
+### How it works
+
+All monitoring solutions, including Netdata, work like this:
+
+1. Collect metrics from the system and the running applications
+2. Store metrics in a time-series database
+3. Examine metrics periodically, for triggering alarms and sending alarm notifications
+4. Visualize metrics so that users can see what exactly is happening
+
+Netdata used to be self-contained, so that all these functions were handled entirely by each server. The changes we made, allow each Netdata to be configured independently for each function. So, each Netdata can now act as:
+
+- A self-contained system, much like it used to be.
+- A data collector that collects metrics from a host and pushes them to another Netdata (with or without a local database and alarms).
+- A proxy, which receives metrics from other hosts and pushes them immediately to other Netdata servers. Netdata proxies can also be `store and forward proxies` meaning that they are able to maintain a local database for all metrics passing through them (with or without alarms).
+- A time-series database node, where data are kept, alarms are run and queries are served to visualise the metrics.
+
+### Configuring an auto-scaling setup
+
+![A diagram of an auto-scaling setup with Netdata](https://user-images.githubusercontent.com/1153921/84290043-0c1c1600-aaf8-11ea-9757-dd8dd8a8ec6c.png)
+
+You need a Netdata parent. This node should not be ephemeral. It will be the node where all ephemeral child
+nodes will send their metrics.
+
+The parent will need to authorize child nodes to receive their metrics. This is done with an API key.
+
+#### API keys
+
+API keys are just random GUIDs. Use the Linux command `uuidgen` to generate one. You can use the same API key for all your child nodes, or you can configure one API for each of them. This is entirely your decision.
+
+We suggest to use the same API key for each ephemeral node template you have, so that all replicas of the same ephemeral node will have exactly the same configuration.
+
+I will use this API_KEY: `11111111-2222-3333-4444-555555555555`. Replace it with your own.
+
+#### Configuring the parent
+
+To configure the parent node:
+
+1. On the parent node, edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`
+
+2. Set the following parameters:
+
+```bash
+[11111111-2222-3333-4444-555555555555]
+ # enable/disable this API key
+ enabled = yes
+
+ # one hour of data for each of the child nodes
+ default history = 3600
+
+ # do not save child metrics on disk
+ default memory = ram
+
+ # alarms checks, only while the child is connected
+ health enabled by default = auto
+```
+
+_`stream.conf` on the parent, to enable receiving metrics from its child nodes using the API key._
+
+If you used many API keys, you can add one such section for each API key.
+
+When done, restart Netdata on the parent node. It is now ready to receive metrics.
+
+Note that `health enabled by default = auto` will still trigger `last_collected` alarms, if a connected child does not exit gracefully. If the `netdata` process running on the child is
+stopped, it will close the connection to the parent, ensuring that no `last_collected` alarms are triggered. For example, a proper container restart would first terminate
+the `netdata` process, but a system power issue would leave the connection open on the parent side. In the second case, you will still receive alarms.
+
+#### Configuring the child nodes
+
+To configure the child node:
+
+1. On the child node, edit `stream.conf` by using the `edit-config` script:
+`/etc/netdata/edit-config stream.conf`
+
+2. Set the following parameters:
+
+```bash
+[stream]
+ # stream metrics to another Netdata
+ enabled = yes
+
+ # the IP and PORT of the parent
+ destination = 10.11.12.13:19999
+
+ # the API key to use
+ api key = 11111111-2222-3333-4444-555555555555
+```
+
+_`stream.conf` on child nodes, to enable pushing metrics to their parent at `10.11.12.13:19999`._
+
+Using just the above configuration, the child nodes will be pushing their metrics to the parent Netdata, but they will still maintain a local database of the metrics and run health checks. To disable them, edit `/etc/netdata/netdata.conf` and set:
+
+```bash
+[global]
+ # disable the local database
+ memory mode = none
+
+[health]
+ # disable health checks
+ enabled = no
+```
+
+_`netdata.conf` configuration on child nodes, to disable the local database and health checks._
+
+Keep in mind that setting `memory mode = none` will also force `[health].enabled = no` (health checks require access to a local database). But you can keep the database and disable health checks if you need to. You are however sending all the metrics to the parent node, which can handle the health checking (`[health].enabled = yes`)
+
+#### Netdata unique ID
+
+The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random GUID that **uniquely identifies each Netdata Agent**. This file is automatically generated, by Netdata, the first time it is started and remains unaltered forever.
+
+> If you are building an image to be used for automated provisioning of autoscaled VMs, it important to delete that file from the image, so that each instance of your image will generate its own.
+
+#### Troubleshooting metrics streaming
+
+Both parent and child nodes log information at `/var/log/netdata/error.log`.
+
+To obtain the error logs, run the following on both the parent and child nodes:
+
+```
+tail -f /var/log/netdata/error.log | grep STREAM
+```
+
+If the child manages to connect to the parent you will see something like (on the parent):
+
+```
+2017-03-09 09:38:52: netdata: INFO : STREAM [receive from [10.11.12.86]:38564]: new client connection.
+2017-03-09 09:38:52: netdata: INFO : STREAM xxx [10.11.12.86]:38564: receive thread created (task id 27721)
+2017-03-09 09:38:52: netdata: INFO : STREAM xxx [receive from [10.11.12.86]:38564]: client willing to stream metrics for host 'xxx' with machine_guid '1234567-1976-11e6-ae19-7cdd9077342a': update every = 1, history = 3600, memory mode = ram, health auto
+2017-03-09 09:38:52: netdata: INFO : STREAM xxx [receive from [10.11.12.86]:38564]: initializing communication...
+2017-03-09 09:38:52: netdata: INFO : STREAM xxx [receive from [10.11.12.86]:38564]: receiving metrics...
+```
+
+and something like this on the child:
+
+```
+2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: connecting...
+2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: initializing communication...
+2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: waiting response from remote netdata...
+2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: established communication - sending metrics...
+```
+
+### Archiving to a time-series database
+
+The parent Netdata node can also archive metrics, for all its child nodes, to a time-series database. At the time of
+this writing, Netdata supports:
+
+- graphite
+- opentsdb
+- prometheus
+- json document DBs
+- all the compatibles to the above (e.g. kairosdb, influxdb, etc)
+
+Check the Netdata [exporting documentation](/docs/export/external-databases.md) for configuring this.
+
+This is how such a solution will work:
+
+![Diagram showing an example configuration for archiving to a time-series
+database](https://user-images.githubusercontent.com/1153921/84291308-c2ccc600-aaf9-11ea-98a9-89ccbf3a62dd.png)
+
+### An advanced setup
+
+Netdata also supports `proxies` with and without a local database, and data retention can be different between all nodes.
+
+This means a setup like the following is also possible:
+
+<p align="center">
+<img src="https://cloud.githubusercontent.com/assets/2662304/23629551/bb1fd9c2-02c0-11e7-90f5-cab5a3ed4c53.png"/>
+</p>
+
+## Proxies
+
+A proxy is a Netdata node that is receiving metrics from a Netdata node, and streams them to another Netdata node.
+
+Netdata proxies may or may not maintain a database for the metrics passing through them.
+When they maintain a database, they can also run health checks (alarms and notifications)
+for the remote host that is streaming the metrics.
+
+To configure a proxy, configure it as a receiving and a sending Netdata at the same time,
+using `stream.conf`.
+
+The sending side of a Netdata proxy, connects and disconnects to the final destination of the
+metrics, following the same pattern of the receiving side.
+
+For a practical example see [Monitoring ephemeral nodes](#monitoring-ephemeral-nodes).
+
+## Troubleshooting streaming connections
+
+This section describes the most common issues you might encounter when connecting parent and child nodes.
+
+### Slow connections between parent and child
+
+When you have a slow connection between parent and child, Netdata raises a few different errors. Most of the
+errors will appear in the child's `error.log`.
+
+```bash
+netdata ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM CHILD HOSTNAME [send to PARENT IP:PARENT PORT]: too many data pending - buffer is X bytes long,
+Y unsent - we have sent Z bytes in total, W on this connection. Closing connection to flush the data.
+```
+
+On the parent side, you may see various error messages, most commonly the following:
+
+```
+netdata ERROR : STREAM_PARENT[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : read failed: end of file
+```
+
+Another common problem in slow connections is the CHILD sending a partial message to the parent. In this case,
+the parent will write the following in its `error.log`:
+
+```
+ERROR : STREAM_RECEIVER[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : sent command 'B' which is not known by netdata, for host 'HOSTNAME'. Disabling it.
+```
+
+In this example, `B` was part of a `BEGIN` message that was cut due to connection problems.
+
+Slow connections can also cause problems when the parent misses a message and then receives a command related to the
+missed message. For example, a parent might miss a message containing the child's charts, and then doesn't know
+what to do with the `SET` message that follows. When that happens, the parent will show a message like this:
+
+```
+ERROR : STREAM_RECEIVER[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : requested a SET on chart 'CHART NAME' of host 'HOSTNAME', without a dimension. Disabling it.
+```
+
+### Child cannot connect to parent
+
+When the child can't connect to a parent for any reason (misconfiguration, networking, firewalls, parent
+down), you will see the following in the child's `error.log`.
+
+```
+ERROR : STREAM_SENDER[HOSTNAME] : Failed to connect to 'PARENT IP', port 'PARENT PORT' (errno 113, No route to host)
+```
+
+### 'Is this a Netdata node?'
+
+This question can appear when Netdata starts the stream and receives an unexpected response. This error can appear when
+the parent is using SSL and the child tries to connect using plain text. You will also see this message when
+Netdata connects to another server that isn't a Netdata node. The complete error message will look like this:
+
+```
+ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM child HOSTNAME [send to PARENT HOSTNAME:PARENT PORT]: server is not replying properly (is it a netdata?).
+```
+
+### Stream charts wrong
+
+Chart data needs to be consistent between child and parent nodes. If there are differences between chart data on
+a parent and a child, such as gaps in metrics collection, it most often means your child's `memory mode`
+does not match the parent's. To learn more about the different ways Netdata can store metrics, and thus keep chart
+data consistent, read our [memory mode documentation](/database/README.md).
+
+### Forbidding access
+
+You may see errors about "forbidding access" for a number of reasons. It could be because of a slow connection between
+the parent and child nodes, but it could also be due to other failures. Look in your parent's `error.log` for errors
+that look like this:
+
+```
+STREAM [receive from [child HOSTNAME]:child IP]: `MESSAGE`. Forbidding access."
+```
+
+`MESSAGE` will have one of the following patterns:
+
+- `request without KEY` : The message received is incomplete and the KEY value can be API, hostname, machine GUID.
+- `API key 'VALUE' is not valid GUID`: The UUID received from child does not have the format defined in [RFC 4122]
+ (https://tools.ietf.org/html/rfc4122)
+- `machine GUID 'VALUE' is not GUID.`: This error with machine GUID is like the previous one.
+- `API key 'VALUE' is not allowed`: This stream has a wrong API key.
+- `API key 'VALUE' is not permitted from this IP`: The IP is not allowed to use STREAM with this parent.
+- `machine GUID 'VALUE' is not allowed.`: The GUID that is trying to send stream is not allowed.
+- `Machine GUID 'VALUE' is not permitted from this IP. `: The IP does not match the pattern or IP allowed to connect
+ to use stream.
+
+### Netdata could not create a stream
+
+The connection between parent and child is a stream. When the parent can't convert the initial connection into
+a stream, it will write the following message inside `error.log`:
+
+```
+file descriptor given is not a valid stream
+```
+
+After logging this error, Netdata will close the stream.
+
+
diff --git a/streaming/compression.c b/streaming/compression.c
new file mode 100644
index 0000000..7ba9dbf
--- /dev/null
+++ b/streaming/compression.c
@@ -0,0 +1,322 @@
+#include "rrdpush.h"
+
+#ifdef ENABLE_COMPRESSION
+#include "lz4.h"
+
+#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
+
+// signature MUST end with a newline
+#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
+#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
+#define SIGNATURE_SIZE 4
+
+
+/*
+ * LZ4 streaming API compressor specific data
+ */
+struct compressor_data {
+ LZ4_stream_t *stream;
+ char *input_ring_buffer;
+ size_t input_ring_buffer_size;
+ size_t input_ring_buffer_pos;
+};
+
+
+/*
+ * Reset compressor state for a new stream
+ */
+static void lz4_compressor_reset(struct compressor_state *state)
+{
+ if (state->data) {
+ if (state->data->stream) {
+ LZ4_resetStream_fast(state->data->stream);
+ internal_error(true, "%s: compressor reset", STREAM_COMPRESSION_MSG);
+ }
+ state->data->input_ring_buffer_pos = 0;
+ }
+}
+
+/*
+ * Destroy compressor state and all related data
+ */
+static void lz4_compressor_destroy(struct compressor_state **state)
+{
+ if (state && *state) {
+ struct compressor_state *s = *state;
+ if (s->data) {
+ if (s->data->stream)
+ LZ4_freeStream(s->data->stream);
+ freez(s->data->input_ring_buffer);
+ freez(s->data);
+ }
+ freez(s->compression_result_buffer);
+ freez(s);
+ *state = NULL;
+ debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG);
+ }
+}
+
+/*
+ * Compress the given block of data
+ * Compressed data will remain in the internal buffer until the next invocation
+ * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
+ * or 0 in case of error
+ */
+static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
+{
+ if(unlikely(!state || !size || !out))
+ return 0;
+
+ if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) {
+ error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE);
+ return 0;
+ }
+
+ size_t max_dst_size = LZ4_COMPRESSBOUND(size);
+ size_t data_size = max_dst_size + SIGNATURE_SIZE;
+
+ if (!state->compression_result_buffer) {
+ state->compression_result_buffer = mallocz(data_size);
+ state->compression_result_buffer_size = data_size;
+ }
+ else if(unlikely(state->compression_result_buffer_size < data_size)) {
+ state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size);
+ state->compression_result_buffer_size = data_size;
+ }
+
+ // the ring buffer always has space for LZ4_MAX_MSG_SIZE
+ memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size);
+
+ // this call needs the last 64K of our previous data
+ // they are available in the ring buffer
+ long int compressed_data_size = LZ4_compress_fast_continue(
+ state->data->stream,
+ state->data->input_ring_buffer + state->data->input_ring_buffer_pos,
+ state->compression_result_buffer + SIGNATURE_SIZE,
+ size,
+ max_dst_size,
+ 1);
+
+ if (compressed_data_size < 0) {
+ error("Data compression error: %ld", compressed_data_size);
+ return 0;
+ }
+
+ // update the next writing position of the ring buffer
+ state->data->input_ring_buffer_pos += size;
+ if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE))
+ state->data->input_ring_buffer_pos = 0;
+
+ // update the signature header
+ uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
+ *(uint32_t *)state->compression_result_buffer = len | SIGNATURE;
+ *out = state->compression_result_buffer;
+ debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
+ return compressed_data_size + SIGNATURE_SIZE;
+}
+
+/*
+ * Create and initialize compressor state
+ * Return the pointer to compressor_state structure created
+ */
+struct compressor_state *create_compressor()
+{
+ struct compressor_state *state = callocz(1, sizeof(struct compressor_state));
+
+ state->reset = lz4_compressor_reset;
+ state->compress = lz4_compressor_compress;
+ state->destroy = lz4_compressor_destroy;
+
+ state->data = callocz(1, sizeof(struct compressor_data));
+ state->data->stream = LZ4_createStream();
+ state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2);
+ state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size);
+ state->compression_result_buffer_size = 0;
+ state->reset(state);
+ debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG);
+ return state;
+}
+
+/*
+ * LZ4 streaming API decompressor specific data
+ */
+struct decompressor_stream {
+ LZ4_streamDecode_t *lz4_stream;
+ char *buffer;
+ size_t size;
+ size_t write_at;
+ size_t read_at;
+};
+
+/*
+ * Reset decompressor state for a new stream
+ */
+static void lz4_decompressor_reset(struct decompressor_state *state)
+{
+ if (state->stream) {
+ if (state->stream->lz4_stream)
+ LZ4_setStreamDecode(state->stream->lz4_stream, NULL, 0);
+
+ state->stream->write_at = 0;
+ state->stream->read_at = 0;
+ }
+}
+
+/*
+ * Destroy decompressor state and all related data
+ */
+static void lz4_decompressor_destroy(struct decompressor_state **state)
+{
+ if (state && *state) {
+ struct decompressor_state *s = *state;
+ if (s->stream) {
+ debug(D_STREAM, "%s: Destroying decompressor.", STREAM_COMPRESSION_MSG);
+ if (s->stream->lz4_stream)
+ LZ4_freeStreamDecode(s->stream->lz4_stream);
+ freez(s->stream->buffer);
+ freez(s->stream);
+ }
+ freez(s);
+ *state = NULL;
+ }
+}
+
+static size_t decode_compress_header(const char *data, size_t data_size) {
+ if (unlikely(!data || !data_size))
+ return 0;
+
+ if (unlikely(data_size != SIGNATURE_SIZE))
+ return 0;
+
+ uint32_t sign = *(uint32_t *)data;
+ if (unlikely((sign & SIGNATURE_MASK) != SIGNATURE))
+ return 0;
+
+ size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
+ return length;
+}
+
+/*
+ * Start the collection of compressed data in an internal buffer
+ * Return the size of compressed data or 0 for uncompressed data
+ */
+static size_t lz4_decompressor_start(struct decompressor_state *state __maybe_unused, const char *header, size_t header_size) {
+ if(unlikely(state->stream->read_at != state->stream->write_at))
+ fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!"
+ , STREAM_COMPRESSION_MSG);
+
+ return decode_compress_header(header, header_size);
+}
+
+/*
+ * Decompress the compressed data in the internal buffer
+ * Return the size of uncompressed data or 0 for error
+ */
+static size_t lz4_decompressor_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(!state || !compressed_data || !compressed_size))
+ return 0;
+
+ if(unlikely(state->stream->read_at != state->stream->write_at))
+ fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!"
+ , STREAM_COMPRESSION_MSG);
+
+ if (unlikely(state->stream->write_at >= state->stream->size / 2)) {
+ state->stream->write_at = 0;
+ state->stream->read_at = 0;
+ }
+
+ long int decompressed_size = LZ4_decompress_safe_continue(
+ state->stream->lz4_stream
+ , compressed_data
+ , state->stream->buffer + state->stream->write_at
+ , (int)compressed_size
+ , (int)(state->stream->size - state->stream->write_at)
+ );
+
+ if (unlikely(decompressed_size < 0)) {
+ error("%s: decompressor returned negative decompressed bytes: %ld", STREAM_COMPRESSION_MSG, decompressed_size);
+ return 0;
+ }
+
+ if(unlikely(decompressed_size + state->stream->write_at > state->stream->size))
+ fatal("%s: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu"
+ , STREAM_COMPRESSION_MSG
+ , state->stream->size
+ , state->stream->write_at
+ , decompressed_size
+ , state->stream->write_at + decompressed_size - state->stream->size
+ );
+
+ state->stream->write_at += decompressed_size;
+
+ // statistics
+ state->total_compressed += compressed_size + SIGNATURE_SIZE;
+ state->total_uncompressed += decompressed_size;
+ state->packet_count++;
+
+ return decompressed_size;
+}
+
+/*
+ * Return the size of uncompressed data left in the internal buffer or 0 for error
+ */
+static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state) {
+ if(unlikely(state->stream->read_at > state->stream->write_at))
+ fatal("%s: invalid read/write stream positions"
+ , STREAM_COMPRESSION_MSG);
+
+ return state->stream->write_at - state->stream->read_at;
+}
+
+/*
+ * Fill the buffer provided with uncompressed data from the internal buffer
+ * Return the size of uncompressed data copied or 0 for error
+ */
+static size_t lz4_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
+ if (unlikely(!state || !size || !dst))
+ return 0;
+
+ size_t remaining = lz4_decompressor_decompressed_bytes_in_buffer(state);
+ if(unlikely(!remaining))
+ return 0;
+
+ size_t bytes_to_return = size;
+ if(bytes_to_return > remaining)
+ bytes_to_return = remaining;
+
+ memcpy(dst, state->stream->buffer + state->stream->read_at, bytes_to_return);
+ state->stream->read_at += bytes_to_return;
+
+ if(unlikely(state->stream->read_at > state->stream->write_at))
+ fatal("%s: invalid read/write stream positions"
+ , STREAM_COMPRESSION_MSG);
+
+ return bytes_to_return;
+}
+
+/*
+ * Create and initialize decompressor state
+ * Return the pointer to decompressor_state structure created
+ */
+struct decompressor_state *create_decompressor()
+{
+ struct decompressor_state *state = callocz(1, sizeof(struct decompressor_state));
+ state->signature_size = SIGNATURE_SIZE;
+ state->reset = lz4_decompressor_reset;
+ state->start = lz4_decompressor_start;
+ state->decompress = lz4_decompressor_decompress;
+ state->get = lz4_decompressor_get;
+ state->decompressed_bytes_in_buffer = lz4_decompressor_decompressed_bytes_in_buffer;
+ state->destroy = lz4_decompressor_destroy;
+
+ state->stream = callocz(1, sizeof(struct decompressor_stream));
+ fatal_assert(state->stream);
+ state->stream->lz4_stream = LZ4_createStreamDecode();
+ state->stream->size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2;
+ state->stream->buffer = mallocz(state->stream->size);
+ fatal_assert(state->stream->buffer);
+ state->reset(state);
+ debug(D_STREAM, "%s: Initialize streaming decompression!", STREAM_COMPRESSION_MSG);
+ return state;
+}
+#endif
diff --git a/streaming/receiver.c b/streaming/receiver.c
new file mode 100644
index 0000000..61ee33b
--- /dev/null
+++ b/streaming/receiver.c
@@ -0,0 +1,809 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+#include "parser/parser.h"
+
+// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
+#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
+#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2)
+
+// this has to be the same at parser.h
+#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
+
+#if WORKER_PARSER_FIRST_JOB < 1
+#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1
+#endif
+
+extern struct config stream_config;
+
+void destroy_receiver_state(struct receiver_state *rpt) {
+ freez(rpt->key);
+ freez(rpt->hostname);
+ freez(rpt->registry_hostname);
+ freez(rpt->machine_guid);
+ freez(rpt->os);
+ freez(rpt->timezone);
+ freez(rpt->abbrev_timezone);
+ freez(rpt->tags);
+ freez(rpt->client_ip);
+ freez(rpt->client_port);
+ freez(rpt->program_name);
+ freez(rpt->program_version);
+#ifdef ENABLE_HTTPS
+ if(rpt->ssl.conn){
+ SSL_free(rpt->ssl.conn);
+ }
+#endif
+#ifdef ENABLE_COMPRESSION
+ if (rpt->decompressor)
+ rpt->decompressor->destroy(&rpt->decompressor);
+#endif
+ freez(rpt);
+}
+
+static void rrdpush_receiver_thread_cleanup(void *ptr) {
+ worker_unregister();
+
+ static __thread int executed = 0;
+ if(!executed) {
+ executed = 1;
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch
+ // the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free().
+ if (netdata_exit && rpt->host) {
+ rpt->exited = 1;
+ return;
+ }
+
+ // Make sure that we detach this thread and don't kill a freshly arriving receiver
+ if (!netdata_exit && rpt->host) {
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == rpt)
+ rpt->host->receiver = NULL;
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ }
+
+ info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+ destroy_receiver_state(rpt);
+ }
+}
+
+#include "collectors/plugins.d/pluginsd_parser.h"
+
+PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user)
+{
+ const char *host_uuid_str = get_word(words, num_words, 1);
+ const char *claim_id_str = get_word(words, num_words, 2);
+
+ if (!host_uuid_str || !claim_id_str) {
+ error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
+ host_uuid_str ? host_uuid_str : "[unset]",
+ claim_id_str ? claim_id_str : "[unset]");
+ return PARSER_RC_ERROR;
+ }
+
+ uuid_t uuid;
+ RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+
+ // We don't need the parsed UUID
+ // just do it to check the format
+ if(uuid_parse(host_uuid_str, uuid)) {
+ error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
+ return PARSER_RC_ERROR;
+ }
+ if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) {
+ error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
+ return PARSER_RC_ERROR;
+ }
+
+ if(strcmp(host_uuid_str, host->machine_guid)) {
+ error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
+ return PARSER_RC_OK; //the message is OK problem must be somewhere else
+ }
+
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ freez(host->aclk_state.claimed_id);
+ host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
+
+ metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL);
+
+ rrdhost_aclk_state_unlock(host);
+
+ rrdpush_claimed_id(host);
+
+ return PARSER_RC_OK;
+}
+
+static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
+ if(unlikely(!size)) {
+ internal_error(true, "%s() asked to read zero bytes", __FUNCTION__);
+ return 0;
+ }
+
+#ifdef ENABLE_HTTPS
+ if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ return (int)netdata_ssl_read(r->ssl.conn, buffer, size);
+#endif
+
+ ssize_t bytes_read = read(r->fd, buffer, size);
+ if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
+ error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
+ bytes_read = -3;
+ }
+ else if (bytes_read == 0) {
+ error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__);
+ bytes_read = -1;
+ }
+ else if (bytes_read < 0) {
+ error("STREAM: %s() failed to read from socket!", __FUNCTION__);
+ bytes_read = -2;
+ }
+
+// do {
+// bytes_read = (int) fread(buffer, 1, size, fp);
+// if (unlikely(bytes_read <= 0)) {
+// if(feof(fp)) {
+// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__);
+// bytes_read = -2;
+// }
+// else if(ferror(fp)) {
+// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__);
+// bytes_read = -3;
+// }
+// else bytes_read = 0;
+// }
+// else
+// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read);
+// } while(bytes_read == 0);
+
+ return (int)bytes_read;
+}
+
+static bool receiver_read_uncompressed(struct receiver_state *r) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(r->read_buffer[r->read_len] != '\0')
+ fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
+#endif
+
+ int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1);
+ if(unlikely(bytes_read <= 0))
+ return false;
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read);
+
+ r->read_len += bytes_read;
+ r->read_buffer[r->read_len] = '\0';
+
+ return true;
+}
+
+#ifdef ENABLE_COMPRESSION
+static bool receiver_read_compressed(struct receiver_state *r) {
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(r->read_buffer[r->read_len] != '\0')
+ fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ );
+#endif
+
+ // first use any available uncompressed data
+ if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
+ size_t available = sizeof(r->read_buffer) - r->read_len - 1;
+ if (available) {
+ size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available);
+ if (!len) {
+ internal_error(true, "decompressor returned zero length #1");
+ return false;
+ }
+
+ r->read_len += (int)len;
+ r->read_buffer[r->read_len] = '\0';
+ }
+ else
+ internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len);
+
+ return true;
+ }
+
+ // no decompressed data available
+ // read the compression signature of the next block
+
+ if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) {
+ internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len);
+ return false;
+ }
+
+ // read the compression signature from the stream
+ // we have to do a loop here, because read_stream() may return less than the data we need
+ int bytes_read = 0;
+ do {
+ int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read);
+ if (unlikely(ret <= 0))
+ return false;
+
+ bytes_read += ret;
+ } while(unlikely(bytes_read < (int)r->decompressor->signature_size));
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
+
+ if(unlikely(bytes_read != (int)r->decompressor->signature_size))
+ fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size);
+
+ size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read);
+ if (unlikely(!compressed_message_size)) {
+ internal_error(true, "multiplexed uncompressed data in compressed stream!");
+ r->read_len += bytes_read;
+ r->read_buffer[r->read_len] = '\0';
+ return true;
+ }
+
+ if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) {
+ error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.",
+ compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE);
+ return false;
+ }
+
+ // delete compression header from our read buffer
+ r->read_buffer[r->read_len] = '\0';
+
+ // Read the entire compressed block of compressed data
+ char compressed[compressed_message_size];
+ size_t compressed_bytes_read = 0;
+ do {
+ size_t start = compressed_bytes_read;
+ size_t remaining = compressed_message_size - start;
+
+ int last_read_bytes = read_stream(r, &compressed[start], remaining);
+ if (unlikely(last_read_bytes <= 0)) {
+ internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes);
+ return false;
+ }
+
+ compressed_bytes_read += last_read_bytes;
+
+ } while(unlikely(compressed_message_size > compressed_bytes_read));
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read);
+
+ // decompress the compressed block
+ size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read);
+ if (!bytes_to_parse) {
+ internal_error(true, "no bytes to parse.");
+ return false;
+ }
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse);
+
+ // fill read buffer with decompressed data
+ size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1);
+ if (!len) {
+ internal_error(true, "decompressor returned zero length #2");
+ return false;
+ }
+ r->read_len += (int)len;
+ r->read_buffer[r->read_len] = '\0';
+
+ return true;
+}
+#else // !ENABLE_COMPRESSION
+static bool receiver_read_compressed(struct receiver_state *r) {
+ return receiver_read_uncompressed(r);
+}
+#endif // ENABLE_COMPRESSION
+
+/* Produce a full line if one exists, statefully return where we start next time.
+ * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
+ */
+static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) {
+ size_t start = *pos;
+
+ char *ss = &r->read_buffer[start];
+ char *se = &r->read_buffer[r->read_len];
+ char *ds = buffer;
+ char *de = &buffer[buffer_length - 2];
+
+ if(ss >= se) {
+ *ds = '\0';
+ *pos = 0;
+ r->read_len = 0;
+ r->read_buffer[r->read_len] = '\0';
+ return NULL;
+ }
+
+ // copy all bytes to buffer
+ while(ss < se && ds < de && *ss != '\n')
+ *ds++ = *ss++;
+
+ // if we have a newline, return the buffer
+ if(ss < se && ds < de && *ss == '\n') {
+ // newline found in the r->read_buffer
+
+ *ds++ = *ss++; // copy the newline too
+ *ds = '\0';
+
+ *pos = ss - r->read_buffer;
+ return buffer;
+ }
+
+ // if the destination is full, oops!
+ if(ds == de) {
+ error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
+ *ds = '\0';
+ *pos = ss - r->read_buffer;
+ return buffer;
+ }
+
+ // no newline found in the r->read_buffer
+ // move everything to the beginning
+ memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
+ r->read_len -= (int)start;
+ r->read_buffer[r->read_len] = '\0';
+ *ds = '\0';
+ *pos = 0;
+ return NULL;
+}
+
+static void streaming_parser_thread_cleanup(void *ptr) {
+ PARSER *parser = (PARSER *)ptr;
+ rrd_collector_finished();
+ parser_destroy(parser);
+}
+
+static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
+ size_t result;
+
+ PARSER_USER_OBJECT user = {
+ .enabled = cd->enabled,
+ .host = rpt->host,
+ .opaque = rpt,
+ .cd = cd,
+ .trust_durations = 1
+ };
+
+ PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
+
+ rrd_collector_started();
+
+ // this keeps the parser with its current value
+ // so, parser needs to be allocated before pushing it
+ netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser);
+
+ parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
+
+ user.parser = parser;
+
+ bool compressed_connection = false;
+#ifdef ENABLE_COMPRESSION
+ if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
+ compressed_connection = true;
+
+ if (!rpt->decompressor)
+ rpt->decompressor = create_decompressor();
+ else
+ rpt->decompressor->reset(rpt->decompressor);
+ }
+#endif
+
+ rpt->read_buffer[0] = '\0';
+ rpt->read_len = 0;
+
+ size_t read_buffer_start = 0;
+ char buffer[PLUGINSD_LINE_MAX + 2] = "";
+ while(!netdata_exit) {
+ if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) {
+ bool have_new_data;
+ if(compressed_connection)
+ have_new_data = receiver_read_compressed(rpt);
+ else
+ have_new_data = receiver_read_uncompressed(rpt);
+
+ if(!have_new_data)
+ break;
+
+ rpt->last_msg_t = now_realtime_sec();
+ continue;
+ }
+
+ if(unlikely(netdata_exit)) {
+ internal_error(true, "exiting...");
+ goto done;
+ }
+ if(unlikely(rpt->shutdown)) {
+ internal_error(true, "parser shutdown...");
+ goto done;
+ }
+
+ if (unlikely(parser_action(parser, buffer))) {
+ internal_error(true, "parser_action() failed on keyword '%s'.", buffer);
+ break;
+ }
+ }
+
+done:
+ internal_error(true, "Streaming receiver thread stopping...");
+
+ result = user.count;
+
+ // free parser with the pop function
+ netdata_thread_cleanup_pop(1);
+
+ return result;
+}
+
+static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) {
+ RRDSET *st;
+ rrdset_foreach_read(st, rpt->host) {
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ }
+ rrdset_foreach_done(st);
+ rrdhost_receiver_replicating_charts_zero(rpt->host);
+}
+
+static int rrdpush_receive(struct receiver_state *rpt)
+{
+ int history = default_rrd_history_entries;
+ RRD_MEMORY_MODE mode = default_rrd_memory_mode;
+ int health_enabled = default_health_enabled;
+ int rrdpush_enabled = default_rrdpush_enabled;
+ char *rrdpush_destination = default_rrdpush_destination;
+ char *rrdpush_api_key = default_rrdpush_api_key;
+ char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
+ bool rrdpush_enable_replication = default_rrdpush_enable_replication;
+ time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
+ time_t rrdpush_replication_step = default_rrdpush_replication_step;
+ time_t alarms_delay = 60;
+
+ rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every);
+ if(rpt->update_every < 0) rpt->update_every = 1;
+
+ history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history);
+ history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history);
+ if(history < 5) history = 5;
+
+ mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode)));
+ mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode)));
+
+ if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
+ error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port);
+ mode = default_rrd_memory_mode;
+ }
+
+ health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled);
+ health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled);
+
+ alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay);
+ alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay);
+
+ rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled);
+ rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled);
+
+ rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination);
+ rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination);
+
+ rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key);
+ rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key);
+
+ rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
+ rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+
+ rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication);
+ rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication);
+
+ rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate);
+ rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate);
+
+ rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step);
+ rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step);
+
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression = default_compression_enabled;
+ rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression);
+ rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression);
+ rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled);
+#endif //ENABLE_COMPRESSION
+
+ (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
+
+ if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT");
+ error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
+ char initial_response[HTTP_HEADER_SIZE + 1];
+ snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
+#ifdef ENABLE_HTTPS
+ if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+#else
+ if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
+#endif
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ close(rpt->fd);
+ return 0;
+ }
+ close(rpt->fd);
+ return 0;
+ }
+
+ if (rpt->host==NULL) {
+
+ rpt->host = rrdhost_find_or_create(
+ rpt->hostname
+ , rpt->registry_hostname
+ , rpt->machine_guid
+ , rpt->os
+ , rpt->timezone
+ , rpt->abbrev_timezone
+ , rpt->utc_offset
+ , rpt->tags
+ , rpt->program_name
+ , rpt->program_version
+ , rpt->update_every
+ , history
+ , mode
+ , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
+ , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
+ , rrdpush_destination
+ , rrdpush_api_key
+ , rrdpush_send_charts_matching
+ , rrdpush_enable_replication
+ , rrdpush_seconds_to_replicate
+ , rrdpush_replication_step
+ , rpt->system_info
+ , 0
+ );
+
+ if(!rpt->host) {
+ close(rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST");
+ error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port);
+ return 1;
+ }
+
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == NULL)
+ rpt->host->receiver = rpt;
+ else {
+ error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid);
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ close(rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION");
+ return 1;
+ }
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ }
+ else {
+ rrd_wrlock();
+ rrdhost_update(
+ rpt->host,
+ rpt->hostname,
+ rpt->registry_hostname,
+ rpt->machine_guid,
+ rpt->os,
+ rpt->timezone,
+ rpt->abbrev_timezone,
+ rpt->utc_offset,
+ rpt->tags,
+ rpt->program_name,
+ rpt->program_version,
+ rpt->update_every,
+ history,
+ mode,
+ (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO),
+ (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key),
+ rrdpush_destination,
+ rrdpush_api_key,
+ rrdpush_send_charts_matching,
+ rrdpush_enable_replication,
+ rrdpush_seconds_to_replicate,
+ rrdpush_replication_step,
+ rpt->system_info);
+ rrd_unlock();
+ }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ int ssl = 0;
+#ifdef ENABLE_HTTPS
+ if (rpt->ssl.conn != NULL)
+ ssl = 1;
+#endif
+ info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
+ , rpt->hostname
+ , rpt->client_ip
+ , rpt->client_port
+ , rrdhost_hostname(rpt->host)
+ , rpt->host->machine_guid
+ , rpt->host->rrd_update_every
+ , rpt->host->rrd_history_entries
+ , rrd_memory_mode_name(rpt->host->rrd_memory_mode)
+ , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
+ , ssl ? " SSL," : ""
+ , rrdhost_tags(rpt->host)
+ );
+#endif // NETDATA_INTERNAL_CHECKS
+
+
+ struct plugind cd = {
+ .enabled = 1,
+ .update_every = default_rrd_update_every,
+ .pid = 0,
+ .serial_failures = 0,
+ .successful_collections = 0,
+ .obsolete = 0,
+ .started_t = now_realtime_sec(),
+ .next = NULL,
+ .capabilities = 0,
+ };
+
+ // put the client IP and port into the buffers used by plugins.d
+ snprintfz(cd.id, CONFIG_MAX_NAME, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.filename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+
+#ifdef ENABLE_COMPRESSION
+ if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
+ if (!rpt->rrdpush_compression)
+ rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
+ }
+#endif
+
+ info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ char initial_response[HTTP_HEADER_SIZE];
+ if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities);
+ }
+ else if (stream_has_capability(rpt, STREAM_CAP_VN)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities));
+ } else if (stream_has_capability(rpt, STREAM_CAP_V2)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
+ } else { // stream_has_capability(rpt, STREAM_CAP_V1)
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
+ }
+ debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
+#ifdef ENABLE_HTTPS
+ if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+#else
+ if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
+#endif
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ close(rpt->fd);
+ return 0;
+ }
+
+ // remove the non-blocking flag from the socket
+ if(sock_delnonblock(rpt->fd) < 0)
+ error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
+
+ struct timeval timeout;
+ timeout.tv_sec = 600;
+ timeout.tv_usec = 0;
+ if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
+ error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
+
+ rrdhost_wrlock(rpt->host);
+/* if(rpt->host->connected_senders > 0) {
+ rrdhost_unlock(rpt->host);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ fclose(fp);
+ return 0;
+ }
+*/
+
+// rpt->host->connected_senders++;
+ if(health_enabled != CONFIG_BOOLEAN_NO) {
+ if(alarms_delay > 0) {
+ rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
+ log_health(
+ "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
+ rrdhost_hostname(rpt->host),
+ (int64_t)alarms_delay);
+ }
+ }
+ rpt->host->senders_connect_time = now_realtime_sec();
+ rpt->host->senders_last_chart_command = 0;
+ rpt->host->trigger_chart_obsoletion_check = 1;
+
+ rrdhost_unlock(rpt->host);
+
+ // call the plugins.d processor to receive the metrics
+ info("STREAM %s [receive from [%s]:%s]: receiving metrics...",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+
+ log_stream_connection(rpt->client_ip, rpt->client_port,
+ rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
+
+ cd.capabilities = rpt->capabilities;
+
+#ifdef ENABLE_ACLK
+ // in case we have cloud connection we inform cloud
+ // new child connected
+ if (netdata_cloud_setting)
+ aclk_host_state_update(rpt->host, 1);
+#endif
+
+ rrdhost_set_is_parent_label(++localhost->senders_count);
+
+ rrdpush_receiver_replication_reset(rpt);
+ rrdcontext_host_child_connected(rpt->host);
+
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
+ size_t count = streaming_parser(rpt, &cd, rpt->fd,
+#ifdef ENABLE_HTTPS
+ (rpt->ssl.conn) ? &rpt->ssl : NULL
+#else
+ NULL
+#endif
+ );
+
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
+ log_stream_connection(rpt->client_ip, rpt->client_port,
+ rpt->key, rpt->host->machine_guid, rpt->hostname,
+ "DISCONNECTED");
+
+ error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).",
+ rpt->hostname, rpt->client_ip, rpt->client_port, count);
+
+ rrdcontext_host_child_disconnected(rpt->host);
+ rrdpush_receiver_replication_reset(rpt);
+
+#ifdef ENABLE_ACLK
+ // in case we have cloud connection we inform cloud
+ // a child disconnected
+ if (netdata_cloud_setting)
+ aclk_host_state_update(rpt->host, 0);
+#endif
+
+ rrdhost_set_is_parent_label(--localhost->senders_count);
+
+ // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
+ if (!netdata_exit && rpt->host) {
+ rrd_rdlock();
+ rrdhost_wrlock(rpt->host);
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == rpt) {
+ rpt->host->senders_connect_time = 0;
+ rpt->host->trigger_chart_obsoletion_check = 0;
+ rpt->host->senders_disconnected_time = now_realtime_sec();
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
+ if(health_enabled == CONFIG_BOOLEAN_AUTO)
+ rpt->host->health_enabled = 0;
+ }
+ rrdhost_unlock(rpt->host);
+ if (rpt->host->receiver == rpt) {
+ rrdpush_sender_thread_stop(rpt->host);
+ }
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ rrd_unlock();
+ }
+
+ // cleanup
+ close(rpt->fd);
+ return (int)count;
+}
+
+void *rrdpush_receiver_thread(void *ptr) {
+ netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
+
+ struct receiver_state *rpt = (struct receiver_state *)ptr;
+ info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+
+ worker_register("STREAMRCV");
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
+ rrdpush_receive(rpt);
+ worker_unregister();
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
diff --git a/streaming/replication.c b/streaming/replication.c
new file mode 100644
index 0000000..d659d70
--- /dev/null
+++ b/streaming/replication.c
@@ -0,0 +1,1446 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "replication.h"
+#include "Judy.h"
+
+#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
+#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
+
+#define WORKER_JOB_FIND_NEXT 1
+#define WORKER_JOB_QUERYING 2
+#define WORKER_JOB_DELETE_ENTRY 3
+#define WORKER_JOB_FIND_CHART 4
+#define WORKER_JOB_CHECK_CONSISTENCY 5
+#define WORKER_JOB_BUFFER_COMMIT 6
+#define WORKER_JOB_CLEANUP 7
+#define WORKER_JOB_WAIT 8
+
+// master thread worker jobs
+#define WORKER_JOB_STATISTICS 9
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 13
+#define WORKER_JOB_CUSTOM_METRIC_DONE 14
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16
+
+#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
+#define SECONDS_TO_RESET_POINT_IN_TIME 10
+
+static struct replication_query_statistics replication_queries = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .queries_started = 0,
+ .queries_finished = 0,
+ .points_read = 0,
+ .points_generated = 0,
+};
+
+struct replication_query_statistics replication_get_query_statistics(void) {
+ netdata_spinlock_lock(&replication_queries.spinlock);
+ struct replication_query_statistics ret = replication_queries;
+ netdata_spinlock_unlock(&replication_queries.spinlock);
+ return ret;
+}
+
+// ----------------------------------------------------------------------------
+// sending replication replies
+
+struct replication_dimension {
+ STORAGE_POINT sp;
+ struct storage_engine_query_handle handle;
+ bool enabled;
+
+ DICTIONARY *dict;
+ const DICTIONARY_ITEM *rda;
+ RRDDIM *rd;
+};
+
+static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
+ size_t dimensions = rrdset_number_of_dimensions(st);
+ size_t points_read = 0, points_generated = 0;
+
+ struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
+ struct replication_dimension data[dimensions];
+ memset(data, 0, sizeof(data));
+
+ if(enable_streaming && st->last_updated.tv_sec > before) {
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)before,
+ (unsigned long long)st->last_updated.tv_sec
+ );
+ before = st->last_updated.tv_sec;
+ }
+
+ // prepare our array of dimensions
+ {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(unlikely(!rd || !rd_dfe.item || !rd->exposed))
+ continue;
+
+ if (unlikely(rd_dfe.counter >= dimensions)) {
+ internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ break;
+ }
+
+ struct replication_dimension *d = &data[rd_dfe.counter];
+
+ d->dict = rd_dfe.dict;
+ d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+ d->rd = rd;
+
+ ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before);
+ d->enabled = true;
+ }
+ rrddim_foreach_done(rd);
+ }
+
+ time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
+ while(now <= before) {
+ time_t min_start_time = 0, min_end_time = 0;
+ for (size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ // fetch the first valid point for the dimension
+ int max_skip = 100;
+ while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) {
+ d->sp = ops->next_metric(&d->handle);
+ points_read++;
+ }
+
+ internal_error(max_skip <= 0,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now);
+
+ if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp)))
+ continue;
+
+ if(unlikely(!min_start_time)) {
+ min_start_time = d->sp.start_time;
+ min_end_time = d->sp.end_time;
+ }
+ else {
+ min_start_time = MIN(min_start_time, d->sp.start_time);
+ min_end_time = MIN(min_end_time, d->sp.end_time);
+ }
+ }
+
+ if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)min_start_time,
+ (unsigned long long)min_end_time,
+ (unsigned long long)wall_clock_time);
+ break;
+ }
+
+ if(unlikely(min_end_time < now)) {
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true,
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
+ break;
+ }
+
+ if(unlikely(min_end_time <= min_start_time))
+ min_start_time = min_end_time - st->update_every;
+
+ if(unlikely(!actual_after)) {
+ actual_after = min_end_time;
+ actual_before = min_end_time;
+ }
+ else
+ actual_before = min_end_time;
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n"
+ , (unsigned long long)min_start_time
+ , (unsigned long long)min_end_time
+ , (unsigned long long)wall_clock_time
+ );
+
+ // output the replay values for this time
+ for (size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time))
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
+ rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+
+ else
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
+ rrddim_id(d->rd));
+
+ points_generated++;
+ }
+
+ now = min_end_time + 1;
+ }
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ if(actual_after) {
+ char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
+ log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
+ log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
+ internal_error(true,
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
+ (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
+ }
+ else
+ internal_error(true,
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)after, (unsigned long long)before);
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
+
+ // release all the dictionary items acquired
+ // finalize the queries
+ size_t queries = 0;
+ for(size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ ops->finalize(&d->handle);
+
+ dictionary_acquired_item_release(d->dict, d->rda);
+
+ // update global statistics
+ queries++;
+ }
+
+ netdata_spinlock_lock(&replication_queries.spinlock);
+ replication_queries.queries_started += queries;
+ replication_queries.queries_finished += queries;
+ replication_queries.points_read += points_read;
+ replication_queries.points_generated += points_generated;
+ netdata_spinlock_unlock(&replication_queries.spinlock);
+
+ return before;
+}
+
+static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(!rd->exposed) continue;
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
+ rrddim_id(rd),
+ (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
+ rd->last_collected_value,
+ rd->last_calculated_value,
+ rd->last_stored_value
+ );
+ }
+ rrddim_foreach_done(rd);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
+ (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
+ (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
+ );
+}
+
+bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
+ time_t query_after = after;
+ time_t query_before = before;
+ time_t now = now_realtime_sec();
+ time_t tolerance = 2; // sometimes from the time we get this value, to the time we check,
+ // a data collection has been made
+ // so, we give this tolerance to detect invalid timestamps
+
+ // find the first entry we have
+ time_t first_entry_local = rrdset_first_entry_t(st);
+ if(first_entry_local > now + tolerance) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)first_entry_local, (unsigned long long)now);
+ first_entry_local = now;
+ }
+
+ if (query_after < first_entry_local)
+ query_after = first_entry_local;
+
+ // find the latest entry we have
+ time_t last_entry_local = st->last_updated.tv_sec;
+ if(!last_entry_local) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ last_entry_local = rrdset_last_entry_t(st);
+ if(!last_entry_local) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ last_entry_local = now;
+ }
+ }
+
+ if(last_entry_local > now + tolerance) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)last_entry_local, (unsigned long long)now);
+ last_entry_local = now;
+ }
+
+ if (query_before > last_entry_local)
+ query_before = last_entry_local;
+
+ // if the parent asked us to start streaming, then fill the rest with the data that we have
+ if (start_streaming)
+ query_before = last_entry_local;
+
+ if (query_after > query_before) {
+ time_t tmp = query_before;
+ query_before = query_after;
+ query_after = tmp;
+ }
+
+ bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false;
+
+ // we might want to optimize this by filling a temporary buffer
+ // and copying the result to the host's buffer in order to avoid
+ // holding the host's buffer lock for too long
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
+
+ if(after != 0 && before != 0)
+ before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
+ else {
+ after = 0;
+ before = 0;
+ enable_streaming = true;
+ }
+
+ // get again the world clock time
+ time_t world_clock_time = now_realtime_sec();
+ if(enable_streaming) {
+ if(now < world_clock_time) {
+ // we needed time to execute this request
+ // so, the parent will need to replicate more data
+ enable_streaming = false;
+ }
+ else
+ replicate_chart_collection_state(wb, st);
+ }
+
+ // end with first/last entries we have, and the first start time and
+ // last end time of the data we sent
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
+
+ // current chart update every
+ (int)st->update_every
+
+ // child first db time, child end db time
+ , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local
+
+ // start streaming boolean
+ , enable_streaming ? "true" : "false"
+
+ // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
+ , (unsigned long long)after, (unsigned long long)before
+
+ // child world clock time
+ , (unsigned long long)world_clock_time
+ );
+
+ worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
+ sender_commit(host->sender, wb);
+ worker_is_busy(WORKER_JOB_CLEANUP);
+
+ return enable_streaming;
+}
+
+// ----------------------------------------------------------------------------
+// sending replication requests
+
+struct replication_request_details {
+ struct {
+ send_command callback;
+ void *data;
+ } caller;
+
+ RRDHOST *host;
+ RRDSET *st;
+
+ struct {
+ time_t first_entry_t; // the first entry time the child has
+ time_t last_entry_t; // the last entry time the child has
+ time_t world_time_t; // the current time of the child
+ } child_db;
+
+ struct {
+ time_t first_entry_t; // the first entry time we have
+ time_t last_entry_t; // the last entry time we have
+ bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed
+ time_t now; // the current local world clock time
+ } local_db;
+
+ struct {
+ time_t from; // the starting time of the entire gap we have
+ time_t to; // the ending time of the entire gap we have
+ } gap;
+
+ struct {
+ time_t after; // the start time we requested previously from this child
+ time_t before; // the end time we requested previously from this child
+ } last_request;
+
+ struct {
+ time_t after; // the start time of this replication request - the child will add 1 second
+ time_t before; // the end time of this replication request
+ bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
+ } wanted;
+};
+
+static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
+ RRDSET *st = r->st;
+
+ if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
+ st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.log_next_data_collection = true;
+
+ char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";
+
+ if(r->wanted.after)
+ log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);
+
+ if(r->wanted.before)
+ log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);
+
+ internal_error(true,
+ "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
+ "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s"
+ , rrdhost_hostname(r->host), rrdset_id(r->st)
+ , r->wanted.after, wanted_after_buf
+ , r->wanted.before, wanted_before_buf
+ , r->wanted.start_streaming ? "YES" : "NO"
+ , msg
+ , r->last_request.after, r->last_request.before
+ , r->child_db.first_entry_t, r->child_db.last_entry_t
+ , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
+ , r->local_db.first_entry_t, r->local_db.last_entry_t
+ , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now
+ , r->gap.from, r->gap.to
+ , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
+ , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
+ );
+
+ st->replay.start_streaming = r->wanted.start_streaming;
+ st->replay.after = r->wanted.after;
+ st->replay.before = r->wanted.before;
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
+
+ char buffer[2048 + 1];
+ snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
+ rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
+ (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
+
+ int ret = r->caller.callback(buffer, r->caller.data);
+ if (ret < 0) {
+ error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
+ rrdhost_hostname(r->host), rrdset_id(r->st), ret);
+ return false;
+ }
+
+ return true;
+}
+
+bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
+ time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
+ time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
+{
+ struct replication_request_details r = {
+ .caller = {
+ .callback = callback,
+ .data = callback_data,
+ },
+
+ .host = host,
+ .st = st,
+
+ .child_db = {
+ .first_entry_t = first_entry_child,
+ .last_entry_t = last_entry_child,
+ .world_time_t = child_world_time,
+ },
+
+ .local_db = {
+ .first_entry_t = rrdset_first_entry_t(st),
+ .last_entry_t = rrdset_last_entry_t(st),
+ .last_entry_t_adjusted_to_now = false,
+ .now = now_realtime_sec(),
+ },
+
+ .last_request = {
+ .after = prev_first_entry_wanted,
+ .before = prev_last_entry_wanted,
+ },
+
+ .wanted = {
+ .after = 0,
+ .before = 0,
+ .start_streaming = true,
+ },
+ };
+
+ // check our local database retention
+ if(r.local_db.last_entry_t > r.local_db.now) {
+ r.local_db.last_entry_t = r.local_db.now;
+ r.local_db.last_entry_t_adjusted_to_now = true;
+ }
+
+ // let's find the GAP we have
+ if(!r.last_request.after || !r.last_request.before) {
+ // there is no previous request
+
+ if(r.local_db.last_entry_t)
+ // we have some data, let's continue from the last point we have
+ r.gap.from = r.local_db.last_entry_t;
+ else
+ // we don't have any data, the gap is the max timeframe we are allowed to replicate
+ r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate;
+
+ }
+ else {
+ // we had sent a request - let's continue at the point we left it
+ // for this we don't take into account the actual data in our db
+ // because the child may also have gaps, and we need to get over it
+ r.gap.from = r.last_request.before;
+ }
+
+ // we want all the data up to now
+ r.gap.to = r.local_db.now;
+
+ // The gap is now r.gap.from -> r.gap.to
+
+ if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
+ return send_replay_chart_cmd(&r, "empty replication request, replication is disabled");
+
+ if (unlikely(!r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, child has no stored data");
+
+ if (unlikely(!rrdset_number_of_dimensions(st)))
+ return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions");
+
+ if (r.child_db.first_entry_t <= 0)
+ return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid");
+
+ if (r.child_db.first_entry_t > r.child_db.last_entry_t)
+ return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)");
+
+ if (r.local_db.last_entry_t > r.child_db.last_entry_t)
+ return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one");
+
+ // let's find what the child can provide to fill that gap
+
+ if(r.child_db.first_entry_t > r.gap.from)
+ // the child does not have all the data - let's get what it has
+ r.wanted.after = r.child_db.first_entry_t;
+ else
+ // ok, the child can fill the entire gap we have
+ r.wanted.after = r.gap.from;
+
+ if(r.gap.to - r.wanted.after > host->rrdpush_replication_step)
+ // the duration is too big for one request - let's take the first step
+ r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
+ else
+ // wow, we can do it in one request
+ r.wanted.before = r.gap.to;
+
+ // don't ask from the child more than it has
+ if(r.wanted.before > r.child_db.last_entry_t)
+ r.wanted.before = r.child_db.last_entry_t;
+
+ if(r.wanted.after > r.wanted.before)
+ r.wanted.after = r.wanted.before;
+
+ // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
+ r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);
+
+ // the wanted timeframe is now r.wanted.after -> r.wanted.before
+ // send it
+ return send_replay_chart_cmd(&r, "OK");
+}
+
+// ----------------------------------------------------------------------------
+// replication thread
+
+// replication request in sender DICTIONARY
+// used for de-duplicating the requests
+struct replication_request {
+ struct sender_state *sender; // the sender we should put the reply at
+ STRING *chart_id; // the chart of the request
+ time_t after; // the start time of the query (maybe zero) key for sorting (JudyL)
+ time_t before; // the end time of the query (maybe zero)
+ bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
+
+ usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request
+ Word_t unique_id; // auto-increment, later requests have bigger
+ bool found; // used as a result boolean for the find call
+ bool indexed_in_judy; // true when the request is indexed in judy
+ bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full
+};
+
+// replication sort entry in JudyL array
+// used for sorting all requests, across all nodes
+struct replication_sort_entry {
+ struct replication_request *rq;
+
+ size_t unique_id; // used as a key to identify the sort entry - we never access its contents
+};
+
+#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread
+
+// the global variables for the replication thread
+static struct replication_thread {
+ SPINLOCK spinlock;
+
+ struct {
+ size_t pending; // number of requests pending in the queue
+ Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
+
+ // statistics
+ size_t added; // number of requests added to the queue
+ size_t removed; // number of requests removed from the queue
+ size_t pending_no_room; // number of requests skipped, because the sender has no room for responses
+ size_t senders_full; // number of times a sender reset our last position in the queue
+ size_t sender_resets; // number of times a sender reset our last position in the queue
+ time_t first_time_t; // the minimum 'after' we encountered
+
+ struct {
+ Word_t after;
+ Word_t unique_id;
+ Pvoid_t JudyL_array;
+ } queue;
+
+ } unsafe; // protected from replication_recursive_lock()
+
+ struct {
+ size_t executed; // the number of replication requests executed
+ size_t latest_first_time; // the 'after' timestamp of the last request we executed
+ } atomic; // access should be with atomic operations
+
+ struct {
+ size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time
+
+ netdata_thread_t **threads_ptrs;
+ size_t threads;
+ } main_thread; // access is allowed only by the main thread
+
+} replication_globals = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .unsafe = {
+ .pending = 0,
+ .unique_id = 0,
+
+ .added = 0,
+ .removed = 0,
+ .pending_no_room = 0,
+ .sender_resets = 0,
+ .senders_full = 0,
+
+ .first_time_t = 0,
+
+ .queue = {
+ .after = 0,
+ .unique_id = 0,
+ .JudyL_array = NULL,
+ },
+ },
+ .atomic = {
+ .executed = 0,
+ .latest_first_time = 0,
+ },
+ .main_thread = {
+ .last_executed = 0,
+ .threads = 0,
+ .threads_ptrs = NULL,
+ },
+};
+
+#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
+#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
+
+static inline bool replication_recursive_lock_mode(char mode) {
+ static __thread int recursions = 0;
+
+ if(mode == 'L') { // (L)ock
+ if(++recursions == 1)
+ netdata_spinlock_lock(&replication_globals.spinlock);
+ }
+ else if(mode == 'U') { // (U)nlock
+ if(--recursions == 0)
+ netdata_spinlock_unlock(&replication_globals.spinlock);
+ }
+ else if(mode == 'C') { // (C)heck
+ if(recursions > 0)
+ return true;
+ else
+ return false;
+ }
+ else
+ fatal("REPLICATION: unknown lock mode '%c'", mode);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(recursions < 0)
+ fatal("REPLICATION: recursions is %d", recursions);
+#endif
+
+ return true;
+}
+
+#define replication_recursive_lock() replication_recursive_lock_mode('L')
+#define replication_recursive_unlock() replication_recursive_lock_mode('U')
+#define fatal_when_replication_is_not_locked_for_me() do { \
+ if(!replication_recursive_lock_mode('C')) \
+ fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
+} while(0)
+
+void replication_set_next_point_in_time(time_t after, size_t unique_id) {
+ replication_recursive_lock();
+ replication_globals.unsafe.queue.after = after;
+ replication_globals.unsafe.queue.unique_id = unique_id;
+ replication_recursive_unlock();
+}
+
+// ----------------------------------------------------------------------------
+// replication sort entry management
+
+static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
+ fatal_when_replication_is_not_locked_for_me();
+
+ struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+
+ rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
+
+ // copy the request
+ rse->rq = rq;
+ rse->unique_id = ++replication_globals.unsafe.unique_id;
+
+ // save the unique id into the request, to be able to delete it later
+ rq->unique_id = rse->unique_id;
+ rq->indexed_in_judy = false;
+ rq->not_indexed_buffer_full = false;
+ return rse;
+}
+
+static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
+ freez(rse);
+}
+
+static void replication_sort_entry_add(struct replication_request *rq) {
+ replication_recursive_lock();
+
+ if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
+ rq->indexed_in_judy = false;
+ rq->not_indexed_buffer_full = true;
+ replication_globals.unsafe.pending_no_room++;
+ replication_recursive_unlock();
+ return;
+ }
+
+ if(rq->not_indexed_buffer_full)
+ replication_globals.unsafe.pending_no_room--;
+
+ struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
+
+// if(rq->after < (time_t)replication_globals.protected.queue.after &&
+// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
+// !replication_globals.protected.skipped_no_room_since_last_reset) {
+//
+// // make it find this request first
+// replication_set_next_point_in_time(rq->after, rq->unique_id);
+// }
+
+ replication_globals.unsafe.added++;
+ replication_globals.unsafe.pending++;
+
+ Pvoid_t *inner_judy_ptr;
+
+ // find the outer judy entry, using after as key
+ inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+ if(!inner_judy_ptr)
+ inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+
+ // add it to the inner judy, using unique_id as key
+ Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
+ *item = rse;
+ rq->indexed_in_judy = true;
+ rq->not_indexed_buffer_full = false;
+
+ if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
+ replication_globals.unsafe.first_time_t = rq->after;
+
+ replication_recursive_unlock();
+}
+
+static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
+ fatal_when_replication_is_not_locked_for_me();
+
+ bool inner_judy_deleted = false;
+
+ replication_globals.unsafe.removed++;
+ replication_globals.unsafe.pending--;
+
+ rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
+
+ rse->rq->indexed_in_judy = false;
+
+ // delete it from the inner judy
+ JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
+
+ // if no items left, delete it from the outer judy
+ if(**inner_judy_ppptr == NULL) {
+ JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
+ inner_judy_deleted = true;
+ }
+
+ // free memory
+ replication_sort_entry_destroy(rse);
+
+ return inner_judy_deleted;
+}
+
+static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
+ Pvoid_t *inner_judy_pptr;
+ struct replication_sort_entry *rse_to_delete = NULL;
+
+ replication_recursive_lock();
+ if(rq->indexed_in_judy) {
+
+ inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
+ if (inner_judy_pptr) {
+ Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
+ if (our_item_pptr) {
+ rse_to_delete = *our_item_pptr;
+ replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr);
+
+ if(buffer_full) {
+ replication_globals.unsafe.pending_no_room++;
+ rq->not_indexed_buffer_full = true;
+ }
+ }
+ }
+
+ if (!rse_to_delete)
+ fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
+ rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
+
+ }
+
+ replication_recursive_unlock();
+}
+
+static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) {
+ if(unlikely(first))
+ return JudyLFirst(PArray, PIndex, PJE0);
+
+ return JudyLNext(PArray, PIndex, PJE0);
+}
+
+static struct replication_request replication_request_get_first_available() {
+ Pvoid_t *inner_judy_pptr;
+
+ replication_recursive_lock();
+
+ struct replication_request rq_to_return = (struct replication_request){ .found = false };
+
+ if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
+ replication_globals.unsafe.queue.after = 0;
+ replication_globals.unsafe.queue.unique_id = 0;
+ }
+
+ Word_t started_after = replication_globals.unsafe.queue.after;
+
+ size_t round = 0;
+ while(!rq_to_return.found) {
+ round++;
+
+ if(round > 2)
+ break;
+
+ if(round == 2) {
+ if(started_after == 0)
+ break;
+
+ replication_globals.unsafe.queue.after = 0;
+ replication_globals.unsafe.queue.unique_id = 0;
+ }
+
+ bool find_same_after = true;
+ while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, find_same_after))) {
+ Pvoid_t *our_item_pptr;
+
+ if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
+ break;
+
+ while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
+ struct replication_sort_entry *rse = *our_item_pptr;
+ struct replication_request *rq = rse->rq;
+
+ // copy the request to return it
+ rq_to_return = *rq;
+ rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
+
+ // set the return result to found
+ rq_to_return.found = true;
+
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
+ }
+
+ // call JudyLNext from now on
+ find_same_after = false;
+
+ // prepare for the next iteration on the outer loop
+ replication_globals.unsafe.queue.unique_id = 0;
+ }
+ }
+
+ replication_recursive_unlock();
+ return rq_to_return;
+}
+
+// ----------------------------------------------------------------------------
+// replication request management
+
+static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
+ struct sender_state *s = sender_state; (void)s;
+ struct replication_request *rq = value;
+
+ // IMPORTANT:
+ // We use the react instead of the insert callback
+ // because we want the item to be atomically visible
+ // to our replication thread, immediately after.
+
+ // If we put this at the insert callback, the item is not guaranteed
+ // to be atomically visible to others, so the replication thread
+ // may see the replication sort entry, but fail to find the dictionary item
+ // related to it.
+
+ replication_sort_entry_add(rq);
+
+ // this request is about a unique chart for this sender
+ rrdpush_sender_replicating_charts_plus_one(s);
+}
+
+static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
+ struct sender_state *s = sender_state; (void)s;
+ struct replication_request *rq = old_value; (void)rq;
+ struct replication_request *rq_new = new_value;
+
+ replication_recursive_lock();
+
+ if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
+ // we can replace this command
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+
+ rq->after = rq_new->after;
+ rq->before = rq_new->before;
+ rq->start_streaming = rq_new->start_streaming;
+ }
+ else if(!rq->indexed_in_judy) {
+ replication_sort_entry_add(rq);
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+ }
+ else {
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
+ dictionary_acquired_item_name(item),
+ (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
+ }
+
+ replication_recursive_unlock();
+
+ string_freez(rq_new->chart_id);
+ return false;
+}
+
+static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
+ struct replication_request *rq = value;
+
+ // this request is about a unique chart for this sender
+ rrdpush_sender_replicating_charts_minus_one(rq->sender);
+
+ if(rq->indexed_in_judy)
+ replication_sort_entry_del(rq, false);
+
+ else if(rq->not_indexed_buffer_full) {
+ replication_recursive_lock();
+ replication_globals.unsafe.pending_no_room--;
+ replication_recursive_unlock();
+ }
+
+ string_freez(rq->chart_id);
+}
+
+static bool replication_execute_request(struct replication_request *rq, bool workers) {
+ bool ret = false;
+
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_FIND_CHART);
+
+ RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
+ if(!st) {
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
+ rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));
+
+ goto cleanup;
+ }
+
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_QUERYING);
+
+ netdata_thread_disable_cancelability();
+
+ // send the replication data
+ bool start_streaming = replicate_chart_response(
+ st->rrdhost, st, rq->start_streaming, rq->after, rq->before);
+
+ netdata_thread_enable_cancelability();
+
+ if(start_streaming && rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender)) {
+ // enable normal streaming if we have to
+ // but only if the sender buffer has not been flushed since we started
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
+ }
+ else
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
+ rrdhost_hostname(st->rrdhost), string2str(rq->chart_id));
+ }
+
+ __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);
+
+ ret = true;
+
+cleanup:
+ string_freez(rq->chart_id);
+ worker_is_idle();
+ return ret;
+}
+
+// ----------------------------------------------------------------------------
+// public API
+
+void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
+ struct replication_request rq = {
+ .sender = sender,
+ .chart_id = string_strdupz(chart_id),
+ .after = after,
+ .before = before,
+ .start_streaming = start_streaming,
+ .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
+ .indexed_in_judy = false,
+ .not_indexed_buffer_full = false,
+ };
+
+ if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
+ replication_execute_request(&rq, false);
+
+ else
+ dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
+}
+
+void replication_sender_delete_pending_requests(struct sender_state *sender) {
+ // allow the dictionary destructor to go faster on locks
+ replication_recursive_lock();
+ dictionary_flush(sender->replication.requests);
+ replication_recursive_unlock();
+}
+
+void replication_init_sender(struct sender_state *sender) {
+ sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
+ dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
+ dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
+}
+
+void replication_cleanup_sender(struct sender_state *sender) {
+ // allow the dictionary destructor to go faster on locks
+ replication_recursive_lock();
+ dictionary_destroy(sender->replication.requests);
+ replication_recursive_unlock();
+}
+
+void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
+ size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
+ size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;
+
+ if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
+ rrdpush_sender_replication_buffer_full_set(s, true);
+
+ struct replication_request *rq;
+ dfe_start_read(s->replication.requests, rq) {
+ if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) {
+ replication_sort_entry_del(rq, true);
+ }
+ }
+ dfe_done(rq);
+
+ replication_recursive_lock();
+ replication_globals.unsafe.senders_full++;
+ replication_recursive_unlock();
+ }
+ else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
+ rrdpush_sender_replication_buffer_full_set(s, false);
+
+ struct replication_request *rq;
+ dfe_start_read(s->replication.requests, rq) {
+ if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
+ replication_sort_entry_add(rq);
+ }
+ }
+ dfe_done(rq);
+
+ replication_recursive_lock();
+ replication_globals.unsafe.senders_full--;
+ replication_globals.unsafe.sender_resets++;
+ // replication_set_next_point_in_time(0, 0);
+ replication_recursive_unlock();
+ }
+
+ rrdpush_sender_set_buffer_used_percent(s, percentage);
+}
+
+// ----------------------------------------------------------------------------
+// replication thread
+
+static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
+ internal_error(
+ host->sender &&
+ !rrdpush_sender_pending_replication_requests(host->sender) &&
+ dictionary_entries(host->sender->replication.requests) != 0,
+ "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
+ rrdhost_hostname(host),
+ rrdpush_sender_pending_replication_requests(host->sender),
+ dictionary_entries(host->sender->replication.requests)
+ );
+
+ size_t ok = 0;
+ size_t errors = 0;
+
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
+ bool is_error = false;
+
+ if(!flags) {
+ internal_error(
+ true,
+ "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
+ rrdhost_hostname(host), rrdset_id(st)
+ );
+ is_error = true;
+ }
+
+ if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ internal_error(
+ true,
+ "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
+ rrdhost_hostname(host), rrdset_id(st)
+ );
+ is_error = true;
+ }
+
+ if(is_error)
+ errors++;
+ else
+ ok++;
+ }
+ rrdset_foreach_done(st);
+
+ internal_error(errors,
+ "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
+ rrdhost_hostname(host), ok, errors);
+
+ return errors;
+}
+
+static void verify_all_hosts_charts_are_streaming_now(void) {
+ worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);
+
+ size_t errors = 0;
+ RRDHOST *host;
+ dfe_start_read(rrdhost_root_index, host)
+ errors += verify_host_charts_are_streaming_now(host);
+ dfe_done(host);
+
+ size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
+ info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
+ executed - replication_globals.main_thread.last_executed, errors);
+ replication_globals.main_thread.last_executed = executed;
+}
+
+static void replication_initialize_workers(bool master) {
+ worker_register("REPLICATION");
+ worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
+ worker_register_job_name(WORKER_JOB_QUERYING, "querying");
+ worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
+ worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
+ worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
+ worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
+ worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
+ worker_register_job_name(WORKER_JOB_WAIT, "wait");
+
+ if(master) {
+ worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
+ }
+}
+
+#define REQUEST_OK (0)
+#define REQUEST_QUEUE_EMPTY (-1)
+#define REQUEST_CHART_NOT_FOUND (-2)
+
+static int replication_execute_next_pending_request(void) {
+ worker_is_busy(WORKER_JOB_FIND_NEXT);
+ struct replication_request rq = replication_request_get_first_available();
+
+ if(unlikely(!rq.found)) {
+ worker_is_idle();
+ return REQUEST_QUEUE_EMPTY;
+ }
+
+ // delete the request from the dictionary
+ worker_is_busy(WORKER_JOB_DELETE_ENTRY);
+ if(!dictionary_del(rq.sender->replication.requests, string2str(rq.chart_id)))
+ error("REPLAY ERROR: 'host:%s/chart:%s' failed to be deleted from sender pending charts index",
+ rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
+
+ replication_set_latest_first_time(rq.after);
+
+ if(unlikely(!replication_execute_request(&rq, true))) {
+ worker_is_idle();
+ return REQUEST_CHART_NOT_FOUND;
+ }
+
+ worker_is_idle();
+ return REQUEST_OK;
+}
+
+static void replication_worker_cleanup(void *ptr __maybe_unused) {
+ worker_unregister();
+}
+
+static void *replication_worker_thread(void *ptr) {
+ replication_initialize_workers(false);
+
+ netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
+
+ while(!netdata_exit) {
+ if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ worker_is_busy(WORKER_JOB_WAIT);
+ worker_is_idle();
+ sleep_usec(1 * USEC_PER_SEC);
+ }
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
+static void replication_main_cleanup(void *ptr) {
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ int threads = (int)replication_globals.main_thread.threads;
+ for(int i = 0; i < threads ;i++) {
+ netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
+ freez(replication_globals.main_thread.threads_ptrs[i]);
+ }
+ freez(replication_globals.main_thread.threads_ptrs);
+ replication_globals.main_thread.threads_ptrs = NULL;
+
+ // custom code
+ worker_unregister();
+
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+void *replication_thread_main(void *ptr __maybe_unused) {
+ replication_initialize_workers(true);
+
+ int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
+ if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
+ error("replication threads given %d is invalid, resetting to 1", threads);
+ threads = 1;
+ }
+
+ if(--threads) {
+ replication_globals.main_thread.threads = threads;
+ replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
+
+ for(int i = 0; i < threads ;i++) {
+ replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
+ netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], "REPLICATION",
+ NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
+ }
+ }
+
+ netdata_thread_cleanup_push(replication_main_cleanup, ptr);
+
+ // start from 100% completed
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
+
+ long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
+ bool slow = true; // control the time we sleep - it has to start with true!
+ usec_t last_now_mono_ut = now_monotonic_usec();
+ time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds
+
+ size_t last_executed = 0;
+ size_t last_sender_resets = 0;
+
+ while(!netdata_exit) {
+
+ // statistics
+ usec_t now_mono_ut = now_monotonic_usec();
+ if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
+ last_now_mono_ut = now_mono_ut;
+
+ worker_is_busy(WORKER_JOB_STATISTICS);
+ replication_recursive_lock();
+
+ size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
+ if(last_executed != current_executed) {
+ run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
+ last_executed = current_executed;
+ slow = false;
+ }
+
+ if(replication_reset_next_point_in_time_countdown-- == 0) {
+ // once per second, make it scan all the pending requests next time
+ replication_set_next_point_in_time(0, 0);
+// replication_globals.protected.skipped_no_room_since_last_reset = 0;
+ replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
+ }
+
+ if(--run_verification_countdown == 0) {
+ if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
+ // reset the statistics about completion percentage
+ replication_globals.unsafe.first_time_t = 0;
+ replication_set_latest_first_time(0);
+
+ verify_all_hosts_charts_are_streaming_now();
+
+ run_verification_countdown = LONG_MAX;
+ slow = true;
+ }
+ else
+ run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
+ }
+
+ time_t latest_first_time_t = replication_get_latest_first_time();
+ if(latest_first_time_t && replication_globals.unsafe.pending) {
+ // completion percentage statistics
+ time_t now = now_realtime_sec();
+ time_t total = now - replication_globals.unsafe.first_time_t;
+ time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t;
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
+ (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
+ }
+ else
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
+
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);
+
+ replication_recursive_unlock();
+ worker_is_idle();
+ }
+
+ if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+
+ worker_is_busy(WORKER_JOB_WAIT);
+ replication_recursive_lock();
+
+ // the timeout also defines now frequently we will traverse all the pending requests
+ // when the outbound buffers of all senders is full
+ usec_t timeout;
+ if(slow)
+ // no work to be done, wait for a request to come in
+ timeout = 1000 * USEC_PER_MS;
+
+ else if(replication_globals.unsafe.pending > 0) {
+ if(replication_globals.unsafe.sender_resets == last_sender_resets) {
+ timeout = 1000 * USEC_PER_MS;
+ }
+ else {
+ // there are pending requests waiting to be executed,
+ // but none could be executed at this time.
+ // try again after this time.
+ timeout = 100 * USEC_PER_MS;
+ }
+
+ last_sender_resets = replication_globals.unsafe.sender_resets;
+ }
+ else {
+ // no requests pending, but there were requests recently (run_verification_countdown)
+ // so, try in a short time.
+ // if this is big, one chart replicating will be slow to finish (ping - pong just one chart)
+ timeout = 10 * USEC_PER_MS;
+ last_sender_resets = replication_globals.unsafe.sender_resets;
+ }
+
+ replication_recursive_unlock();
+
+ worker_is_idle();
+ sleep_usec(timeout);
+
+ // make it scan all the pending requests next time
+ replication_set_next_point_in_time(0, 0);
+ replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
+
+ continue;
+ }
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
diff --git a/streaming/replication.h b/streaming/replication.h
new file mode 100644
index 0000000..00462cc
--- /dev/null
+++ b/streaming/replication.h
@@ -0,0 +1,33 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#include "daemon/common.h"
+
+struct replication_query_statistics {
+ SPINLOCK spinlock;
+ size_t queries_started;
+ size_t queries_finished;
+ size_t points_read;
+ size_t points_generated;
+};
+
+struct replication_query_statistics replication_get_query_statistics(void);
+
+bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before);
+
+typedef int (*send_command)(const char *txt, void *data);
+
+bool replicate_chart_request(send_command callback, void *callback_data,
+ RRDHOST *rh, RRDSET *rs,
+ time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
+ time_t response_first_start_time, time_t response_last_end_time);
+
+void replication_init_sender(struct sender_state *sender);
+void replication_cleanup_sender(struct sender_state *sender);
+void replication_sender_delete_pending_requests(struct sender_state *sender);
+void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming);
+void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s);
+
+#endif /* REPLICATION_H */
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
new file mode 100644
index 0000000..a57f1b0
--- /dev/null
+++ b/streaming/rrdpush.c
@@ -0,0 +1,1044 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+#include "parser/parser.h"
+
+/*
+ * rrdpush
+ *
+ * 3 threads are involved for all stream operations
+ *
+ * 1. a random data collection thread, calling rrdset_done_push()
+ * this is called for each chart.
+ *
+ * the output of this work is kept in a thread BUFFER
+ * the sender thread is signalled via a pipe (in RRDHOST)
+ *
+ * 2. a sender thread running at the sending netdata
+ * this is spawned automatically on the first chart to be pushed
+ *
+ * It tries to push the metrics to the remote netdata, as fast
+ * as possible (i.e. immediately after they are collected).
+ *
+ * 3. a receiver thread, running at the receiving netdata
+ * this is spawned automatically when the sender connects to
+ * the receiver.
+ *
+ */
+
+struct config stream_config = {
+ .first_section = NULL,
+ .last_section = NULL,
+ .mutex = NETDATA_MUTEX_INITIALIZER,
+ .index = {
+ .avl_tree = {
+ .root = NULL,
+ .compar = appconfig_section_compare
+ },
+ .rwlock = AVL_LOCK_INITIALIZER
+ }
+};
+
+unsigned int default_rrdpush_enabled = 0;
+#ifdef ENABLE_COMPRESSION
+unsigned int default_compression_enabled = 1;
+#endif
+char *default_rrdpush_destination = NULL;
+char *default_rrdpush_api_key = NULL;
+char *default_rrdpush_send_charts_matching = NULL;
+bool default_rrdpush_enable_replication = true;
+time_t default_rrdpush_seconds_to_replicate = 86400;
+time_t default_rrdpush_replication_step = 600;
+#ifdef ENABLE_HTTPS
+int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
+char *netdata_ssl_ca_path = NULL;
+char *netdata_ssl_ca_file = NULL;
+#endif
+
+static void load_stream_conf() {
+ errno = 0;
+ char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
+ if(!appconfig_load(&stream_config, filename, 0, NULL)) {
+ info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
+ freez(filename);
+
+ filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
+ if(!appconfig_load(&stream_config, filename, 0, NULL))
+ info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
+ }
+ freez(filename);
+}
+
+bool rrdpush_receiver_needs_dbengine() {
+ struct section *co;
+
+ for(co = stream_config.first_section; co; co = co->next) {
+ if(strcmp(co->name, "stream") == 0)
+ continue; // the first section is not relevant
+
+ char *s;
+
+ s = appconfig_get_by_section(co, "enabled", NULL);
+ if(!s || !appconfig_test_boolean_value(s))
+ continue;
+
+ s = appconfig_get_by_section(co, "default memory mode", NULL);
+ if(s && strcmp(s, "dbengine") == 0)
+ return true;
+
+ s = appconfig_get_by_section(co, "memory mode", NULL);
+ if(s && strcmp(s, "dbengine") == 0)
+ return true;
+ }
+
+ return false;
+}
+
+int rrdpush_init() {
+ // --------------------------------------------------------------------
+ // load stream.conf
+ load_stream_conf();
+
+ default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
+ default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
+ default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
+ default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
+
+ default_rrdpush_enable_replication = config_get_boolean(CONFIG_SECTION_DB, "enable replication", default_rrdpush_enable_replication);
+ default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate);
+ default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step);
+
+ rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time);
+
+#ifdef ENABLE_COMPRESSION
+ default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
+ "enable compression", default_compression_enabled);
+#endif
+
+ if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
+ error("STREAM [send]: cannot enable sending thread - information is missing.");
+ default_rrdpush_enabled = 0;
+ }
+
+#ifdef ENABLE_HTTPS
+ if (netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL) {
+ if (default_rrdpush_destination){
+ char *test = strstr(default_rrdpush_destination,":SSL");
+ if(test){
+ *test = 0X00;
+ netdata_use_ssl_on_stream = NETDATA_SSL_FORCE;
+ }
+ }
+ }
+
+ bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);
+
+ if(invalid_certificate == CONFIG_BOOLEAN_YES){
+ if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
+ info("Netdata is configured to accept invalid SSL certificate.");
+ netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
+ }
+ }
+
+ netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
+ netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
+#endif
+
+ return default_rrdpush_enabled;
+}
+
+// data collection happens from multiple threads
+// each of these threads calls rrdset_done()
+// which in turn calls rrdset_done_push()
+// which uses this pipe to notify the streaming thread
+// that there are more data ready to be sent
+#define PIPE_READ 0
+#define PIPE_WRITE 1
+
+// to have the remote netdata re-sync the charts
+// to its current clock, we send for this many
+// iterations a BEGIN line without microseconds
+// this is for the first iterations of each chart
+unsigned int remote_clock_resync_iterations = 60;
+
+static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
+ if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
+ return false;
+
+ if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) {
+ RRDHOST *host = st->rrdhost;
+
+ if (flags & RRDSET_FLAG_ANOMALY_DETECTION) {
+ if(ml_streaming_enabled())
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+ else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
+ simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st)))
+
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+
+ // get the flags again, to know how to respond
+ flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+
+ return flags & RRDSET_FLAG_UPSTREAM_SEND;
+}
+
+int configured_as_parent() {
+ struct section *section = NULL;
+ int is_parent = 0;
+
+ appconfig_wrlock(&stream_config);
+ for (section = stream_config.first_section; section; section = section->next) {
+ uuid_t uuid;
+
+ if (uuid_parse(section->name, uuid) != -1 &&
+ appconfig_get_boolean_by_section(section, "enabled", 0)) {
+ is_parent = 1;
+ break;
+ }
+ }
+ appconfig_unlock(&stream_config);
+
+ return is_parent;
+}
+
+// chart labels
+static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls);
+ return 1;
+}
+
+static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
+ if (st->rrdlabels) {
+ if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0)
+ buffer_sprintf(wb, "CLABEL_COMMIT\n");
+ }
+}
+
+// Send the current chart definition.
+// Assumes that collector thread has already called sender_start for mutex / buffer state.
+static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
+ bool replication_progress = false;
+
+ RRDHOST *host = st->rrdhost;
+
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+
+ // properly set the name for the remote end to parse it
+ char *name = "";
+ if(likely(st->name)) {
+ if(unlikely(st->id != st->name)) {
+ // they differ
+ name = strchr(rrdset_name(st), '.');
+ if(name)
+ name++;
+ else
+ name = "";
+ }
+ }
+
+ // send the chart
+ buffer_sprintf(
+ wb
+ , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
+ , rrdset_id(st)
+ , name
+ , rrdset_title(st)
+ , rrdset_units(st)
+ , rrdset_family(st)
+ , rrdset_context(st)
+ , rrdset_type_name(st->chart_type)
+ , st->priority
+ , st->update_every
+ , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
+ , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
+ , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
+ , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
+ , rrdset_plugin_name(st)
+ , rrdset_module_name(st)
+ );
+
+ // send the chart labels
+ if (stream_has_capability(host->sender, STREAM_CAP_CLABELS))
+ rrdpush_send_clabels(wb, st);
+
+ // send the dimensions
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ buffer_sprintf(
+ wb
+ , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
+ , rrddim_id(rd)
+ , rrddim_name(rd)
+ , rrd_algorithm_name(rd->algorithm)
+ , rd->multiplier
+ , rd->divisor
+ , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
+ );
+ rd->exposed = 1;
+ }
+ rrddim_foreach_done(rd);
+
+ // send the chart functions
+ if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
+ rrd_functions_expose_rrdpush(st, wb);
+
+ // send the chart local custom variables
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+
+ if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
+ time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0);
+ time_t last_entry_local = st->last_updated.tv_sec;
+
+ if(unlikely(!last_entry_local))
+ last_entry_local = rrdset_last_entry_t(st);
+
+ time_t now = now_realtime_sec();
+ if(unlikely(last_entry_local > now)) {
+ internal_error(true,
+ "RRDSET REPLAY ERROR: 'host:%s/chart:%s' last updated time %ld is in the future, adjusting it to now %ld",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ last_entry_local, now);
+ last_entry_local = now;
+ }
+
+ if(unlikely(first_entry_local && last_entry_local && first_entry_local >= last_entry_local)) {
+ internal_error(true,
+ "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first updated time %ld is equal or bigger than last updated time %ld, adjusting it last updated time - update every",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ first_entry_local, last_entry_local);
+ first_entry_local = last_entry_local - st->update_every;
+ }
+
+ if(unlikely(!first_entry_local && last_entry_local)) {
+ internal_error(true,
+ "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first time %ld, last time %ld, setting both to last time",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ first_entry_local, last_entry_local);
+ first_entry_local = last_entry_local;
+ }
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n",
+ (unsigned long long)first_entry_local,
+ (unsigned long long)last_entry_local,
+ (unsigned long long)now);
+
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ replication_progress = true;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
+ }
+
+ st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
+ return replication_progress;
+}
+
+// sends the current chart dimensions
+static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) {
+ buffer_fast_strcat(wb, "BEGIN \"", 7);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "\" ", 2);
+
+ if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time)
+ buffer_print_llu(wb, st->usec_since_last_update);
+ else
+ buffer_fast_strcat(wb, "0", 1);
+
+ buffer_fast_strcat(wb, "\n", 1);
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(unlikely(!rd->updated))
+ continue;
+
+ if(likely(rd->exposed)) {
+ buffer_fast_strcat(wb, "SET \"", 5);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "\" = ", 4);
+ buffer_print_ll(wb, rd->collected_value);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
+ else {
+ internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
+ // we will include it in the next iteration
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ }
+ }
+ rrddim_foreach_done(rd);
+
+ if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+
+ buffer_fast_strcat(wb, "END\n", 4);
+}
+
+static void rrdpush_sender_thread_spawn(RRDHOST *host);
+
+// Called from the internal collectors to mark a chart obsolete.
+bool rrdset_push_chart_definition_now(RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
+ || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST))))
+ return false;
+
+ BUFFER *wb = sender_start(host->sender);
+ rrdpush_send_chart_definition(wb, st);
+ sender_commit(host->sender, wb);
+
+ return true;
+}
+
+void rrdset_done_push(RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+
+ // fetch the flags we need to check with one atomic operation
+ RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST);
+
+ // check if we are not connected
+ if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
+
+ if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED))))
+ rrdpush_sender_thread_spawn(host);
+
+ if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
+ error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ }
+
+ return;
+ }
+ else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
+ info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
+ }
+
+ RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
+ bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
+ bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
+ if(unlikely((exposed_upstream && replication_in_progress) ||
+ !should_send_chart_matching(st, rrdset_flags)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+
+ if(unlikely(!exposed_upstream))
+ replication_in_progress = rrdpush_send_chart_definition(wb, st);
+
+ if (likely(!replication_in_progress))
+ rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags);
+
+ sender_commit(host->sender, wb);
+}
+
+// labels
+static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
+ return 1;
+}
+void rrdpush_send_host_labels(RRDHOST *host) {
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
+ || !stream_has_capability(host->sender, STREAM_CAP_HLABELS)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+
+ rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb);
+ buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
+
+ sender_commit(host->sender, wb);
+}
+
+void rrdpush_claimed_id(RRDHOST *host)
+{
+ if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
+ return;
+
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+ rrdhost_aclk_state_lock(host);
+
+ buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
+
+ rrdhost_aclk_state_unlock(host);
+ sender_commit(host->sender, wb);
+}
+
+int connect_to_one_of_destinations(
+ RRDHOST *host,
+ int default_port,
+ struct timeval *timeout,
+ size_t *reconnects_counter,
+ char *connected_to,
+ size_t connected_to_size,
+ struct rrdpush_destinations **destination)
+{
+ int sock = -1;
+
+ for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) {
+ time_t now = now_realtime_sec();
+
+ if(d->postpone_reconnection_until > now) {
+ info(
+ "STREAM %s: skipping destination '%s' (default port: %d) due to last error (code: %d, %s), will retry it in %d seconds",
+ rrdhost_hostname(host),
+ string2str(d->destination),
+ default_port,
+ d->last_handshake, d->last_error?d->last_error:"unset reason description",
+ (int)(d->postpone_reconnection_until - now));
+
+ continue;
+ }
+
+ info(
+ "STREAM %s: attempting to connect to '%s' (default port: %d)...",
+ rrdhost_hostname(host),
+ string2str(d->destination),
+ default_port);
+
+ if (reconnects_counter)
+ *reconnects_counter += 1;
+
+ sock = connect_to_this(string2str(d->destination), default_port, timeout);
+
+ if (sock != -1) {
+ if (connected_to && connected_to_size)
+ strncpyz(connected_to, string2str(d->destination), connected_to_size);
+
+ *destination = d;
+
+ // move the current item to the end of the list
+ // without this, this destination will break the loop again and again
+ // not advancing the destinations to find one that may work
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, d, prev, next);
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(host->destinations, d, prev, next);
+
+ break;
+ }
+ }
+
+ return sock;
+}
+
+struct destinations_init_tmp {
+ RRDHOST *host;
+ struct rrdpush_destinations *list;
+ int count;
+};
+
+bool destinations_init_add_one(char *entry, void *data) {
+ struct destinations_init_tmp *t = data;
+
+ struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
+ d->destination = string_strdupz(entry);
+
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(t->list, d, prev, next);
+
+ t->count++;
+ info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
+
+ return false; // we return false, so that we will get all defined destinations
+}
+
+void rrdpush_destinations_init(RRDHOST *host) {
+ if(!host->rrdpush_send_destination) return;
+
+ rrdpush_destinations_free(host);
+
+ struct destinations_init_tmp t = {
+ .host = host,
+ .list = NULL,
+ .count = 0,
+ };
+
+ foreach_entry_in_connection_string(host->rrdpush_send_destination, destinations_init_add_one, &t);
+
+ host->destinations = t.list;
+}
+
+void rrdpush_destinations_free(RRDHOST *host) {
+ while (host->destinations) {
+ struct rrdpush_destinations *tmp = host->destinations;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, tmp, prev, next);
+ string_freez(tmp->destination);
+ freez(tmp);
+ }
+
+ host->destinations = NULL;
+}
+
+// ----------------------------------------------------------------------------
+// rrdpush sender thread
+
+// Either the receiver lost the connection or the host is being destroyed.
+// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
+void rrdpush_sender_thread_stop(RRDHOST *host) {
+
+ if (!host->sender)
+ return;
+
+ netdata_mutex_lock(&host->sender->mutex);
+ netdata_thread_t thr = 0;
+
+ if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+
+ info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host));
+
+ // signal the thread that we want to join it
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN);
+
+ // copy the thread id, so that we will be waiting for the right one
+ // even if a new one has been spawn
+ thr = host->rrdpush_sender_thread;
+
+ // signal it to cancel
+ netdata_thread_cancel(host->rrdpush_sender_thread);
+ }
+
+ netdata_mutex_unlock(&host->sender->mutex);
+
+ if(thr != 0) {
+ info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host));
+ void *result;
+ netdata_thread_join(thr, &result);
+ info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host));
+ }
+}
+
+
+// ----------------------------------------------------------------------------
+// rrdpush receiver thread
+
+void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
+ log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
+}
+
+
+static void rrdpush_sender_thread_spawn(RRDHOST *host) {
+ netdata_mutex_lock(&host->sender->mutex);
+
+ if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host));
+
+ if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender))
+ error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
+ else
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+ }
+
+ netdata_mutex_unlock(&host->sender->mutex);
+}
+
+int rrdpush_receiver_permission_denied(struct web_client *w) {
+ // we always respond with the same message and error code
+ // to prevent an attacker from gaining info about the error
+ buffer_flush(w->response.data);
+ buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
+ return 401;
+}
+
+int rrdpush_receiver_too_busy_now(struct web_client *w) {
+ // we always respond with the same message and error code
+ // to prevent an attacker from gaining info about the error
+ buffer_flush(w->response.data);
+ buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
+ return 503;
+}
+
+void *rrdpush_receiver_thread(void *ptr);
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
+ info("clients wants to STREAM metrics.");
+
+ char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *abbrev_timezone = "UTC", *tags = NULL;
+ int32_t utc_offset = 0;
+ int update_every = default_rrd_update_every;
+ uint32_t stream_version = UINT_MAX;
+ char buf[GUID_LEN + 1];
+
+ struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
+ system_info->hops = 1;
+ while(url) {
+ char *value = mystrsep(&url, "&");
+ if(!value || !*value) continue;
+
+ char *name = mystrsep(&value, "=");
+ if(!name || !*name) continue;
+ if(!value || !*value) continue;
+
+ if(!strcmp(name, "key"))
+ key = value;
+ else if(!strcmp(name, "hostname"))
+ hostname = value;
+ else if(!strcmp(name, "registry_hostname"))
+ registry_hostname = value;
+ else if(!strcmp(name, "machine_guid"))
+ machine_guid = value;
+ else if(!strcmp(name, "update_every"))
+ update_every = (int)strtoul(value, NULL, 0);
+ else if(!strcmp(name, "os"))
+ os = value;
+ else if(!strcmp(name, "timezone"))
+ timezone = value;
+ else if(!strcmp(name, "abbrev_timezone"))
+ abbrev_timezone = value;
+ else if(!strcmp(name, "utc_offset"))
+ utc_offset = (int32_t)strtol(value, NULL, 0);
+ else if(!strcmp(name, "hops"))
+ system_info->hops = (uint16_t) strtoul(value, NULL, 0);
+ else if(!strcmp(name, "ml_capable"))
+ system_info->ml_capable = strtoul(value, NULL, 0);
+ else if(!strcmp(name, "ml_enabled"))
+ system_info->ml_enabled = strtoul(value, NULL, 0);
+ else if(!strcmp(name, "mc_version"))
+ system_info->mc_version = strtoul(value, NULL, 0);
+ else if(!strcmp(name, "tags"))
+ tags = value;
+ else if(!strcmp(name, "ver"))
+ stream_version = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
+ else {
+ // An old Netdata child does not have a compatible streaming protocol, map to something sane.
+ if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
+ name = "NETDATA_HOST_OS_NAME";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
+ name = "NETDATA_HOST_OS_ID";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
+ name = "NETDATA_HOST_OS_ID_LIKE";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
+ name = "NETDATA_HOST_OS_VERSION";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
+ name = "NETDATA_HOST_OS_VERSION_ID";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
+ name = "NETDATA_HOST_OS_DETECTION";
+ else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) {
+ stream_version = convert_stream_version_to_capabilities(1);
+ }
+
+ if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
+ info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
+ w->client_ip, w->client_port, name, value);
+ }
+ }
+ }
+
+ if (stream_version == UINT_MAX)
+ stream_version = convert_stream_version_to_capabilities(0);
+
+ if(!key || !*key) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY");
+ error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!hostname || !*hostname) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO HOSTNAME");
+ error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!machine_guid || !*machine_guid) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO MACHINE GUID");
+ error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(regenerate_guid(key, buf) == -1) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID KEY");
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(regenerate_guid(machine_guid, buf) == -1) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID MACHINE GUID");
+ error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ const char *api_key_type = appconfig_get(&stream_config, key, "type", "api");
+ if(!api_key_type || !*api_key_type) api_key_type = "unknown";
+ if(strcmp(api_key_type, "api") != 0) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - API KEY GIVEN IS NOT API KEY");
+ error("STREAM [receive from [%s]:%s]: API key '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, key, api_key_type);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ENABLED");
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ {
+ SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
+ if(key_allow_from) {
+ if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
+ simple_pattern_free(key_allow_from);
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP");
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key);
+ return rrdpush_receiver_permission_denied(w);
+ }
+ simple_pattern_free(key_allow_from);
+ }
+ }
+
+ const char *machine_guid_type = appconfig_get(&stream_config, machine_guid, "type", "machine");
+ if(!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
+ if(strcmp(machine_guid_type, "machine") != 0) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID GIVEN IS NOT A MACHINE GUID");
+ error("STREAM [receive from [%s]:%s]: machine GUID '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid, machine_guid_type);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ENABLED");
+ error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ {
+ SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
+ if(machine_allow_from) {
+ if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
+ simple_pattern_free(machine_allow_from);
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP");
+ error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ return rrdpush_receiver_permission_denied(w);
+ }
+ simple_pattern_free(machine_allow_from);
+ }
+ }
+
+ if(unlikely(web_client_streaming_rate_t > 0)) {
+ static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER;
+ static volatile time_t last_stream_accepted_t = 0;
+
+ netdata_mutex_lock(&stream_rate_mutex);
+ time_t now = now_realtime_sec();
+
+ if(unlikely(last_stream_accepted_t == 0))
+ last_stream_accepted_t = now;
+
+ if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
+ netdata_mutex_unlock(&stream_rate_mutex);
+ rrdhost_system_info_free(system_info);
+ error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
+ return rrdpush_receiver_too_busy_now(w);
+ }
+
+ last_stream_accepted_t = now;
+ netdata_mutex_unlock(&stream_rate_mutex);
+ }
+
+ /*
+ * Quick path for rejecting multiple connections. The lock taken is fine-grained - it only protects the receiver
+ * pointer within the host (if a host exists). This protects against multiple concurrent web requests hitting
+ * separate threads within the web-server and landing here. The lock guards the thread-shutdown sequence that
+ * detaches the receiver from the host. If the host is being created (first time-access) then we also use the
+ * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a
+ * lookup to the now-attached structure).
+ */
+ struct receiver_state *rpt = callocz(1, sizeof(*rpt));
+
+ rrd_rdlock();
+ RRDHOST *host = rrdhost_find_by_guid(machine_guid);
+ if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
+ host = NULL;
+ if (host) {
+ rrdhost_wrlock(host);
+ netdata_mutex_lock(&host->receiver_lock);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ host->senders_disconnected_time = 0;
+ if (host->receiver != NULL) {
+ time_t age = now_realtime_sec() - host->receiver->last_msg_t;
+ if (age > 30) {
+ host->receiver->shutdown = 1;
+ shutdown(host->receiver->fd, SHUT_RDWR);
+ host->receiver = NULL; // Thread holds reference to structure
+ info(
+ "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
+ "existing connection is dead (%"PRId64" sec), accepting new connection.",
+ rrdhost_hostname(host),
+ w->client_ip,
+ w->client_port,
+ (int64_t)age);
+ }
+ else {
+ netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
+ rrd_unlock();
+ log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, rrdhost_hostname(host),
+ "REJECTED - ALREADY CONNECTED");
+ info(
+ "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
+ "existing connection is active (within last %"PRId64" sec), rejecting new connection.",
+ rrdhost_hostname(host),
+ w->client_ip,
+ w->client_port,
+ (int64_t)age);
+ // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, "This GUID is already streaming to this server");
+ freez(rpt);
+ return 409;
+ }
+ }
+ host->receiver = rpt;
+ netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
+ }
+ rrd_unlock();
+
+ rpt->last_msg_t = now_realtime_sec();
+
+ rpt->host = host;
+ rpt->fd = w->ifd;
+ rpt->key = strdupz(key);
+ rpt->hostname = strdupz(hostname);
+ rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
+ rpt->machine_guid = strdupz(machine_guid);
+ rpt->os = strdupz(os);
+ rpt->timezone = strdupz(timezone);
+ rpt->abbrev_timezone = strdupz(abbrev_timezone);
+ rpt->utc_offset = utc_offset;
+ rpt->tags = (tags)?strdupz(tags):NULL;
+ rpt->client_ip = strdupz(w->client_ip);
+ rpt->client_port = strdupz(w->client_port);
+ rpt->update_every = update_every;
+ rpt->system_info = system_info;
+ rpt->capabilities = stream_version;
+#ifdef ENABLE_HTTPS
+ rpt->ssl.conn = w->ssl.conn;
+ rpt->ssl.flags = w->ssl.flags;
+
+ w->ssl.conn = NULL;
+ w->ssl.flags = NETDATA_SSL_START;
+#endif
+
+ if(w->user_agent && w->user_agent[0]) {
+ char *t = strchr(w->user_agent, '/');
+ if(t && *t) {
+ *t = '\0';
+ t++;
+ }
+
+ rpt->program_name = strdupz(w->user_agent);
+ if(t && *t) rpt->program_version = strdupz(t);
+ }
+
+
+
+ debug(D_SYSTEM, "starting STREAM receive thread.");
+
+ char tag[FILENAME_MAX + 1];
+ snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
+
+ if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
+ error("Failed to create new STREAM receive thread for client.");
+
+ // prevent the caller from closing the streaming socket
+ if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
+ web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
+ }
+ else {
+ if(w->ifd == w->ofd)
+ w->ifd = w->ofd = -1;
+ else
+ w->ifd = -1;
+ }
+
+ buffer_flush(w->response.data);
+ return 200;
+}
+
+static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
+ if(caps & STREAM_CAP_V1) buffer_strcat(wb, "V1 ");
+ if(caps & STREAM_CAP_V2) buffer_strcat(wb, "V2 ");
+ if(caps & STREAM_CAP_VN) buffer_strcat(wb, "VN ");
+ if(caps & STREAM_CAP_VCAPS) buffer_strcat(wb, "VCAPS ");
+ if(caps & STREAM_CAP_HLABELS) buffer_strcat(wb, "HLABELS ");
+ if(caps & STREAM_CAP_CLAIM) buffer_strcat(wb, "CLAIM ");
+ if(caps & STREAM_CAP_CLABELS) buffer_strcat(wb, "CLABELS ");
+ if(caps & STREAM_CAP_COMPRESSION) buffer_strcat(wb, "COMPRESSION ");
+ if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS ");
+ if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
+ if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
+}
+
+void log_receiver_capabilities(struct receiver_state *rpt) {
+ BUFFER *wb = buffer_create(100);
+ stream_capabilities_to_string(wb, rpt->capabilities);
+
+ info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
+
+ buffer_free(wb);
+}
+
+void log_sender_capabilities(struct sender_state *s) {
+ BUFFER *wb = buffer_create(100);
+ stream_capabilities_to_string(wb, s->capabilities);
+
+ info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
+
+ buffer_free(wb);
+}
+
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
+ STREAM_CAPABILITIES caps = 0;
+
+ if(version <= 1) caps = STREAM_CAP_V1;
+ else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS;
+ else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM;
+ else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS;
+ else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION;
+ else caps = version;
+
+ if(caps & STREAM_CAP_VCAPS)
+ caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2|STREAM_CAP_VN);
+
+ if(caps & STREAM_CAP_VN)
+ caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2);
+
+ if(caps & STREAM_CAP_V2)
+ caps &= ~(STREAM_CAP_V1);
+
+ return caps & STREAM_OUR_CAPABILITIES;
+}
+
+int32_t stream_capabilities_to_vn(uint32_t caps) {
+ if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION;
+ if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS;
+ return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM)
+}
+
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
new file mode 100644
index 0000000..a0c7e8d
--- /dev/null
+++ b/streaming/rrdpush.h
@@ -0,0 +1,305 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDPUSH_H
+#define NETDATA_RRDPUSH_H 1
+
+#include "database/rrd.h"
+#include "libnetdata/libnetdata.h"
+#include "web/server/web_client.h"
+#include "daemon/common.h"
+
+#define CONNECTED_TO_SIZE 100
+
+// ----------------------------------------------------------------------------
+// obsolete versions - do not use anymore
+
+#define STREAM_OLD_VERSION_CLAIM 3
+#define STREAM_OLD_VERSION_CLABELS 4
+#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
+
+// ----------------------------------------------------------------------------
+// capabilities negotiation
+
+typedef enum {
+ // do not use the first 3 bits
+ STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol
+ STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels)
+ STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
+ // v3 = claiming supported
+ // v4 = chart labels supported
+ // v5 = lz4 compression supported
+ STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported
+ STREAM_CAP_HLABELS = (1 << 7), // host labels supported
+ STREAM_CAP_CLAIM = (1 << 8), // claiming supported
+ STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
+ STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
+ STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
+ STREAM_CAP_REPLICATION = (1 << 12), // replication supported
+ STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
+
+ // this must be signed int, so don't use the last bit
+ // needed for negotiating errors between parent and child
+} STREAM_CAPABILITIES;
+
+#ifdef ENABLE_COMPRESSION
+#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
+#else
+#define STREAM_HAS_COMPRESSION 0
+#endif // ENABLE_COMPRESSION
+
+#define STREAM_OUR_CAPABILITIES ( \
+ STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \
+ STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \
+ STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY )
+
+#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
+
+// ----------------------------------------------------------------------------
+// stream handshake
+
+#define HTTP_HEADER_SIZE 8192
+
+#define STREAMING_PROTOCOL_VERSION "1.1"
+#define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
+#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
+
+#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
+#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
+#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
+
+typedef enum {
+ STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
+ STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
+ STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
+ STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
+ STREAM_HANDSHAKE_OK_V1 = 1,
+ STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
+ STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
+ STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
+ STREAM_HANDSHAKE_ERROR_DENIED = -4,
+ STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
+ STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
+ STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
+ STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
+ STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
+} STREAM_HANDSHAKE;
+
+
+// ----------------------------------------------------------------------------
+
+typedef struct {
+ char *os_name;
+ char *os_id;
+ char *os_version;
+ char *kernel_name;
+ char *kernel_version;
+} stream_encoded_t;
+
+#ifdef ENABLE_COMPRESSION
+struct compressor_state {
+ char *compression_result_buffer;
+ size_t compression_result_buffer_size;
+ struct compressor_data *data; // Compression API specific data
+ void (*reset)(struct compressor_state *state);
+ size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
+ void (*destroy)(struct compressor_state **state);
+};
+
+struct decompressor_state {
+ size_t signature_size;
+ size_t total_compressed;
+ size_t total_uncompressed;
+ size_t packet_count;
+ struct decompressor_stream *stream; // Decompression API specific data
+ void (*reset)(struct decompressor_state *state);
+ size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
+ size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+ size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
+ size_t (*get)(struct decompressor_state *state, char *data, size_t size);
+ void (*destroy)(struct decompressor_state **state);
+};
+#endif
+
+// Thread-local storage
+ // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+
+typedef enum {
+ SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
+ SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
+} SENDER_FLAGS;
+
+struct sender_state {
+ RRDHOST *host;
+ pid_t tid; // the thread id of the sender, from gettid()
+ SENDER_FLAGS flags;
+ int timeout;
+ int default_port;
+ usec_t reconnect_delay;
+ char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
+ size_t begin;
+ size_t reconnects_counter;
+ size_t sent_bytes;
+ size_t sent_bytes_on_this_connection;
+ size_t send_attempts;
+ time_t last_traffic_seen_t;
+ size_t not_connected_loops;
+ // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
+ // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
+ netdata_mutex_t mutex;
+ struct circular_buffer *buffer;
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
+ int read_len;
+ STREAM_CAPABILITIES capabilities;
+
+ int rrdpush_sender_pipe[2]; // collector to sender thread signaling
+ int rrdpush_sender_socket;
+
+#ifdef ENABLE_COMPRESSION
+ struct compressor_state *compressor;
+#endif
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl ssl; // structure used to encrypt the connection
+#endif
+
+ struct {
+ DICTIONARY *requests; // de-duplication of replication requests, per chart
+
+ struct {
+ size_t pending_requests; // the currently outstanding replication requests
+ size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
+ bool reached_max; // true when the sender buffer should not get more replication responses
+ } atomic;
+
+ } replication;
+
+ struct {
+ size_t buffer_used_percentage; // the current utilization of the sending buffer
+ usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
+ } atomic;
+};
+
+#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
+#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
+
+#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
+#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
+
+#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
+#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
+
+#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)
+
+#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
+
+struct receiver_state {
+ RRDHOST *host;
+ netdata_thread_t thread;
+ int fd;
+ char *key;
+ char *hostname;
+ char *registry_hostname;
+ char *machine_guid;
+ char *os;
+ char *timezone; // Unused?
+ char *abbrev_timezone;
+ int32_t utc_offset;
+ char *tags;
+ char *client_ip; // Duplicated in pluginsd
+ char *client_port; // Duplicated in pluginsd
+ char *program_name; // Duplicated in pluginsd
+ char *program_version;
+ struct rrdhost_system_info *system_info;
+ int update_every;
+ STREAM_CAPABILITIES capabilities;
+ time_t last_msg_t;
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
+ int read_len;
+ unsigned int shutdown:1; // Tell the thread to exit
+ unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl ssl;
+#endif
+#ifdef ENABLE_COMPRESSION
+ unsigned int rrdpush_compression;
+ struct decompressor_state *decompressor;
+#endif
+
+ time_t replication_first_time_t;
+};
+
+struct rrdpush_destinations {
+ STRING *destination;
+
+ const char *last_error;
+ time_t postpone_reconnection_until;
+ STREAM_HANDSHAKE last_handshake;
+
+ struct rrdpush_destinations *prev;
+ struct rrdpush_destinations *next;
+};
+
+extern unsigned int default_rrdpush_enabled;
+#ifdef ENABLE_COMPRESSION
+extern unsigned int default_compression_enabled;
+#endif
+extern char *default_rrdpush_destination;
+extern char *default_rrdpush_api_key;
+extern char *default_rrdpush_send_charts_matching;
+extern bool default_rrdpush_enable_replication;
+extern time_t default_rrdpush_seconds_to_replicate;
+extern time_t default_rrdpush_replication_step;
+extern unsigned int remote_clock_resync_iterations;
+
+void rrdpush_destinations_init(RRDHOST *host);
+void rrdpush_destinations_free(RRDHOST *host);
+
+void sender_init(RRDHOST *host);
+
+BUFFER *sender_start(struct sender_state *s);
+void sender_commit(struct sender_state *s, BUFFER *wb);
+void sender_cancel(struct sender_state *s);
+int rrdpush_init();
+bool rrdpush_receiver_needs_dbengine();
+int configured_as_parent();
+void rrdset_done_push(RRDSET *st);
+bool rrdset_push_chart_definition_now(RRDSET *st);
+void *rrdpush_sender_thread(void *ptr);
+void rrdpush_send_host_labels(RRDHOST *host);
+void rrdpush_claimed_id(RRDHOST *host);
+
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
+void rrdpush_sender_thread_stop(RRDHOST *host);
+
+void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
+void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
+int connect_to_one_of_destinations(
+ RRDHOST *host,
+ int default_port,
+ struct timeval *timeout,
+ size_t *reconnects_counter,
+ char *connected_to,
+ size_t connected_to_size,
+ struct rrdpush_destinations **destination);
+
+void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
+
+#ifdef ENABLE_COMPRESSION
+struct compressor_state *create_compressor();
+struct decompressor_state *create_decompressor();
+#endif
+
+void log_receiver_capabilities(struct receiver_state *rpt);
+void log_sender_capabilities(struct sender_state *s);
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
+int32_t stream_capabilities_to_vn(uint32_t caps);
+
+#include "replication.h"
+
+#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
new file mode 100644
index 0000000..62097e3
--- /dev/null
+++ b/streaming/sender.c
@@ -0,0 +1,1358 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+#include "parser/parser.h"
+
+#define WORKER_SENDER_JOB_CONNECT 0
+#define WORKER_SENDER_JOB_PIPE_READ 1
+#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
+#define WORKER_SENDER_JOB_EXECUTE 3
+#define WORKER_SENDER_JOB_SOCKET_SEND 4
+#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
+#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
+#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
+#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
+#define WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR 9
+#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
+#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
+#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
+#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
+#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
+#define WORKER_SENDER_JOB_BUFFER_RATIO 15
+#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
+#define WORKER_SENDER_JOB_BYTES_SENT 17
+#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
+#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
+#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
+#endif
+
+extern struct config stream_config;
+extern int netdata_use_ssl_on_stream;
+extern char *netdata_ssl_ca_path;
+extern char *netdata_ssl_ca_file;
+
+static __thread BUFFER *sender_thread_buffer = NULL;
+static __thread bool sender_thread_buffer_used = false;
+
+void sender_thread_buffer_free(void) {
+ if(sender_thread_buffer) {
+ buffer_free(sender_thread_buffer);
+ sender_thread_buffer = NULL;
+ }
+}
+
+// Collector thread starting a transmission
+BUFFER *sender_start(struct sender_state *s __maybe_unused) {
+ if(!sender_thread_buffer)
+ sender_thread_buffer = buffer_create(1024);
+
+ if(sender_thread_buffer_used)
+ fatal("STREAMING: thread buffer is used multiple times concurrently.");
+
+ sender_thread_buffer_used = true;
+ buffer_flush(sender_thread_buffer);
+ return sender_thread_buffer;
+}
+
+void sender_cancel(struct sender_state *s __maybe_unused) {
+ sender_thread_buffer_used = false;
+}
+
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
+
+#ifdef ENABLE_COMPRESSION
+/*
+* In case of stream compression buffer oveflow
+* Inform the user through the error log file and
+* deactivate compression by downgrading the stream protocol.
+*/
+static inline void deactivate_compression(struct sender_state *s) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
+ error("STREAM_COMPRESSION: Compression returned error, disabling it.");
+ s->flags &= ~SENDER_FLAG_COMPRESSION;
+ error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
+ rrdpush_sender_thread_close_socket(s->host);
+}
+#endif
+
+#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
+
+// Collector thread finishing a transmission
+void sender_commit(struct sender_state *s, BUFFER *wb) {
+
+ if(unlikely(wb != sender_thread_buffer))
+ fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
+
+ if(unlikely(!sender_thread_buffer_used))
+ fatal("STREAMING: sender is committing a buffer twice.");
+
+ sender_thread_buffer_used = false;
+
+ char *src = (char *)buffer_tostring(wb);
+ size_t src_len = buffer_strlen(wb);
+
+ if(unlikely(!src || !src_len))
+ return;
+
+ netdata_mutex_lock(&s->mutex);
+
+ if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
+ info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
+ rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
+
+ s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
+ }
+
+#ifdef ENABLE_COMPRESSION
+ if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
+ while(src_len) {
+ size_t size_to_compress = src_len;
+
+ if(unlikely(size_to_compress > COMPRESSION_MAX_MSG_SIZE)) {
+ if (stream_has_capability(s, STREAM_CAP_BINARY))
+ size_to_compress = COMPRESSION_MAX_MSG_SIZE;
+ else {
+ if (size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
+ // we need to find the last newline
+ // so that the decompressor will have a whole line to work with
+
+ const char *t = &src[COMPRESSION_MAX_MSG_SIZE];
+ while (--t >= src)
+ if (unlikely(*t == '\n'))
+ break;
+
+ if (t <= src) {
+ size_to_compress = COMPRESSION_MAX_MSG_SIZE;
+ } else
+ size_to_compress = t - src + 1;
+ }
+ }
+ }
+
+ char *dst;
+ size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ if (!dst_len) {
+ error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
+ rrdhost_hostname(s->host), s->connected_to);
+
+ s->compressor->reset(s->compressor);
+ dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ if(!dst_len) {
+ error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
+ rrdhost_hostname(s->host), s->connected_to);
+
+ deactivate_compression(s);
+ netdata_mutex_unlock(&s->mutex);
+ return;
+ }
+ }
+
+ if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
+
+ src = src + size_to_compress;
+ src_len -= size_to_compress;
+ }
+ }
+ else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
+#else
+ if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
+#endif
+
+ replication_recalculate_buffer_used_ratio_unsafe(s);
+
+ netdata_mutex_unlock(&s->mutex);
+ rrdpush_signal_sender_to_wake_up(s);
+}
+
+static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
+ buffer_sprintf(
+ wb
+ , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
+ , rrdvar_name(rva)
+ , rrdvar2number(rva)
+ );
+
+ debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
+}
+
+void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
+ if(rrdhost_can_send_definitions_to_parent(host)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrdpush_sender_add_host_variable_to_buffer(wb, rva);
+ sender_commit(host->sender, wb);
+ }
+}
+
+struct custom_host_variables_callback {
+ BUFFER *wb;
+};
+
+static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) {
+ const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
+ struct custom_host_variables_callback *tmp = struct_ptr;
+ BUFFER *wb = tmp->wb;
+
+ if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) {
+ rrdpush_sender_add_host_variable_to_buffer(wb, rv);
+ return 1;
+ }
+ return 0;
+}
+
+static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
+ if(rrdhost_can_send_definitions_to_parent(host)) {
+ BUFFER *wb = sender_start(host->sender);
+ struct custom_host_variables_callback tmp = {
+ .wb = wb
+ };
+ int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
+ (void)ret;
+ sender_commit(host->sender, wb);
+
+ debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ }
+}
+
+// resets all the chart, so that their definitions
+// will be resent to the central netdata
+static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
+ error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host));
+
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
+ st->upstream_resync_time = 0;
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st)
+ rd->exposed = 0;
+ rrddim_foreach_done(rd);
+ }
+ rrdset_foreach_done(st);
+
+ rrdhost_sender_replicating_charts_zero(host);
+}
+
+static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
+ rrdpush_sender_set_flush_time(host->sender);
+
+ netdata_mutex_lock(&host->sender->mutex);
+
+ // flush the output buffer from any data it may have
+ cbuffer_flush(host->sender->buffer);
+ replication_recalculate_buffer_used_ratio_unsafe(host->sender);
+
+ netdata_mutex_unlock(&host->sender->mutex);
+}
+
+static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
+ rrdpush_sender_set_flush_time(host->sender);
+
+ // stop all replication commands inflight
+ replication_sender_delete_pending_requests(host->sender);
+
+ // reset the state of all charts
+ rrdpush_sender_thread_reset_all_charts(host);
+
+ rrdpush_sender_replicating_charts_zero(host->sender);
+}
+
+static void rrdpush_sender_on_connect(RRDHOST *host) {
+ rrdpush_sender_cbuffer_flush(host);
+ rrdpush_sender_charts_and_replication_reset(host);
+ rrdpush_sender_thread_send_custom_host_variables(host);
+}
+
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+ if(host->sender->rrdpush_sender_socket != -1) {
+ close(host->sender->rrdpush_sender_socket);
+ host->sender->rrdpush_sender_socket = -1;
+ }
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ // do not flush the circular buffer here
+ // this function is called sometimes with the mutex lock, sometimes without the lock
+ rrdpush_sender_charts_and_replication_reset(host);
+}
+
+void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
+{
+ se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):"";
+ se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):"";
+ se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):"";
+ se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):"";
+ se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):"";
+}
+
+void rrdpush_clean_encoded(stream_encoded_t *se)
+{
+ if (se->os_name)
+ freez(se->os_name);
+
+ if (se->os_id)
+ freez(se->os_id);
+
+ if (se->os_version)
+ freez(se->os_version);
+
+ if (se->kernel_name)
+ freez(se->kernel_name);
+
+ if (se->kernel_version)
+ freez(se->kernel_version);
+}
+
+struct {
+ const char *response;
+ size_t length;
+ int32_t version;
+ bool dynamic;
+ const char *error;
+ int worker_job_id;
+ time_t postpone_reconnect_seconds;
+} stream_responses[] = {
+ {
+ .response = START_STREAMING_PROMPT_VN,
+ .length = sizeof(START_STREAMING_PROMPT_VN) - 1,
+ .version = STREAM_HANDSHAKE_OK_V3, // and above
+ .dynamic = true, // dynamic = we will parse the version / capabilities
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_PROMPT_V2,
+ .length = sizeof(START_STREAMING_PROMPT_V2) - 1,
+ .version = STREAM_HANDSHAKE_OK_V2,
+ .dynamic = false,
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_PROMPT_V1,
+ .length = sizeof(START_STREAMING_PROMPT_V1) - 1,
+ .version = STREAM_HANDSHAKE_OK_V1,
+ .dynamic = false,
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_ERROR_SAME_LOCALHOST,
+ .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
+ .dynamic = false,
+ .error = "remote server rejected this stream, the host we are trying to stream is its localhost",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
+ },
+ {
+ .response = START_STREAMING_ERROR_ALREADY_STREAMING,
+ .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
+ .dynamic = false,
+ .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ },
+ {
+ .response = START_STREAMING_ERROR_NOT_PERMITTED,
+ .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_DENIED,
+ .dynamic = false,
+ .error = "remote server denied access, probably we don't have the right API key?",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 1 * 60, // 1 minute
+ },
+
+ // terminator
+ {
+ .response = NULL,
+ .length = 0,
+ .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
+ .dynamic = false,
+ .error = "remote node response is not understood, is it Netdata?",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 1 * 60, // 1 minute
+ }
+};
+
+static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender_state *s, char *http, size_t http_length) {
+ int32_t version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE;
+
+ int i;
+ for(i = 0; stream_responses[i].response ; i++) {
+ if(stream_responses[i].dynamic &&
+ http_length > stream_responses[i].length && http_length < (stream_responses[i].length + 30) &&
+ strncmp(http, stream_responses[i].response, stream_responses[i].length) == 0) {
+
+ version = str2i(&http[stream_responses[i].length]);
+ break;
+ }
+ else if(http_length == stream_responses[i].length && strcmp(http, stream_responses[i].response) == 0) {
+ version = stream_responses[i].version;
+
+ break;
+ }
+ }
+ const char *error = stream_responses[i].error;
+ int worker_job_id = stream_responses[i].worker_job_id;
+ time_t delay = stream_responses[i].postpone_reconnect_seconds;
+
+ if(version >= STREAM_HANDSHAKE_OK_V1) {
+ host->destination->last_error = NULL;
+ host->destination->last_handshake = version;
+ host->destination->postpone_reconnection_until = 0;
+ s->capabilities = convert_stream_version_to_capabilities(version);
+ return true;
+ }
+
+ error("STREAM %s [send to %s]: %s.", rrdhost_hostname(host), s->connected_to, error);
+
+ worker_is_busy(worker_job_id);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->last_error = error;
+ host->destination->last_handshake = version;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
+ return false;
+}
+
+static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
+
+ struct timeval tv = {
+ .tv_sec = timeout,
+ .tv_usec = 0
+ };
+
+ // make sure the socket is closed
+ rrdpush_sender_thread_close_socket(host);
+
+ s->rrdpush_sender_socket = connect_to_one_of_destinations(
+ host
+ , default_port
+ , &tv
+ , &s->reconnects_counter
+ , s->connected_to
+ , sizeof(s->connected_to)-1
+ , &host->destination
+ );
+
+ if(unlikely(s->rrdpush_sender_socket == -1)) {
+ error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
+ return false;
+ }
+
+ info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
+
+#ifdef ENABLE_HTTPS
+ if(netdata_ssl_client_ctx){
+ host->sender->ssl.flags = NETDATA_SSL_START;
+ if (!host->sender->ssl.conn){
+ host->sender->ssl.conn = SSL_new(netdata_ssl_client_ctx);
+ if(!host->sender->ssl.conn){
+ error("Failed to allocate SSL structure.");
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+ }
+ else{
+ SSL_clear(host->sender->ssl.conn);
+ }
+
+ if (host->sender->ssl.conn)
+ {
+ if (SSL_set_fd(host->sender->ssl.conn, s->rrdpush_sender_socket) != 1) {
+ error("Failed to set the socket to the SSL on socket fd %d.", s->rrdpush_sender_socket);
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ } else{
+ host->sender->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
+ }
+ }
+ }
+ else {
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+#endif
+
+ // reset our capabilities to default
+ s->capabilities = STREAM_OUR_CAPABILITIES;
+
+#ifdef ENABLE_COMPRESSION
+ // If we don't want compression, remove it from our capabilities
+ if(!(s->flags & SENDER_FLAG_COMPRESSION))
+ s->capabilities &= ~STREAM_CAP_COMPRESSION;
+#endif // ENABLE_COMPRESSION
+
+ /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
+ version negotiation resulted in a high enough version.
+ */
+ stream_encoded_t se;
+ rrdpush_encode_variable(&se, host);
+
+ char http[HTTP_HEADER_SIZE + 1];
+ int eol = snprintfz(http, HTTP_HEADER_SIZE,
+ "STREAM "
+ "key=%s"
+ "&hostname=%s"
+ "&registry_hostname=%s"
+ "&machine_guid=%s"
+ "&update_every=%d"
+ "&os=%s"
+ "&timezone=%s"
+ "&abbrev_timezone=%s"
+ "&utc_offset=%d"
+ "&hops=%d"
+ "&ml_capable=%d"
+ "&ml_enabled=%d"
+ "&mc_version=%d"
+ "&tags=%s"
+ "&ver=%u"
+ "&NETDATA_INSTANCE_CLOUD_TYPE=%s"
+ "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
+ "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
+ "&NETDATA_SYSTEM_OS_NAME=%s"
+ "&NETDATA_SYSTEM_OS_ID=%s"
+ "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
+ "&NETDATA_SYSTEM_OS_VERSION=%s"
+ "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
+ "&NETDATA_SYSTEM_OS_DETECTION=%s"
+ "&NETDATA_HOST_IS_K8S_NODE=%s"
+ "&NETDATA_SYSTEM_KERNEL_NAME=%s"
+ "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
+ "&NETDATA_SYSTEM_ARCHITECTURE=%s"
+ "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
+ "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
+ "&NETDATA_SYSTEM_CONTAINER=%s"
+ "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
+ "&NETDATA_CONTAINER_OS_NAME=%s"
+ "&NETDATA_CONTAINER_OS_ID=%s"
+ "&NETDATA_CONTAINER_OS_ID_LIKE=%s"
+ "&NETDATA_CONTAINER_OS_VERSION=%s"
+ "&NETDATA_CONTAINER_OS_VERSION_ID=%s"
+ "&NETDATA_CONTAINER_OS_DETECTION=%s"
+ "&NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT=%s"
+ "&NETDATA_SYSTEM_CPU_FREQ=%s"
+ "&NETDATA_SYSTEM_TOTAL_RAM=%s"
+ "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
+ "&NETDATA_PROTOCOL_VERSION=%s"
+ " HTTP/1.1\r\n"
+ "User-Agent: %s/%s\r\n"
+ "Accept: */*\r\n\r\n"
+ , host->rrdpush_send_api_key
+ , rrdhost_hostname(host)
+ , rrdhost_registry_hostname(host)
+ , host->machine_guid
+ , default_rrd_update_every
+ , rrdhost_os(host)
+ , rrdhost_timezone(host)
+ , rrdhost_abbrev_timezone(host)
+ , host->utc_offset
+ , host->system_info->hops + 1
+ , host->system_info->ml_capable
+ , host->system_info->ml_enabled
+ , host->system_info->mc_version
+ , rrdhost_tags(host)
+ , s->capabilities
+ , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
+ , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
+ , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
+ , se.os_name
+ , se.os_id
+ , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
+ , se.os_version
+ , (host->system_info->host_os_version_id) ? host->system_info->host_os_version_id : ""
+ , (host->system_info->host_os_detection) ? host->system_info->host_os_detection : ""
+ , (host->system_info->is_k8s_node) ? host->system_info->is_k8s_node : ""
+ , se.kernel_name
+ , se.kernel_version
+ , (host->system_info->architecture) ? host->system_info->architecture : ""
+ , (host->system_info->virtualization) ? host->system_info->virtualization : ""
+ , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
+ , (host->system_info->container) ? host->system_info->container : ""
+ , (host->system_info->container_detection) ? host->system_info->container_detection : ""
+ , (host->system_info->container_os_name) ? host->system_info->container_os_name : ""
+ , (host->system_info->container_os_id) ? host->system_info->container_os_id : ""
+ , (host->system_info->container_os_id_like) ? host->system_info->container_os_id_like : ""
+ , (host->system_info->container_os_version) ? host->system_info->container_os_version : ""
+ , (host->system_info->container_os_version_id) ? host->system_info->container_os_version_id : ""
+ , (host->system_info->container_os_detection) ? host->system_info->container_os_detection : ""
+ , (host->system_info->host_cores) ? host->system_info->host_cores : ""
+ , (host->system_info->host_cpu_freq) ? host->system_info->host_cpu_freq : ""
+ , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
+ , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
+ , STREAMING_PROTOCOL_VERSION
+ , rrdhost_program_name(host)
+ , rrdhost_program_version(host)
+ );
+ http[eol] = 0x00;
+ rrdpush_clean_encoded(&se);
+
+#ifdef ENABLE_HTTPS
+ if (!host->sender->ssl.flags) {
+ ERR_clear_error();
+ SSL_set_connect_state(host->sender->ssl.conn);
+ int err = SSL_connect(host->sender->ssl.conn);
+ if (err != 1){
+ err = SSL_get_error(host->sender->ssl.conn, err);
+ error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->sender->ssl.conn,err),NULL));
+ if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->last_error = "SSL error";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+ return false;
+ }
+ else {
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+ }
+ else {
+ if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
+ if (netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
+ if ( security_test_certificate(host->sender->ssl.conn)) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+ error("Closing the stream connection, because the server SSL certificate is not valid.");
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->last_error = "invalid SSL certificate";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+ return false;
+ }
+ }
+ }
+ }
+ }
+#endif
+
+ ssize_t bytes;
+
+ bytes = send_timeout(
+#ifdef ENABLE_HTTPS
+ &host->sender->ssl,
+#endif
+ s->rrdpush_sender_socket,
+ http,
+ strlen(http),
+ 0,
+ timeout);
+
+ if(bytes <= 0) { // timeout is 0
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
+ rrdpush_sender_thread_close_socket(host);
+ error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
+ host->destination->last_error = "timeout while sending request";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
+ return false;
+ }
+
+ info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to);
+
+ bytes = recv_timeout(
+#ifdef ENABLE_HTTPS
+ &host->sender->ssl,
+#endif
+ s->rrdpush_sender_socket,
+ http,
+ HTTP_HEADER_SIZE,
+ 0,
+ timeout);
+
+ if(bytes <= 0) { // timeout is 0
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
+ rrdpush_sender_thread_close_socket(host);
+ error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
+ host->destination->last_error = "timeout while expecting first response";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
+ return false;
+ }
+
+ http[bytes] = '\0';
+ debug(D_STREAM, "Response to sender from far end: %s", http);
+ if(!rrdpush_sender_validate_response(host, s, http, bytes))
+ return false;
+
+#ifdef ENABLE_COMPRESSION
+ if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
+ if(!s->compressor)
+ s->compressor = create_compressor();
+ else
+ s->compressor->reset(s->compressor);
+ }
+#endif //ENABLE_COMPRESSION
+
+ log_sender_capabilities(s);
+
+ if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
+
+ if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
+
+ debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
+
+ return true;
+}
+
+static bool attempt_to_connect(struct sender_state *state)
+{
+ state->send_attempts = 0;
+
+ if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
+ // reset the buffer, to properly send charts and metrics
+ rrdpush_sender_on_connect(state->host);
+
+ // send from the beginning
+ state->begin = 0;
+
+ // make sure the next reconnection will be immediate
+ state->not_connected_loops = 0;
+
+ // reset the bytes we have sent for this session
+ state->sent_bytes_on_this_connection = 0;
+
+ // let the data collection threads know we are ready
+ rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ return true;
+ }
+
+ // we couldn't connect
+
+ // increase the failed connections counter
+ state->not_connected_loops++;
+
+ // reset the number of bytes sent
+ state->sent_bytes_on_this_connection = 0;
+
+ // slow re-connection on repeating errors
+ sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds
+
+ return false;
+}
+
+// TCP window is open and we have data to transmit.
+static ssize_t attempt_to_send(struct sender_state *s) {
+ ssize_t ret = 0;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct circular_buffer *cb = s->buffer;
+#endif
+
+ netdata_mutex_lock(&s->mutex);
+ char *chunk;
+ size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
+ debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
+
+#ifdef ENABLE_HTTPS
+ SSL *conn = s->host->sender->ssl.conn ;
+ if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ ret = netdata_ssl_write(conn, chunk, outstanding);
+ else
+ ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
+#else
+ ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
+#endif
+
+ if (likely(ret > 0)) {
+ cbuffer_remove_unsafe(s->buffer, ret);
+ s->sent_bytes_on_this_connection += ret;
+ s->sent_bytes += ret;
+ debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
+ }
+ else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
+ debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
+ else if (ret == -1) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
+ debug(D_STREAM, "STREAM: Send failed - closing socket...");
+ error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+ else
+ debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
+
+ replication_recalculate_buffer_used_ratio_unsafe(s);
+ netdata_mutex_unlock(&s->mutex);
+
+ return ret;
+}
+
+static ssize_t attempt_read(struct sender_state *s) {
+ ssize_t ret = 0;
+
+#ifdef ENABLE_HTTPS
+ if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
+ size_t desired = sizeof(s->read_buffer) - s->read_len - 1;
+ ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired);
+ if (ret > 0 ) {
+ s->read_len += (int)ret;
+ return ret;
+ }
+
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
+ rrdpush_sender_thread_close_socket(s->host);
+ return ret;
+ }
+#endif
+ ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
+ if (ret > 0) {
+ s->read_len += ret;
+ return ret;
+ }
+
+ if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
+ return ret;
+
+ if (ret == 0 || errno == ECONNRESET) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
+ error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
+ }
+ else {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
+ error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
+ }
+ rrdpush_sender_thread_close_socket(s->host);
+
+ return ret;
+}
+
+struct inflight_stream_function {
+ struct sender_state *sender;
+ STRING *transaction;
+ usec_t received_ut;
+};
+
+void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
+ struct inflight_stream_function *tmp = data;
+
+ struct sender_state *s = tmp->sender;
+
+ if(rrdhost_can_send_definitions_to_parent(s->host)) {
+ BUFFER *wb = sender_start(s);
+
+ pluginsd_function_result_begin_to_buffer(wb
+ , string2str(tmp->transaction)
+ , code
+ , functions_content_type_to_format(func_wb->contenttype)
+ , func_wb->expires);
+
+ buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
+ pluginsd_function_result_end_to_buffer(wb);
+
+ sender_commit(s, wb);
+
+ internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
+ rrdhost_hostname(s->host), s->connected_to,
+ string2str(tmp->transaction),
+ buffer_strlen(func_wb),
+ now_realtime_usec() - tmp->received_ut);
+ }
+ string_freez(tmp->transaction);
+ buffer_free(func_wb);
+ freez(tmp);
+}
+
+// This is just a placeholder until the gap filling state machine is inserted
+void execute_commands(struct sender_state *s) {
+ worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+
+ char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
+ *end = 0;
+ while( start < end && (newline = strchr(start, '\n')) ) {
+ *newline = '\0';
+
+ log_access("STREAM: %d from '%s' for host '%s': %s",
+ gettid(), s->connected_to, rrdhost_hostname(s->host), start);
+
+ internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
+
+ char *words[PLUGINSD_MAX_WORDS] = { NULL };
+ size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+
+ const char *keyword = get_word(words, num_words, 0);
+
+ if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ rrdhost_hostname(s->host), s->connected_to,
+ keyword,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+ else {
+ int timeout = str2i(timeout_s);
+ if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+
+ struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
+ tmp->received_ut = now_realtime_usec();
+ tmp->sender = s;
+ tmp->transaction = string_strdupz(transaction);
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1);
+
+ int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
+ if(code != HTTP_RESP_OK) {
+ rrd_call_function_error(wb, "Failed to route request to collector", code);
+ stream_execute_function_callback(wb, code, tmp);
+ }
+ }
+ }
+ else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
+
+ const char *chart_id = get_word(words, num_words, 1);
+ const char *start_streaming = get_word(words, num_words, 2);
+ const char *after = get_word(words, num_words, 3);
+ const char *before = get_word(words, num_words, 4);
+
+ if (!chart_id || !start_streaming || !after || !before) {
+ error("STREAM %s [send to %s] %s command is incomplete"
+ " (chart=%s, start_streaming=%s, after=%s, before=%s)",
+ rrdhost_hostname(s->host), s->connected_to,
+ keyword,
+ chart_id ? chart_id : "(unset)",
+ start_streaming ? start_streaming : "(unset)",
+ after ? after : "(unset)",
+ before ? before : "(unset)");
+ }
+ else {
+ replication_add_request(s, chart_id,
+ strtoll(after, NULL, 0),
+ strtoll(before, NULL, 0),
+ !strcmp(start_streaming, "true")
+ );
+ }
+ }
+ else {
+ error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
+ }
+
+ worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+ start = newline + 1;
+ }
+ if (start < end) {
+ memmove(s->read_buffer, start, end-start);
+ s->read_len = end - start;
+ }
+ else {
+ s->read_buffer[0] = '\0';
+ s->read_len = 0;
+ }
+}
+
+struct rrdpush_sender_thread_data {
+ struct sender_state *sender_state;
+ RRDHOST *host;
+ char *pipe_buffer;
+};
+
+static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) {
+ static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+
+ bool ret = true;
+
+ netdata_mutex_lock(&mutex);
+
+ int new_pipe_fds[2];
+ if(reopen) {
+ if(pipe(new_pipe_fds) != 0) {
+ error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
+ new_pipe_fds[PIPE_READ] = -1;
+ new_pipe_fds[PIPE_WRITE] = -1;
+ ret = false;
+ }
+ }
+
+ int old_pipe_fds[2];
+ old_pipe_fds[PIPE_READ] = pipe_fds[PIPE_READ];
+ old_pipe_fds[PIPE_WRITE] = pipe_fds[PIPE_WRITE];
+
+ if(reopen) {
+ pipe_fds[PIPE_READ] = new_pipe_fds[PIPE_READ];
+ pipe_fds[PIPE_WRITE] = new_pipe_fds[PIPE_WRITE];
+ }
+ else {
+ pipe_fds[PIPE_READ] = -1;
+ pipe_fds[PIPE_WRITE] = -1;
+ }
+
+ if(old_pipe_fds[PIPE_READ] > 2)
+ close(old_pipe_fds[PIPE_READ]);
+
+ if(old_pipe_fds[PIPE_WRITE] > 2)
+ close(old_pipe_fds[PIPE_WRITE]);
+
+ netdata_mutex_unlock(&mutex);
+ return ret;
+}
+
+void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
+ if(unlikely(s->tid == gettid()))
+ return;
+
+ RRDHOST *host = s->host;
+
+ int pipe_fd = s->rrdpush_sender_pipe[PIPE_WRITE];
+
+ // signal the sender there are more data
+ if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) {
+ error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
+ rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true);
+ }
+}
+
+static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
+ struct rrdpush_sender_thread_data *data = ptr;
+ worker_unregister();
+
+ RRDHOST *host = data->host;
+
+ netdata_mutex_lock(&host->sender->mutex);
+
+ info("STREAM %s [send]: sending thread cleans up...", rrdhost_hostname(host));
+
+ rrdpush_sender_thread_close_socket(host);
+ rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
+
+ if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN)) {
+ info("STREAM %s [send]: sending thread detaches itself.", rrdhost_hostname(host));
+ netdata_thread_detach(netdata_thread_self());
+ }
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+
+ info("STREAM %s [send]: sending thread now exits.", rrdhost_hostname(host));
+
+ netdata_mutex_unlock(&host->sender->mutex);
+
+ freez(data->pipe_buffer);
+ freez(data);
+}
+
+void sender_init(RRDHOST *host)
+{
+ if (host->sender)
+ return;
+
+ host->sender = callocz(1, sizeof(*host->sender));
+ host->sender->host = host;
+ host->sender->buffer = cbuffer_new(1024, 1024 * 1024);
+ host->sender->capabilities = STREAM_OUR_CAPABILITIES;
+
+ host->sender->rrdpush_sender_pipe[PIPE_READ] = -1;
+ host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1;
+ host->sender->rrdpush_sender_socket = -1;
+
+#ifdef ENABLE_COMPRESSION
+ if(default_compression_enabled) {
+ host->sender->flags |= SENDER_FLAG_COMPRESSION;
+ host->sender->compressor = create_compressor();
+ }
+ else
+ host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
+#endif
+
+ netdata_mutex_init(&host->sender->mutex);
+ replication_init_sender(host->sender);
+}
+
+void *rrdpush_sender_thread(void *ptr) {
+ worker_register("STREAMSND");
+ worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
+ worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
+ worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive");
+ worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute");
+ worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send");
+
+ // disconnection reasons
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
+
+ worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
+ worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
+
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
+
+ struct sender_state *s = ptr;
+ s->tid = gettid();
+
+ if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
+ !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
+ !*s->host->rrdpush_send_api_key) {
+ error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
+ rrdhost_hostname(s->host), s->tid);
+ return NULL;
+ }
+
+#ifdef ENABLE_HTTPS
+ if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ) {
+ static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
+ netdata_spinlock_lock(&sp);
+ if(!netdata_ssl_client_ctx) {
+ security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
+ ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
+ }
+ netdata_spinlock_unlock(&sp);
+ }
+#endif
+
+ info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), s->tid);
+
+ s->timeout = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
+
+ s->default_port = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
+
+ s->buffer->max_size = (size_t)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10);
+
+ s->reconnect_delay = (unsigned int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
+
+ remote_clock_resync_iterations = (unsigned int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM,
+ "initial clock resync iterations",
+ remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
+
+ // initialize rrdpush globals
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ int pipe_buffer_size = 10 * 1024;
+#ifdef F_GETPIPE_SZ
+ pipe_buffer_size = fcntl(s->rrdpush_sender_pipe[PIPE_READ], F_GETPIPE_SZ);
+#endif
+ if(pipe_buffer_size < 10 * 1024)
+ pipe_buffer_size = 10 * 1024;
+
+ if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
+ error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
+ rrdhost_hostname(s->host));
+ return NULL;
+ }
+
+ struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
+ thread_data->pipe_buffer = mallocz(pipe_buffer_size);
+ thread_data->sender_state = s;
+ thread_data->host = s->host;
+
+ // reset our cleanup flags
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN);
+
+ netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
+
+ for(; rrdhost_has_rrdpush_sender_enabled(s->host) && !netdata_exit ;) {
+ // check for outstanding cancellation requests
+ netdata_thread_testcancel();
+
+ // The connection attempt blocks (after which we use the socket in nonblocking)
+ if(unlikely(s->rrdpush_sender_socket == -1)) {
+ worker_is_busy(WORKER_SENDER_JOB_CONNECT);
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ s->flags &= ~SENDER_FLAG_OVERFLOW;
+ s->read_len = 0;
+ s->buffer->read = 0;
+ s->buffer->write = 0;
+
+ if(unlikely(!attempt_to_connect(s)))
+ continue;
+
+ s->last_traffic_seen_t = now_monotonic_sec();
+ rrdpush_claimed_id(s->host);
+ rrdpush_send_host_labels(s->host);
+
+ rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
+
+ continue;
+ }
+
+ // If the TCP window never opened then something is wrong, restart connection
+ if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout &&
+ !rrdpush_sender_pending_replication_requests(s) &&
+ !rrdpush_sender_replicating_charts(s)
+ )) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
+ error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
+ rrdpush_sender_thread_close_socket(s->host);
+ continue;
+ }
+
+ netdata_mutex_lock(&s->mutex);
+ size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL);
+ size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
+ netdata_mutex_unlock(&s->mutex);
+
+ worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size);
+
+ if(outstanding)
+ s->send_attempts++;
+
+ if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
+ if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
+ error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
+ rrdhost_hostname(s->host));
+ rrdpush_sender_thread_close_socket(s->host);
+ break;
+ }
+ }
+
+ worker_is_idle();
+
+ // Wait until buffer opens in the socket or a rrdset_done_push wakes us
+ enum {
+ Collector = 0,
+ Socket = 1,
+ };
+ struct pollfd fds[2] = {
+ [Collector] = {
+ .fd = s->rrdpush_sender_pipe[PIPE_READ],
+ .events = POLLIN,
+ .revents = 0,
+ },
+ [Socket] = {
+ .fd = s->rrdpush_sender_socket,
+ .events = POLLIN | (outstanding ? POLLOUT : 0 ),
+ .revents = 0,
+ }
+ };
+ int poll_rc = poll(fds, 2, 1000);
+
+ debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
+ fds[Collector].revents, fds[Socket].revents, outstanding);
+
+ if(unlikely(netdata_exit)) break;
+
+ internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ],
+ "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to);
+
+ internal_error(fds[Socket].fd != s->rrdpush_sender_socket,
+ "STREAM %s [send to %s]: socket changed after poll().", rrdhost_hostname(s->host), s->connected_to);
+
+ // Spurious wake-ups without error - loop again
+ if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
+ debug(D_STREAM, "Spurious wakeup");
+ continue;
+ }
+
+ // Only errors from poll() are internal, but try restarting the connection
+ if(unlikely(poll_rc == -1)) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
+ error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
+ rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
+ rrdpush_sender_thread_close_socket(s->host);
+ continue;
+ }
+
+ // If we have data and have seen the TCP window open then try to close it by a transmission.
+ if(likely(outstanding && (fds[Socket].revents & POLLOUT))) {
+ worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
+ ssize_t bytes = attempt_to_send(s);
+ if(bytes > 0) {
+ s->last_traffic_seen_t = now_monotonic_sec();
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)bytes);
+ }
+ }
+
+ // If the collector woke us up then empty the pipe to remove the signal
+ if (fds[Collector].revents & (POLLIN|POLLPRI)) {
+ worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
+ debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
+
+ if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
+ error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
+ }
+
+ // Read as much as possible to fill the buffer, split into full lines for execution.
+ if (fds[Socket].revents & POLLIN) {
+ worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE);
+ ssize_t bytes = attempt_read(s);
+ if(bytes > 0) {
+ s->last_traffic_seen_t = now_monotonic_sec();
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)bytes);
+ }
+ }
+
+ if(unlikely(s->read_len))
+ execute_commands(s);
+
+ if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) {
+ char *error = NULL;
+
+ if (unlikely(fds[Collector].revents & POLLERR))
+ error = "pipe reports errors (POLLERR)";
+ else if (unlikely(fds[Collector].revents & POLLHUP))
+ error = "pipe closed (POLLHUP)";
+ else if (unlikely(fds[Collector].revents & POLLNVAL))
+ error = "pipe is invalid (POLLNVAL)";
+
+ if(error) {
+ rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
+ error("STREAM %s [send to %s]: restarting internal pipe: %s.",
+ rrdhost_hostname(s->host), s->connected_to, error);
+ }
+ }
+
+ if(unlikely(fds[Socket].revents & (POLLERR|POLLHUP|POLLNVAL))) {
+ char *error = NULL;
+
+ if (unlikely(fds[Socket].revents & POLLERR))
+ error = "socket reports errors (POLLERR)";
+ else if (unlikely(fds[Socket].revents & POLLHUP))
+ error = "connection closed by remote end (POLLHUP)";
+ else if (unlikely(fds[Socket].revents & POLLNVAL))
+ error = "connection is invalid (POLLNVAL)";
+
+ if(unlikely(error)) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR);
+ error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
+ rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+ }
+
+ // protection from overflow
+ if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) {
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
+ errno = 0;
+ error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
+ rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+
+ worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication.requests));
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
diff --git a/streaming/stream.conf b/streaming/stream.conf
new file mode 100644
index 0000000..7c9ccc9
--- /dev/null
+++ b/streaming/stream.conf
@@ -0,0 +1,246 @@
+# netdata configuration for aggregating data from remote hosts
+#
+# API keys authorize a pair of sending-receiving netdata servers.
+# Once their communication is authorized, they can exchange metrics for any
+# number of hosts.
+#
+# You can generate API keys, with the linux command: uuidgen
+
+
+# -----------------------------------------------------------------------------
+# 1. ON CHILD NETDATA - THE ONE THAT WILL BE SENDING METRICS
+
+[stream]
+ # Enable this on child nodes, to have them send metrics.
+ enabled = no
+
+ # Where is the receiving netdata?
+ # A space separated list of:
+ #
+ # [PROTOCOL:]HOST[%INTERFACE][:PORT][:SSL]
+ #
+ # If many are given, the first available will get the metrics.
+ #
+ # PROTOCOL = tcp, udp, or unix (only tcp and unix are supported by parent nodes)
+ # HOST = an IPv4, IPv6 IP, or a hostname, or a unix domain socket path.
+ # IPv6 IPs should be given with brackets [ip:address]
+ # INTERFACE = the network interface to use (only for IPv6)
+ # PORT = the port number or service name (/etc/services)
+ # SSL = when this word appear at the end of the destination string
+ # the Netdata will encrypt the connection with the parent.
+ #
+ # This communication is not HTTP (it cannot be proxied by web proxies).
+ destination =
+
+ # Skip Certificate verification?
+ # The netdata child is configurated to avoid invalid SSL/TLS certificate,
+ # so certificates that are self-signed or expired will stop the streaming.
+ # Case the server certificate is not valid, you can enable the use of
+ # 'bad' certificates setting the next option as 'yes'.
+ #ssl skip certificate verification = yes
+
+ # Certificate Authority Path
+ # OpenSSL has a default directory where the known certificates are stored.
+ # In case it is necessary, it is possible to change this rule using the variable
+ # "CApath", e.g. CApath = /etc/ssl/certs/
+ #
+ #CApath =
+
+ # Certificate Authority file
+ # When the Netdata parent has a certificate that is not recognized as valid,
+ # we can add it to the list of known certificates in "CApath" and give it to
+ # Netdata as an argument, e.g. CAfile = /etc/ssl/certs/cert.pem
+ #
+ #CAfile =
+
+ # The API_KEY to use (as the sender)
+ api key =
+
+ # Stream Compression
+ # The default is enabled
+ # You can control stream compression in this agent with options: yes | no
+ #enable compression = yes
+
+ # The timeout to connect and send metrics
+ timeout seconds = 60
+
+ # If the destination line above does not specify a port, use this
+ default port = 19999
+
+ # filter the charts to be streamed
+ # netdata SIMPLE PATTERN:
+ # - space separated list of patterns (use \ to include spaces in patterns)
+ # - use * as wildcard, any number of times within each pattern
+ # - prefix a pattern with ! for a negative match (ie not stream the charts it matches)
+ # - the order of patterns is important (left to right)
+ # To send all except a few, use: !this !that * (ie append a wildcard pattern)
+ send charts matching = *
+
+ # The buffer to use for sending metrics.
+ # 10MB is good for 60 seconds of data, so increase this if you expect latencies.
+ # The buffer is flushed on reconnects (this will not prevent gaps at the charts).
+ buffer size bytes = 10485760
+
+ # If the connection fails, or it disconnects,
+ # retry after that many seconds.
+ reconnect delay seconds = 5
+
+ # Sync the clock of the charts for that many iterations, when starting.
+ # It is ignored when replication is enabled
+ initial clock resync iterations = 60
+
+# -----------------------------------------------------------------------------
+# 2. ON PARENT NETDATA - THE ONE THAT WILL BE RECEIVING METRICS
+
+# You can have one API key per child,
+# or the same API key for all child nodes.
+#
+# netdata searches for options in this order:
+#
+# a) parent netdata settings (netdata.conf)
+# b) [stream] section (above)
+# c) [API_KEY] section (below, settings for the API key)
+# d) [MACHINE_GUID] section (below, settings for each machine)
+#
+# You can combine the above (the more specific setting will be used).
+
+# API key authentication
+# If the key is not listed here, it will not be able to push metrics.
+
+# [API_KEY] is [YOUR-API-KEY], i.e [11111111-2222-3333-4444-555555555555]
+[API_KEY]
+ # Default settings for this API key
+
+ # This GUID is to be used as an API key from remote agents connecting
+ # to this machine. Failure to match such a key, denies access.
+ # YOU MUST SET THIS FIELD ON ALL API KEYS.
+ type = api
+
+ # You can disable the API key, by setting this to: no
+ # The default (for unknown API keys) is: no
+ enabled = no
+
+ # A list of simple patterns matching the IPs of the servers that
+ # will be pushing metrics using this API key.
+ # The metrics are received via the API port, so the same IPs
+ # should also be matched at netdata.conf [web].allow connections from
+ allow from = *
+
+ # The default history in entries, for all hosts using this API key.
+ # You can also set it per host below.
+ # For the default db mode (dbengine), this is ignored.
+ #default history = 3600
+
+ # The default memory mode to be used for all hosts using this API key.
+ # You can also set it per host below.
+ # If you don't set it here, the memory mode of netdata.conf will be used.
+ # Valid modes:
+ # save save on exit, load on start
+ # map like swap (continuously syncing to disks - you need SSD)
+ # ram keep it in RAM, don't touch the disk
+ # none no database at all (use this on headless proxies)
+ # dbengine like a traditional database
+ #default memory mode = dbengine
+
+ # Shall we enable health monitoring for the hosts using this API key?
+ # 3 possible values:
+ # yes enable alarms
+ # no do not enable alarms
+ # auto enable alarms, only when the sending netdata is connected. For ephemeral child nodes or child system restarts,
+ # ensure that the netdata process on the child is gracefully stopped, to prevent invalid last_collected alarms
+ # You can also set it per host, below.
+ # The default is taken from [health].enabled of netdata.conf
+ #health enabled by default = auto
+
+ # postpone alarms for a short period after the sender is connected
+ default postpone alarms on connect seconds = 60
+
+ # need to route metrics differently? set these.
+ # the defaults are the ones at the [stream] section (above)
+ #default proxy enabled = yes | no
+ #default proxy destination = IP:PORT IP:PORT ...
+ #default proxy api key = API_KEY
+ #default proxy send charts matching = *
+
+ # Stream Compression
+ # By default it is enabled.
+ # You can control stream compression in this parent agent stream with options: yes | no
+ #enable compression = yes
+
+ # Replication
+ # Enable replication for all hosts using this api key. Default: enabled
+ #enable replication = yes
+
+ # How many seconds to replicate from each child. Default: a day
+ #seconds to replicate = 86400
+
+ # The duration we want to replicate per each step.
+ #replication_step = 600
+
+# -----------------------------------------------------------------------------
+# 3. PER SENDING HOST SETTINGS, ON PARENT NETDATA
+# THIS IS OPTIONAL - YOU DON'T HAVE TO CONFIGURE IT
+
+# This section exists to give you finer control of the parent settings for each
+# child host, when the same API key is used by many netdata child nodes / proxies.
+#
+# Each netdata has a unique GUID - generated the first time netdata starts.
+# You can find it at /var/lib/netdata/registry/netdata.public.unique.id
+# (at the child).
+#
+# The host sending data will have one. If the host is not ephemeral,
+# you can give settings for each sending host here.
+
+[MACHINE_GUID]
+ # This GUID is to be used as a MACHINE GUID from remote agents connecting
+ # to this machine, not an API key.
+ # YOU MUST SET THIS FIELD ON ALL MACHINE GUIDs.
+ type = machine
+
+ # enable this host: yes | no
+ # When disabled, the parent will not receive metrics for this host.
+ # THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID.
+ # Use only the API key for security.
+ enabled = no
+
+ # A list of simple patterns matching the IPs of the servers that
+ # will be pushing metrics using this MACHINE GUID.
+ # The metrics are received via the API port, so the same IPs
+ # should also be matched at netdata.conf [web].allow connections from
+ # and at stream.conf [API_KEY].allow from
+ allow from = *
+
+ # The number of entries in the database.
+ # This is ignored for db mode dbengine.
+ #history = 3600
+
+ # The memory mode of the database: save | map | ram | none | dbengine
+ #memory mode = dbengine
+
+ # Health / alarms control: yes | no | auto
+ #health enabled = auto
+
+ # postpone alarms when the sender connects
+ postpone alarms on connect seconds = 60
+
+ # need to route metrics differently?
+ # the defaults are the ones at the [API KEY] section
+ #proxy enabled = yes | no
+ #proxy destination = IP:PORT IP:PORT ...
+ #proxy api key = API_KEY
+ #proxy send charts matching = *
+
+ # Stream Compression
+ # By default, enabled.
+ # You can control stream compression in this parent agent stream with options: yes | no
+ #enable compression = yes
+
+ # Replication
+ # Enable replication for all hosts using this api key.
+ #enable replication = yes
+
+ # How many seconds to replicate from each child.
+ #seconds to replicate = 86400
+
+ # The duration we want to replicate per each step.
+ #replication_step = 600