summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/Makefile.am12
-rw-r--r--streaming/README.md624
-rw-r--r--streaming/receiver.c499
-rw-r--r--streaming/rrdpush.c731
-rw-r--r--streaming/rrdpush.h117
-rw-r--r--streaming/sender.c723
-rw-r--r--streaming/stream.conf205
7 files changed, 2911 insertions, 0 deletions
diff --git a/streaming/Makefile.am b/streaming/Makefile.am
new file mode 100644
index 0000000..95c31ca
--- /dev/null
+++ b/streaming/Makefile.am
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_libconfig_DATA = \
+ stream.conf \
+ $(NULL)
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/streaming/README.md b/streaming/README.md
new file mode 100644
index 0000000..94ab1f2
--- /dev/null
+++ b/streaming/README.md
@@ -0,0 +1,624 @@
+<!--
+title: "Streaming and replication"
+description: "Replicate and mirror Netdata's metrics through real-time streaming from child to parent nodes. Then combine, correlate, and export."
+custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README.md
+-->
+
+# Streaming and replication
+
+Each Netdata is able to replicate/mirror its database to another Netdata, by streaming collected
+metrics, in real-time to it. This is quite different to [data archiving to third party time-series
+databases](/exporting/README.md).
+
+When Netdata streams metrics to another Netdata, the receiving one is able to perform everything a Netdata instance is
+capable of:
+
+- Visualize metrics with a dashboard
+- Run health checks that trigger alarms and send alarm notifications
+- Export metrics to a external time-series database
+
+The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
+There are also **proxies**, which collects metrics from a child and sends it to a parent.
+
+## Supported configurations
+
+### Netdata without a database or web API (headless collector)
+
+Local Netdata (child), **without any database or alarms**, collects metrics and sends them to another Netdata
+(parent).
+
+The node menu shows a list of all "databases streamed to" the parent. Clicking one of those links allows the user to
+view the full dashboard of the child node. The URL has the form
+`http://parent-host:parent-port/host/child-host/`.
+
+Alarms for the child are served by the parent.
+
+In this mode the child is just a plain data collector. It spawns all external plugins, but instead of maintaining a
+local database and accepting dashboard requests, it streams all metrics to the parent. The memory footprint is reduced
+significantly, to between 6 MiB and 40 MiB, depending on the enabled plugins. To reduce the memory usage as much as
+possible, refer to the [performance optimization guide](/docs/guides/configure/performance.md).
+
+The same parent can collect data for any number of child nodes.
+
+### Database Replication
+
+Local Netdata (child), **with a local database (and possibly alarms)**, collects metrics and
+sends them to another Netdata (parent).
+
+The user can use all the functions **at both** `http://child-ip:child-port/` and
+`http://parent-host:parent-port/host/child-host/`.
+
+The child and the parent may have different data retention policies for the same metrics.
+
+Alarms for the child are triggered by **both** the child and the parent (and actually
+each can have different alarms configurations or have alarms disabled).
+
+Take a note, that custom chart names, configured on the child, should be in the form `type.name` to work correctly. The parent will truncate the `type` part and substitute the original chart `type` to store the name in the database.
+
+### Netdata proxies
+
+Local Netdata (child), with or without a database, collects metrics and sends them to another
+Netdata (**proxy**), which may or may not maintain a database, which forwards them to another
+Netdata (parent).
+
+Alarms for the child can be triggered by any of the involved hosts that maintains a database.
+
+Any number of daisy chaining Netdata servers are supported, each with or without a database and
+with or without alarms for the child metrics.
+
+### mix and match with backends
+
+All nodes that maintain a database can also send their data to a backend database.
+This allows quite complex setups.
+
+Example:
+
+1. Netdata `A`, `B` do not maintain a database and stream metrics to Netdata `C`(live streaming functionality, i.e. this PR)
+2. Netdata `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (backends functionality)
+3. Netdata `C` also streams data for `A`, `B`, `C` to Netdata `D`, which also collects data from `E`, `F` and `G` from another DMZ (live streaming functionality, i.e. this PR)
+4. Netdata `D` is just a proxy, without a database, that streams all data to a remote site at Netdata `H`
+5. Netdata `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (backends functionality)
+6. alarms are triggered by `H` for all hosts
+7. users can use all the Netdata that maintain a database to view metrics (i.e. at `H` all hosts can be viewed).
+
+## Configuration
+
+These are options that affect the operation of Netdata in this area:
+
+```
+[global]
+ memory mode = none | ram | save | map | dbengine
+```
+
+`[global].memory mode = none` disables the database at this host. This also disables health
+monitoring (there cannot be health monitoring without a database).
+
+```
+[web]
+ mode = none | static-threaded
+ accept a streaming request every seconds = 0
+```
+
+`[web].mode = none` disables the API (Netdata will not listen to any ports).
+This also disables the registry (there cannot be a registry without an API).
+
+`accept a streaming request every seconds` can be used to set a limit on how often a parent node will accept streaming
+requests from its child nodes. 0 sets no limit, 1 means maximum once every second. If this is set, you may see error log
+entries "... too busy to accept new streaming request. Will be allowed in X secs".
+
+```
+[backend]
+ enabled = yes | no
+ type = graphite | opentsdb
+ destination = IP:PORT ...
+ update every = 10
+```
+
+`[backend]` configures data archiving to a backend (it archives all databases maintained on
+this host).
+
+### streaming configuration
+
+A new file is introduced: `stream.conf` (to edit it on your system run
+`/etc/netdata/edit-config stream.conf`). This file holds streaming configuration for both the
+sending and the receiving Netdata.
+
+API keys are used to authorize the communication of a pair of sending-receiving Netdata.
+Once the communication is authorized, the sending Netdata can push metrics for any number of hosts.
+
+You can generate an API key with the command `uuidgen`. API keys are just random GUIDs.
+You can use the same API key on all your Netdata, or use a different API key for any pair of
+sending-receiving Netdata.
+
+##### options for the sending node
+
+This is the section for the sending Netdata. On the receiving node, `[stream].enabled` can be `no`.
+If it is `yes`, the receiving node will also stream the metrics to another node (i.e. it will be
+a proxy).
+
+```
+[stream]
+ enabled = yes | no
+ destination = IP:PORT[:SSL] ...
+ api key = XXXXXXXXXXX
+```
+
+This is an overview of how these options can be combined:
+
+| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|backend|alarms|dashboard|
+|------|:-------------:|:----------:|:----------------:|:-----:|:----:|:-------:|
+| headless collector|`none`|`none`|`yes`|only for `data source = as collected`|not possible|no|
+| headless proxy|`none`|not `none`|`yes`|only for `data source = as collected`|not possible|no|
+| proxy with db|not `none`|not `none`|`yes`|possible|possible|yes|
+| central netdata|not `none`|not `none`|`no`|possible|possible|yes|
+
+For the options to encrypt the data stream between the child and the parent, refer to [securing the communication](#securing-streaming-communications)
+
+##### options for the receiving node
+
+`stream.conf` looks like this:
+
+```sh
+# replace API_KEY with your uuidgen generated GUID
+[API_KEY]
+ enabled = yes
+ default history = 3600
+ default memory mode = save
+ health enabled by default = auto
+ allow from = *
+```
+
+You can add many such sections, one for each API key. The above are used as default values for
+all hosts pushed with this API key.
+
+You can also add sections like this:
+
+```sh
+# replace MACHINE_GUID with the child /var/lib/netdata/registry/netdata.public.unique.id
+[MACHINE_GUID]
+ enabled = yes
+ history = 3600
+ memory mode = save
+ health enabled = yes
+ allow from = *
+```
+
+The above is the parent configuration of a single host, at the parent end. `MACHINE_GUID` is
+the unique id the Netdata generating the metrics (i.e. the Netdata that originally collects
+them `/var/lib/netdata/registry/netdata.unique.id`). So, metrics for Netdata `A` that pass through
+any number of other Netdata, will have the same `MACHINE_GUID`.
+
+You can also use `default memory mode = dbengine` for an API key or `memory mode = dbengine` for
+ a single host. The additional `page cache size` and `dbengine multihost disk space` configuration options
+ are inherited from the global Netdata configuration.
+
+##### allow from
+
+`allow from` settings are [Netdata simple patterns](/libnetdata/simple_pattern/README.md): string matches
+that use `*` as wildcard (any number of times) and a `!` prefix for a negative match.
+So: `allow from = !10.1.2.3 10.*` will allow all IPs in `10.*` except `10.1.2.3`. The order is
+important: left to right, the first positive or negative match is used.
+
+`allow from` is available in Netdata v1.9+
+
+##### tracing
+
+When a child is trying to push metrics to a parent or proxy, it logs entries like these:
+
+```
+2017-02-25 01:57:44: netdata: ERROR: Failed to connect to '10.11.12.1', port '19999' (errno 111, Connection refused)
+2017-02-25 01:57:44: netdata: ERROR: STREAM costa-pc [send to 10.11.12.1:19999]: failed to connect
+2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: initializing communication...
+2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: waiting response from remote netdata...
+2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: established communication - sending metrics...
+2017-02-25 01:58:14: netdata: ERROR: STREAM costa-pc [send]: discarding 1900 bytes of metrics already in the buffer.
+2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [send]: ready - sending metrics...
+```
+
+The receiving end (proxy or parent) logs entries like these:
+
+```
+2017-02-25 01:58:04: netdata: INFO : STREAM [receive from [10.11.12.11]:33554]: new client connection.
+2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [10.11.12.11]:33554: receive thread created (task id 7698)
+2017-02-25 01:58:14: netdata: INFO : Host 'costa-pc' with guid '12345678-b5a6-11e6-8a50-00508db7e9c9' initialized, os: linux, update every: 1, memory mode: ram, history entries: 3600, streaming: disabled, health: enabled, cache_dir: '/var/cache/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9', varlib_dir: '/var/lib/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9', health_log: '/var/lib/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9/health/health-log.db', alarms default handler: '/usr/libexec/netdata/plugins.d/alarm-notify.sh', alarms default recipient: 'root'
+2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [receive from [10.11.12.11]:33554]: initializing communication...
+2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [receive from [10.11.12.11]:33554]: receiving metrics...
+```
+
+For Netdata v1.9+, streaming can also be monitored via `access.log`.
+
+### Securing streaming communications
+
+Netdata does not activate TLS encryption by default. To encrypt streaming connections, you first need to [enable TLS support](/web/server/README.md#enabling-tls-support) on the parent. With encryption enabled on the receiving side, you need to instruct the child to use TLS/SSL as well. On the child's `stream.conf`, configure the destination as follows:
+
+```
+[stream]
+ destination = host:port:SSL
+```
+
+The word `SSL` appended to the end of the destination tells the child that connections must be encrypted.
+
+> While Netdata uses Transport Layer Security (TLS) 1.2 to encrypt communications rather than the obsolete SSL protocol,
+> it's still common practice to refer to encrypted web connections as `SSL`. Many vendors, like Nginx and even Netdata
+> itself, use `SSL` in configuration files, whereas documentation will always refer to encrypted communications as `TLS`
+> or `TLS/SSL`.
+
+#### Certificate verification
+
+When TLS/SSL is enabled on the child, the default behavior will be to not connect with the parent unless the server's certificate can be verified via the default chain. In case you want to avoid this check, add the following to the child's `stream.conf` file:
+
+```
+[stream]
+ ssl skip certificate verification = yes
+```
+
+#### Trusted certificate
+
+If you've enabled [certificate verification](#certificate-verification), you might see errors from the OpenSSL library when there's a problem with checking the certificate chain (`X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY`). More importantly, OpenSSL will reject self-signed certificates.
+
+Given these known issues, you have two options. If you trust your certificate, you can set the options `CApath` and `CAfile` to inform Netdata where your certificates, and the certificate trusted file, are stored.
+
+For more details about these options, you can read about [verify locations](https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_load_verify_locations.html).
+
+Before you changed your streaming configuration, you need to copy your trusted certificate to your child system and add the certificate to OpenSSL's list.
+
+On most Linux distributions, the `update-ca-certificates` command searches inside the `/usr/share/ca-certificates` directory for certificates. You should double-check by reading the `update-ca-certificate` manual (`man update-ca-certificate`), and then change the directory in the below commands if needed.
+
+If you have `sudo` configured on your child system, you can use that to run the following commands. If not, you'll have to log in as `root` to complete them.
+
+```
+# mkdir /usr/share/ca-certificates/netdata
+# cp parent_cert.pem /usr/share/ca-certificates/netdata/parent_cert.crt
+# chown -R netdata.netdata /usr/share/ca-certificates/netdata/
+```
+
+First, you create a new directory to store your certificates for Netdata. Next, you need to change the extension on your certificate from `.pem` to `.crt` so it's compatible with `update-ca-certificate`. Finally, you need to change permissions so the user that runs Netdata can access the directory where you copied in your certificate.
+
+Next, edit the file `/etc/ca-certificates.conf` and add the following line:
+
+```
+netdata/parent_cert.crt
+```
+
+Now you update the list of certificates running the following, again either as `sudo` or `root`:
+
+```
+# update-ca-certificates
+```
+
+> Some Linux distributions have different methods of updating the certificate list. For more details, please read this
+> guide on [adding trusted root certificates](https://github.com/Busindre/How-to-Add-trusted-root-certificates).
+
+Once you update your certificate list, you can set the stream parameters for Netdata to trust the parent certificate. Open `stream.conf` for editing and change the following lines:
+
+```
+[stream]
+ CApath = /etc/ssl/certs/
+ CAfile = /etc/ssl/certs/parent_cert.pem
+```
+
+With this configuration, the `CApath` option tells Netdata to search for trusted certificates inside `/etc/ssl/certs`. The `CAfile` option specifies the Netdata parent certificate is located at `/etc/ssl/certs/parent_cert.pem`. With this configuration, you can skip using the system's entire list of certificates and use Netdata's parent certificate instead.
+
+#### Expected behaviors
+
+With the introduction of TLS/SSL, the parent-child communication behaves as shown in the table below, depending on the following configurations:
+
+- **Parent TLS (Yes/No)**: Whether the `[web]` section in `netdata.conf` has `ssl key` and `ssl certificate`.
+- **Parent port TLS (-/force/optional)**: Depends on whether the `[web]` section `bind to` contains a `^SSL=force` or `^SSL=optional` directive on the port(s) used for streaming.
+- **Child TLS (Yes/No)**: Whether the destination in the child's `stream.conf` has `:SSL` at the end.
+- **Child TLS Verification (yes/no)**: Value of the child's `stream.conf` `ssl skip certificate verification` parameter (default is no).
+
+| Parent TLS enabled|Parent port SSL|Child TLS|Child SSL Ver.|Behavior|
+|:----------------:|:-------------:|:-------:|:------------:|:-------|
+| No|-|No|no|Legacy behavior. The parent-child stream is unencrypted.|
+| Yes|force|No|no|The parent rejects the child connection.|
+| Yes|-/optional|No|no|The parent-child stream is unencrypted (expected situation for legacy child nodes and newer parent nodes)|
+| Yes|-/force/optional|Yes|no|The parent-child stream is encrypted, provided that the parent has a valid TLS/SSL certificate. Otherwise, the child refuses to connect.|
+| Yes|-/force/optional|Yes|yes|The parent-child stream is encrypted.|
+
+## Viewing remote host dashboards, using mirrored databases
+
+On any receiving Netdata, that maintains remote databases and has its web server enabled,
+The node menu will include a list of the mirrored databases.
+
+![image](https://cloud.githubusercontent.com/assets/2662304/24080824/24cd2d3c-0caf-11e7-909d-a8dd1dbb95d7.png)
+
+Selecting any of these, the server will offer a dashboard using the mirrored metrics.
+
+## Monitoring ephemeral nodes
+
+Auto-scaling is probably the most trendy service deployment strategy these days.
+
+Auto-scaling detects the need for additional resources and boots VMs on demand, based on a template. Soon after they start running the applications, a load balancer starts distributing traffic to them, allowing the service to grow horizontally to the scale needed to handle the load. When demands falls, auto-scaling starts shutting down VMs that are no longer needed.
+
+![Monitoring ephemeral nodes with Netdata](https://cloud.githubusercontent.com/assets/2662304/23627426/65a9074a-02b9-11e7-9664-cd8f258a00af.png)
+
+What a fantastic feature for controlling infrastructure costs! Pay only for what you need for the time you need it!
+
+In auto-scaling, all servers are ephemeral, they live for just a few hours. Every VM is a brand new instance of the application, that was automatically created based on a template.
+
+So, how can we monitor them? How can we be sure that everything is working as expected on all of them?
+
+### The Netdata way
+
+We recently made a significant improvement at the core of Netdata to support monitoring such setups.
+
+Following the Netdata way of monitoring, we wanted:
+
+1. **real-time performance monitoring**, collecting ***thousands of metrics per server per second***, visualized in interactive, automatically created dashboards.
+2. **real-time alarms**, for all nodes.
+3. **zero configuration**, all ephemeral servers should have exactly the same configuration, and nothing should be configured at any system for each of the ephemeral nodes. We shouldn't care if 10 or 100 servers are spawned to handle the load.
+4. **self-cleanup**, so that nothing needs to be done for cleaning up the monitoring infrastructure from the hundreds of nodes that may have been monitored through time.
+
+### How it works
+
+All monitoring solutions, including Netdata, work like this:
+
+1. Collect metrics from the system and the running applications
+2. Store metrics in a time-series database
+3. Examine metrics periodically, for triggering alarms and sending alarm notifications
+4. Visualize metrics so that users can see what exactly is happening
+
+Netdata used to be self-contained, so that all these functions were handled entirely by each server. The changes we made, allow each Netdata to be configured independently for each function. So, each Netdata can now act as:
+
+- A self-contained system, much like it used to be.
+- A data collector that collects metrics from a host and pushes them to another Netdata (with or without a local database and alarms).
+- A proxy, which receives metrics from other hosts and pushes them immediately to other Netdata servers. Netdata proxies can also be `store and forward proxies` meaning that they are able to maintain a local database for all metrics passing through them (with or without alarms).
+- A time-series database node, where data are kept, alarms are run and queries are served to visualise the metrics.
+
+### Configuring an auto-scaling setup
+
+![A diagram of an auto-scaling setup with Netdata](https://user-images.githubusercontent.com/1153921/84290043-0c1c1600-aaf8-11ea-9757-dd8dd8a8ec6c.png)
+
+You need a Netdata parent. This node should not be ephemeral. It will be the node where all ephemeral child
+nodes will send their metrics.
+
+The parent will need to authorize child nodes to receive their metrics. This is done with an API key.
+
+#### API keys
+
+API keys are just random GUIDs. Use the Linux command `uuidgen` to generate one. You can use the same API key for all your child nodes, or you can configure one API for each of them. This is entirely your decision.
+
+We suggest to use the same API key for each ephemeral node template you have, so that all replicas of the same ephemeral node will have exactly the same configuration.
+
+I will use this API_KEY: `11111111-2222-3333-4444-555555555555`. Replace it with your own.
+
+#### Configuring the parent
+
+On the parent, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
+
+```bash
+[11111111-2222-3333-4444-555555555555]
+ # enable/disable this API key
+ enabled = yes
+
+ # one hour of data for each of the child nodes
+ default history = 3600
+
+ # do not save child metrics on disk
+ default memory = ram
+
+ # alarms checks, only while the child is connected
+ health enabled by default = auto
+```
+
+_`stream.conf` on the parent, to enable receiving metrics from its child nodes using the API key._
+
+If you used many API keys, you can add one such section for each API key.
+
+When done, restart Netdata on the parent node. It is now ready to receive metrics.
+
+Note that `health enabled by default = auto` will still trigger `last_collected` alarms, if a connected child does not exit gracefully. If the `netdata` process running on the child is
+stopped, it will close the connection to the parent, ensuring that no `last_collected` alarms are triggered. For example, a proper container restart would first terminate
+the `netdata` process, but a system power issue would leave the connection open on the parent side. In the second case, you will still receive alarms.
+
+#### Configuring the child nodes
+
+On each of the child nodes, edit `/etc/netdata/stream.conf` (to edit it on your system run `/etc/netdata/edit-config stream.conf`) and set these:
+
+```bash
+[stream]
+ # stream metrics to another Netdata
+ enabled = yes
+
+ # the IP and PORT of the parent
+ destination = 10.11.12.13:19999
+
+ # the API key to use
+ api key = 11111111-2222-3333-4444-555555555555
+```
+
+_`stream.conf` on child nodes, to enable pushing metrics to their parent at `10.11.12.13:19999`._
+
+Using just the above configuration, the child nodes will be pushing their metrics to the parent Netdata, but they will still maintain a local database of the metrics and run health checks. To disable them, edit `/etc/netdata/netdata.conf` and set:
+
+```bash
+[global]
+ # disable the local database
+ memory mode = none
+
+[health]
+ # disable health checks
+ enabled = no
+```
+
+_`netdata.conf` configuration on child nodes, to disable the local database and health checks._
+
+Keep in mind that setting `memory mode = none` will also force `[health].enabled = no` (health checks require access to a local database). But you can keep the database and disable health checks if you need to. You are however sending all the metrics to the parent node, which can handle the health checking (`[health].enabled = yes`)
+
+#### Netdata unique id
+
+The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random GUID that **uniquely identifies each Netdata**. This file is automatically generated, by Netdata, the first time it is started and remains unaltered forever.
+
+> If you are building an image to be used for automated provisioning of autoscaled VMs, it important to delete that file from the image, so that each instance of your image will generate its own.
+
+#### Troubleshooting metrics streaming
+
+Both parent and child nodes log information at `/var/log/netdata/error.log`.
+
+Run the following on both the parent and child nodes:
+
+```
+tail -f /var/log/netdata/error.log | grep STREAM
+```
+
+If the child manages to connect to the parent you will see something like (on the parent):
+
+```
+2017-03-09 09:38:52: netdata: INFO : STREAM [receive from [10.11.12.86]:38564]: new client connection.
+2017-03-09 09:38:52: netdata: INFO : STREAM xxx [10.11.12.86]:38564: receive thread created (task id 27721)
+2017-03-09 09:38:52: netdata: INFO : STREAM xxx [receive from [10.11.12.86]:38564]: client willing to stream metrics for host 'xxx' with machine_guid '1234567-1976-11e6-ae19-7cdd9077342a': update every = 1, history = 3600, memory mode = ram, health auto
+2017-03-09 09:38:52: netdata: INFO : STREAM xxx [receive from [10.11.12.86]:38564]: initializing communication...
+2017-03-09 09:38:52: netdata: INFO : STREAM xxx [receive from [10.11.12.86]:38564]: receiving metrics...
+```
+
+and something like this on the child:
+
+```
+2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: connecting...
+2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: initializing communication...
+2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: waiting response from remote netdata...
+2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: established communication - sending metrics...
+```
+
+### Archiving to a time-series database
+
+The parent Netdata node can also archive metrics, for all its child nodes, to a time-series database. At the time of
+this writing, Netdata supports:
+
+- graphite
+- opentsdb
+- prometheus
+- json document DBs
+- all the compatibles to the above (e.g. kairosdb, influxdb, etc)
+
+Check the Netdata [exporting documentation](/docs/export/external-databases.md) for configuring this.
+
+This is how such a solution will work:
+
+![Diagram showing an example configuration for archiving to a time-series
+database](https://user-images.githubusercontent.com/1153921/84291308-c2ccc600-aaf9-11ea-98a9-89ccbf3a62dd.png)
+
+### An advanced setup
+
+Netdata also supports `proxies` with and without a local database, and data retention can be different between all nodes.
+
+This means a setup like the following is also possible:
+
+<p align="center">
+<img src="https://cloud.githubusercontent.com/assets/2662304/23629551/bb1fd9c2-02c0-11e7-90f5-cab5a3ed4c53.png"/>
+</p>
+
+## Proxies
+
+A proxy is a Netdata instance that is receiving metrics from a Netdata, and streams them to another Netdata.
+
+Netdata proxies may or may not maintain a database for the metrics passing through them.
+When they maintain a database, they can also run health checks (alarms and notifications)
+for the remote host that is streaming the metrics.
+
+To configure a proxy, configure it as a receiving and a sending Netdata at the same time,
+using `stream.conf`.
+
+The sending side of a Netdata proxy, connects and disconnects to the final destination of the
+metrics, following the same pattern of the receiving side.
+
+For a practical example see [Monitoring ephemeral nodes](#monitoring-ephemeral-nodes).
+
+## Troubleshooting streaming connections
+
+This section describes the most common issues you might encounter when connecting parent and child nodes.
+
+### Slow connections between parent and child
+
+When you have a slow connection between parent and child, Netdata raises a few different errors. Most of the
+errors will appear in the child's `error.log`.
+
+```bash
+netdata ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM CHILD HOSTNAME [send to PARENT IP:PARENT PORT]: too many data pending - buffer is X bytes long,
+Y unsent - we have sent Z bytes in total, W on this connection. Closing connection to flush the data.
+```
+
+On the parent side, you may see various error messages, most commonly the following:
+
+```
+netdata ERROR : STREAM_PARENT[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : read failed: end of file
+```
+
+Another common problem in slow connections is the CHILD sending a partial message to the parent. In this case,
+the parent will write the following in its `error.log`:
+
+```
+ERROR : STREAM_RECEIVER[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : sent command 'B' which is not known by netdata, for host 'HOSTNAME'. Disabling it.
+```
+
+In this example, `B` was part of a `BEGIN` message that was cut due to connection problems.
+
+Slow connections can also cause problems when the parent misses a message and then receives a command related to the
+missed message. For example, a parent might miss a message containing the child's charts, and then doesn't know
+what to do with the `SET` message that follows. When that happens, the parent will show a message like this:
+
+```
+ERROR : STREAM_RECEIVER[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : requested a SET on chart 'CHART NAME' of host 'HOSTNAME', without a dimension. Disabling it.
+```
+
+### Child cannot connect to parent
+
+When the child can't connect to a parent for any reason (misconfiguration, networking, firewalls, parent
+down), you will see the following in the child's `error.log`.
+
+```
+ERROR : STREAM_SENDER[HOSTNAME] : Failed to connect to 'PARENT IP', port 'PARENT PORT' (errno 113, No route to host)
+```
+
+### 'Is this a Netdata?'
+
+This question can appear when Netdata starts the stream and receives an unexpected response. This error can appear when
+the parent is using SSL and the child tries to connect using plain text. You will also see this message when
+Netdata connects to another server that isn't Netdata. The complete error message will look like this:
+
+```
+ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM child HOSTNAME [send to PARENT HOSTNAME:PARENT PORT]: server is not replying properly (is it a netdata?).
+```
+
+### Stream charts wrong
+
+Chart data needs to be consistent between child and parent nodes. If there are differences between chart data on
+a parent and a child, such as gaps in metrics collection, it most often means your child's `memory mode`
+does not match the parent's. To learn more about the different ways Netdata can store metrics, and thus keep chart
+data consistent, read our [memory mode documentation](/database/README.md).
+
+### Forbidding access
+
+You may see errors about "forbidding access" for a number of reasons. It could be because of a slow connection between
+the parent and child nodes, but it could also be due to other failures. Look in your parent's `error.log` for errors
+that look like this:
+
+```
+STREAM [receive from [child HOSTNAME]:child IP]: `MESSAGE`. Forbidding access."
+```
+
+`MESSAGE` will have one of the following patterns:
+
+- `request without KEY` : The message received is incomplete and the KEY value can be API, hostname, machine GUID.
+- `API key 'VALUE' is not valid GUID`: The UUID received from child does not have the format defined in [RFC 4122]
+ (https://tools.ietf.org/html/rfc4122)
+- `machine GUID 'VALUE' is not GUID.`: This error with machine GUID is like the previous one.
+- `API key 'VALUE' is not allowed`: This stream has a wrong API key.
+- `API key 'VALUE' is not permitted from this IP`: The IP is not allowed to use STREAM with this parent.
+- `machine GUID 'VALUE' is not allowed.`: The GUID that is trying to send stream is not allowed.
+- `Machine GUID 'VALUE' is not permitted from this IP. `: The IP does not match the pattern or IP allowed to connect
+ to use stream.
+
+### Netdata could not create a stream
+
+The connection between parent and child is a stream. When the parent can't convert the initial connection into
+a stream, it will write the following message inside `error.log`:
+
+```
+file descriptor given is not a valid stream
+```
+
+After logging this error, Netdata will close the stream.
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fstreaming%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
diff --git a/streaming/receiver.c b/streaming/receiver.c
new file mode 100644
index 0000000..3ea1580
--- /dev/null
+++ b/streaming/receiver.c
@@ -0,0 +1,499 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+
+extern struct config stream_config;
+
+void destroy_receiver_state(struct receiver_state *rpt) {
+ freez(rpt->key);
+ freez(rpt->hostname);
+ freez(rpt->registry_hostname);
+ freez(rpt->machine_guid);
+ freez(rpt->os);
+ freez(rpt->timezone);
+ freez(rpt->tags);
+ freez(rpt->client_ip);
+ freez(rpt->client_port);
+ freez(rpt->program_name);
+ freez(rpt->program_version);
+#ifdef ENABLE_HTTPS
+ if(rpt->ssl.conn){
+ SSL_free(rpt->ssl.conn);
+ }
+#endif
+ freez(rpt);
+}
+
+static void rrdpush_receiver_thread_cleanup(void *ptr) {
+ static __thread int executed = 0;
+ if(!executed) {
+ executed = 1;
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch
+ // the host pointer as it is unpredicable when the RRDHOST is deleted. Do the cleanup from rrdhost_free().
+ if (netdata_exit && rpt->host) {
+ rpt->exited = 1;
+ return;
+ }
+
+ // Make sure that we detach this thread and don't kill a freshly arriving receiver
+ if (!netdata_exit && rpt->host) {
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == rpt)
+ rpt->host->receiver = NULL;
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ }
+
+ info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+ destroy_receiver_state(rpt);
+ }
+}
+
+#include "../collectors/plugins.d/pluginsd_parser.h"
+
+PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+{
+ UNUSED(plugins_action);
+ char *remote_time_txt = words[1];
+ time_t remote_time = 0;
+ RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+ struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
+ if (cd->version < VERSION_GAP_FILLING ) {
+ error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd,
+ cd->version);
+ return PARSER_RC_OK; // Ignore error and continue stream
+ }
+ if (remote_time_txt && *remote_time_txt) {
+ remote_time = str2ull(remote_time_txt);
+ time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host);
+ time_t gap = 0;
+ if (prev == 0)
+ info("STREAM %s from %s: Initial connection (no gap to check), remote=%ld local=%ld slew=%ld",
+ host->hostname, cd->cmd, remote_time, now, now-remote_time);
+ else {
+ gap = now - prev;
+ info("STREAM %s from %s: Checking for gaps... remote=%ld local=%ld..%ld slew=%ld %ld-sec gap",
+ host->hostname, cd->cmd, remote_time, prev, now, remote_time - now, gap);
+ }
+ char message[128];
+ sprintf(message,"REPLICATE %ld %ld\n", remote_time - gap, remote_time);
+ int ret;
+#ifdef ENABLE_HTTPS
+ SSL *conn = host->stream_ssl.conn ;
+ if(conn && !host->stream_ssl.flags) {
+ ret = SSL_write(conn, message, strlen(message));
+ } else {
+ ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
+ }
+#else
+ ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
+#endif
+ if (ret != (int)strlen(message))
+ error("Failed to send initial timestamp - gaps may appear in charts");
+ return PARSER_RC_OK;
+ }
+ return PARSER_RC_ERROR;
+}
+
+#define CLAIMED_ID_MIN_WORDS 3
+PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+{
+ UNUSED(plugins_action);
+
+ int i;
+ uuid_t uuid;
+ RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+
+ for (i = 0; words[i]; i++) ;
+ if (i != CLAIMED_ID_MIN_WORDS) {
+ error("Command CLAIMED_ID came malformed %d parameters are expected but %d received", CLAIMED_ID_MIN_WORDS - 1, i - 1);
+ return PARSER_RC_ERROR;
+ }
+
+ // We don't need the parsed UUID
+ // just do it to check the format
+ if(uuid_parse(words[1], uuid)) {
+ error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]);
+ return PARSER_RC_ERROR;
+ }
+ if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) {
+ error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]);
+ return PARSER_RC_ERROR;
+ }
+
+ if(strcmp(words[1], host->machine_guid)) {
+ error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid);
+ return PARSER_RC_OK; //the message is OK problem must be somewehere else
+ }
+
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ freez(host->aclk_state.claimed_id);
+ host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
+ rrdhost_aclk_state_unlock(host);
+
+ rrdpush_claimed_id(host);
+
+ return PARSER_RC_OK;
+}
+
+/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
+ */
+static int receiver_read(struct receiver_state *r, FILE *fp) {
+#ifdef ENABLE_HTTPS
+ if (r->ssl.conn && !r->ssl.flags) {
+ ERR_clear_error();
+ int desired = sizeof(r->read_buffer) - r->read_len - 1;
+ int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired);
+ if (ret > 0 ) {
+ r->read_len += ret;
+ return 0;
+ }
+ // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
+ u_long err;
+ char buf[256];
+ while ((err = ERR_get_error()) != 0) {
+ ERR_error_string_n(err, buf, sizeof(buf));
+ error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf);
+ }
+ return 1;
+ }
+#endif
+ if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
+ return 1;
+ r->read_len = strlen(r->read_buffer);
+ return 0;
+}
+
+/* Produce a full line if one exists, statefully return where we start next time.
+ * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
+ */
+static char *receiver_next_line(struct receiver_state *r, int *pos) {
+ int start = *pos, scan = *pos;
+ if (scan >= r->read_len) {
+ r->read_len = 0;
+ return NULL;
+ }
+ while (scan < r->read_len && r->read_buffer[scan] != '\n')
+ scan++;
+ if (scan < r->read_len && r->read_buffer[scan] == '\n') {
+ *pos = scan+1;
+ r->read_buffer[scan] = 0;
+ return &r->read_buffer[start];
+ }
+ memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
+ r->read_len -= start;
+ return NULL;
+}
+
+
+size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
+ size_t result;
+ PARSER_USER_OBJECT *user = callocz(1, sizeof(*user));
+ user->enabled = cd->enabled;
+ user->host = rpt->host;
+ user->opaque = rpt;
+ user->cd = cd;
+ user->trust_durations = 0;
+
+ PARSER *parser = parser_init(rpt->host, user, fp, PARSER_INPUT_SPLIT);
+ parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
+ parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
+
+ if (unlikely(!parser)) {
+ error("Failed to initialize parser");
+ cd->serial_failures++;
+ freez(user);
+ return 0;
+ }
+
+ parser->plugins_action->begin_action = &pluginsd_begin_action;
+ parser->plugins_action->flush_action = &pluginsd_flush_action;
+ parser->plugins_action->end_action = &pluginsd_end_action;
+ parser->plugins_action->disable_action = &pluginsd_disable_action;
+ parser->plugins_action->variable_action = &pluginsd_variable_action;
+ parser->plugins_action->dimension_action = &pluginsd_dimension_action;
+ parser->plugins_action->label_action = &pluginsd_label_action;
+ parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
+ parser->plugins_action->chart_action = &pluginsd_chart_action;
+ parser->plugins_action->set_action = &pluginsd_set_action;
+
+ user->parser = parser;
+
+ do {
+ if (receiver_read(rpt, fp))
+ break;
+ int pos = 0;
+ char *line;
+ while ((line = receiver_next_line(rpt, &pos))) {
+ if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line)))
+ goto done;
+ }
+ rpt->last_msg_t = now_realtime_sec();
+ }
+ while(!netdata_exit);
+done:
+ result= user->count;
+ freez(user);
+ parser_destroy(parser);
+ return result;
+}
+
+
+static int rrdpush_receive(struct receiver_state *rpt)
+{
+ int history = default_rrd_history_entries;
+ RRD_MEMORY_MODE mode = default_rrd_memory_mode;
+ int health_enabled = default_health_enabled;
+ int rrdpush_enabled = default_rrdpush_enabled;
+ char *rrdpush_destination = default_rrdpush_destination;
+ char *rrdpush_api_key = default_rrdpush_api_key;
+ char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
+ time_t alarms_delay = 60;
+
+ rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every);
+ if(rpt->update_every < 0) rpt->update_every = 1;
+
+ history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history);
+ history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history);
+ if(history < 5) history = 5;
+
+ mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode)));
+ mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode)));
+
+ health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled);
+ health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled);
+
+ alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay);
+ alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay);
+
+ rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled);
+ rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled);
+
+ rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination);
+ rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination);
+
+ rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key);
+ rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key);
+
+ rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
+ rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+
+ (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
+
+ if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT");
+ error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
+ close(rpt->fd);
+ return 1;
+ }
+
+ if (rpt->host==NULL) {
+
+ rpt->host = rrdhost_find_or_create(
+ rpt->hostname
+ , rpt->registry_hostname
+ , rpt->machine_guid
+ , rpt->os
+ , rpt->timezone
+ , rpt->tags
+ , rpt->program_name
+ , rpt->program_version
+ , rpt->update_every
+ , history
+ , mode
+ , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
+ , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
+ , rrdpush_destination
+ , rrdpush_api_key
+ , rrdpush_send_charts_matching
+ , rpt->system_info
+ );
+
+ if(!rpt->host) {
+ close(rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST");
+ error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port);
+ return 1;
+ }
+
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == NULL)
+ rpt->host->receiver = rpt;
+ else {
+ error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid);
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ close(rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION");
+ return 1;
+ }
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ }
+
+ int ssl = 0;
+#ifdef ENABLE_HTTPS
+ if (rpt->ssl.conn != NULL)
+ ssl = 1;
+#endif
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
+ , rpt->hostname
+ , rpt->client_ip
+ , rpt->client_port
+ , rpt->host->hostname
+ , rpt->host->machine_guid
+ , rpt->host->rrd_update_every
+ , rpt->host->rrd_history_entries
+ , rrd_memory_mode_name(rpt->host->rrd_memory_mode)
+ , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
+ , ssl ? " SSL," : ""
+ , rpt->host->tags?rpt->host->tags:""
+ );
+#endif // NETDATA_INTERNAL_CHECKS
+
+
+ struct plugind cd = {
+ .enabled = 1,
+ .update_every = default_rrd_update_every,
+ .pid = 0,
+ .serial_failures = 0,
+ .successful_collections = 0,
+ .obsolete = 0,
+ .started_t = now_realtime_sec(),
+ .next = NULL,
+ .version = 0,
+ };
+
+ // put the client IP and port into the buffers used by plugins.d
+ snprintfz(cd.id, CONFIG_MAX_NAME, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.filename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+ snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
+
+ info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ char initial_response[HTTP_HEADER_SIZE];
+ if (rpt->stream_version > 1) {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+ sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
+ } else if (rpt->stream_version == 1) {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
+ } else {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT);
+ }
+ debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
+ #ifdef ENABLE_HTTPS
+ rpt->host->stream_ssl.conn = rpt->ssl.conn;
+ rpt->host->stream_ssl.flags = rpt->ssl.flags;
+ if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+#else
+ if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
+#endif
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ close(rpt->fd);
+ return 0;
+ }
+
+ // remove the non-blocking flag from the socket
+ if(sock_delnonblock(rpt->fd) < 0)
+ error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+
+ // convert the socket to a FILE *
+ FILE *fp = fdopen(rpt->fd, "r");
+ if(!fp) {
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR");
+ error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ close(rpt->fd);
+ return 0;
+ }
+
+ rrdhost_wrlock(rpt->host);
+/* if(rpt->host->connected_senders > 0) {
+ rrdhost_unlock(rpt->host);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ fclose(fp);
+ return 0;
+ }
+*/
+
+// rpt->host->connected_senders++;
+ rpt->host->labels.labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM;
+
+ if(health_enabled != CONFIG_BOOLEAN_NO) {
+ if(alarms_delay > 0) {
+ rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
+ info("Postponing health checks for %ld seconds, on host '%s', because it was just connected."
+ , alarms_delay
+ , rpt->host->hostname
+ );
+ }
+ }
+ rrdhost_unlock(rpt->host);
+
+ // call the plugins.d processor to receive the metrics
+ info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED");
+
+ cd.version = rpt->stream_version;
+
+#ifdef ENABLE_ACLK
+ // in case we have cloud connection we inform cloud
+ // new slave connected
+ if (netdata_cloud_setting)
+ aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT);
+#endif
+
+ size_t count = streaming_parser(rpt, &cd, fp);
+
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
+ "DISCONNECTED");
+ error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
+ rpt->client_port, count);
+
+#ifdef ENABLE_ACLK
+ // in case we have cloud connection we inform cloud
+ // new slave connected
+ if (netdata_cloud_setting)
+ aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_DISCONNECT);
+#endif
+
+ // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
+ if (!netdata_exit && rpt->host) {
+ rrd_rdlock();
+ rrdhost_wrlock(rpt->host);
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == rpt) {
+ rpt->host->senders_disconnected_time = now_realtime_sec();
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
+ if(health_enabled == CONFIG_BOOLEAN_AUTO)
+ rpt->host->health_enabled = 0;
+ }
+ rrdhost_unlock(rpt->host);
+ if (rpt->host->receiver == rpt) {
+ rrdpush_sender_thread_stop(rpt->host);
+ }
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
+ rrd_unlock();
+ }
+
+ // cleanup
+ fclose(fp);
+ return (int)count;
+}
+
+void *rrdpush_receiver_thread(void *ptr) {
+ netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
+
+ struct receiver_state *rpt = (struct receiver_state *)ptr;
+ info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
+
+ rrdpush_receive(rpt);
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
new file mode 100644
index 0000000..f54fc60
--- /dev/null
+++ b/streaming/rrdpush.c
@@ -0,0 +1,731 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+#include "../parser/parser.h"
+
+/*
+ * rrdpush
+ *
+ * 3 threads are involved for all stream operations
+ *
+ * 1. a random data collection thread, calling rrdset_done_push()
+ * this is called for each chart.
+ *
+ * the output of this work is kept in a BUFFER in RRDHOST
+ * the sender thread is signalled via a pipe (also in RRDHOST)
+ *
+ * 2. a sender thread running at the sending netdata
+ * this is spawned automatically on the first chart to be pushed
+ *
+ * It tries to push the metrics to the remote netdata, as fast
+ * as possible (i.e. immediately after they are collected).
+ *
+ * 3. a receiver thread, running at the receiving netdata
+ * this is spawned automatically when the sender connects to
+ * the receiver.
+ *
+ */
+
+struct config stream_config = {
+ .first_section = NULL,
+ .last_section = NULL,
+ .mutex = NETDATA_MUTEX_INITIALIZER,
+ .index = {
+ .avl_tree = {
+ .root = NULL,
+ .compar = appconfig_section_compare
+ },
+ .rwlock = AVL_LOCK_INITIALIZER
+ }
+};
+
+unsigned int default_rrdpush_enabled = 0;
+char *default_rrdpush_destination = NULL;
+char *default_rrdpush_api_key = NULL;
+char *default_rrdpush_send_charts_matching = NULL;
+#ifdef ENABLE_HTTPS
+int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
+char *netdata_ssl_ca_path = NULL;
+char *netdata_ssl_ca_file = NULL;
+#endif
+
+static void load_stream_conf() {
+ errno = 0;
+ char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
+ if(!appconfig_load(&stream_config, filename, 0, NULL)) {
+ info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
+ freez(filename);
+
+ filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
+ if(!appconfig_load(&stream_config, filename, 0, NULL))
+ info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
+ }
+ freez(filename);
+}
+
+int rrdpush_init() {
+ // --------------------------------------------------------------------
+ // load stream.conf
+ load_stream_conf();
+
+ default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
+ default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
+ default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
+ default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
+ rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
+
+
+ if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
+ error("STREAM [send]: cannot enable sending thread - information is missing.");
+ default_rrdpush_enabled = 0;
+ }
+
+#ifdef ENABLE_HTTPS
+ if (netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL) {
+ if (default_rrdpush_destination){
+ char *test = strstr(default_rrdpush_destination,":SSL");
+ if(test){
+ *test = 0X00;
+ netdata_use_ssl_on_stream = NETDATA_SSL_FORCE;
+ }
+ }
+ }
+
+ char *invalid_certificate = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", "no");
+
+ if ( !strcmp(invalid_certificate,"yes")){
+ if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
+ info("Netdata is configured to accept invalid SSL certificate.");
+ netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
+ }
+ }
+
+ netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", "/etc/ssl/certs/");
+ netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", "/etc/ssl/certs/certs.pem");
+#endif
+
+ return default_rrdpush_enabled;
+}
+
+// data collection happens from multiple threads
+// each of these threads calls rrdset_done()
+// which in turn calls rrdset_done_push()
+// which uses this pipe to notify the streaming thread
+// that there are more data ready to be sent
+#define PIPE_READ 0
+#define PIPE_WRITE 1
+
+// to have the remote netdata re-sync the charts
+// to its current clock, we send for this many
+// iterations a BEGIN line without microseconds
+// this is for the first iterations of each chart
+unsigned int remote_clock_resync_iterations = 60;
+
+
+static inline int should_send_chart_matching(RRDSET *st) {
+ if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+ else if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
+ RRDHOST *host = st->rrdhost;
+
+ if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) ||
+ simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ }
+ else {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ }
+ }
+
+ return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND));
+}
+
+int configured_as_parent() {
+ struct section *section = NULL;
+ int is_parent = 0;
+
+ appconfig_wrlock(&stream_config);
+ for (section = stream_config.first_section; section; section = section->next) {
+ uuid_t uuid;
+
+ if (uuid_parse(section->name, uuid) != -1 &&
+ appconfig_get_boolean_by_section(section, "enabled", 0)) {
+ is_parent = 1;
+ break;
+ }
+ }
+ appconfig_unlock(&stream_config);
+
+ return is_parent;
+}
+
+// checks if the current chart definition has been sent
+static inline int need_to_send_chart_definition(RRDSET *st) {
+ rrdset_check_rdlock(st);
+
+ if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))))
+ return 1;
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(unlikely(!rd->exposed)) {
+ #ifdef NETDATA_INTERNAL_CHECKS
+ info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id);
+ #endif
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+// Send the current chart definition.
+// Assumes that collector thread has already called sender_start for mutex / buffer state.
+static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+
+ // properly set the name for the remote end to parse it
+ char *name = "";
+ if(likely(st->name)) {
+ if(unlikely(strcmp(st->id, st->name))) {
+ // they differ
+ name = strchr(st->name, '.');
+ if(name)
+ name++;
+ else
+ name = "";
+ }
+ }
+
+ // send the chart
+ buffer_sprintf(
+ host->sender->build
+ , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
+ , st->id
+ , name
+ , st->title
+ , st->units
+ , st->family
+ , st->context
+ , rrdset_type_name(st->chart_type)
+ , st->priority
+ , st->update_every
+ , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
+ , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
+ , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
+ , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
+ , (st->plugin_name)?st->plugin_name:""
+ , (st->module_name)?st->module_name:""
+ );
+
+ // send the dimensions
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ buffer_sprintf(
+ host->sender->build
+ , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
+ , rd->id
+ , rd->name
+ , rrd_algorithm_name(rd->algorithm)
+ , rd->multiplier
+ , rd->divisor
+ , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
+ , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
+ , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
+ );
+ rd->exposed = 1;
+ }
+
+ // send the chart local custom variables
+ RRDSETVAR *rs;
+ for(rs = st->variables; rs ;rs = rs->next) {
+ if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) {
+ calculated_number *value = (calculated_number *) rs->value;
+
+ buffer_sprintf(
+ host->sender->build
+ , "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n"
+ , rs->variable
+ , *value
+ );
+ }
+ }
+
+ st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
+}
+
+// sends the current chart dimensions
+static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) {
+ RRDHOST *host = st->rrdhost;
+ buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
+ if (s->version >= VERSION_GAP_FILLING)
+ buffer_sprintf(host->sender->build, " %ld\n", st->last_collected_time.tv_sec);
+ else
+ buffer_strcat(host->sender->build, "\n");
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(rd->updated && rd->exposed)
+ buffer_sprintf(host->sender->build
+ , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n"
+ , rd->id
+ , rd->collected_value
+ );
+ }
+ buffer_strcat(host->sender->build, "END\n");
+}
+
+static void rrdpush_sender_thread_spawn(RRDHOST *host);
+
+// Called from the internal collectors to mark a chart obsolete.
+void rrdset_push_chart_definition_now(RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+
+ if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st)))
+ return;
+
+ rrdset_rdlock(st);
+ sender_start(host->sender);
+ rrdpush_send_chart_definition_nolock(st);
+ sender_commit(host->sender);
+ rrdset_unlock(st);
+}
+
+void rrdset_done_push(RRDSET *st) {
+ if(unlikely(!should_send_chart_matching(st)))
+ return;
+
+ RRDHOST *host = st->rrdhost;
+
+ if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
+ rrdpush_sender_thread_spawn(host);
+
+ // Handle non-connected case
+ if(unlikely(!host->rrdpush_sender_connected)) {
+ if(unlikely(!host->rrdpush_sender_error_shown))
+ error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname);
+ host->rrdpush_sender_error_shown = 1;
+ return;
+ }
+ else if(unlikely(host->rrdpush_sender_error_shown)) {
+ info("STREAM %s [send]: sending metrics...", host->hostname);
+ host->rrdpush_sender_error_shown = 0;
+ }
+
+ sender_start(host->sender);
+
+ if(need_to_send_chart_definition(st))
+ rrdpush_send_chart_definition_nolock(st);
+
+ rrdpush_send_chart_metrics_nolock(st, host->sender);
+
+ // signal the sender there are more data
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+ error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+
+ sender_commit(host->sender);
+}
+
+// labels
+void rrdpush_send_labels(RRDHOST *host) {
+ if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM))
+ return;
+
+ sender_start(host->sender);
+ rrdhost_rdlock(host);
+ netdata_rwlock_rdlock(&host->labels.labels_rwlock);
+
+ struct label *label_i = host->labels.head;
+ while(label_i) {
+ buffer_sprintf(host->sender->build
+ , "LABEL \"%s\" = %d %s\n"
+ , label_i->key
+ , (int)label_i->label_source
+ , label_i->value);
+
+ label_i = label_i->next;
+ }
+
+ buffer_sprintf(host->sender->build
+ , "OVERWRITE %s\n", "labels");
+
+ netdata_rwlock_unlock(&host->labels.labels_rwlock);
+ rrdhost_unlock(host);
+ sender_commit(host->sender);
+
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+ error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+
+ host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+}
+
+void rrdpush_claimed_id(RRDHOST *host)
+{
+ if(unlikely(!host->rrdpush_send_enabled || !host->rrdpush_sender_connected))
+ return;
+
+ if(host->sender->version < STREAM_VERSION_CLAIM)
+ return;
+
+ sender_start(host->sender);
+ rrdhost_aclk_state_lock(host);
+
+ buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
+
+ rrdhost_aclk_state_unlock(host);
+ sender_commit(host->sender);
+
+ // signal the sender there are more data
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+ error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+}
+
+// ----------------------------------------------------------------------------
+// rrdpush sender thread
+
+// Either the receiver lost the connection or the host is being destroyed.
+// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
+void rrdpush_sender_thread_stop(RRDHOST *host) {
+
+ netdata_mutex_lock(&host->sender->mutex);
+ netdata_thread_t thr = 0;
+
+ if(host->rrdpush_sender_spawn) {
+ info("STREAM %s [send]: signaling sending thread to stop...", host->hostname);
+
+ // signal the thread that we want to join it
+ host->rrdpush_sender_join = 1;
+
+ // copy the thread id, so that we will be waiting for the right one
+ // even if a new one has been spawn
+ thr = host->rrdpush_sender_thread;
+
+ // signal it to cancel
+ netdata_thread_cancel(host->rrdpush_sender_thread);
+ }
+
+ netdata_mutex_unlock(&host->sender->mutex);
+
+ if(thr != 0) {
+ info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname);
+ void *result;
+ netdata_thread_join(thr, &result);
+ info("STREAM %s [send]: sending thread has exited.", host->hostname);
+ }
+}
+
+
+// ----------------------------------------------------------------------------
+// rrdpush receiver thread
+
+void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
+ log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
+}
+
+
+static void rrdpush_sender_thread_spawn(RRDHOST *host) {
+ netdata_mutex_lock(&host->sender->mutex);
+
+ if(!host->rrdpush_sender_spawn) {
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname);
+
+ if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender))
+ error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
+ else
+ host->rrdpush_sender_spawn = 1;
+ }
+ netdata_mutex_unlock(&host->sender->mutex);
+}
+
+int rrdpush_receiver_permission_denied(struct web_client *w) {
+ // we always respond with the same message and error code
+ // to prevent an attacker from gaining info about the error
+ buffer_flush(w->response.data);
+ buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
+ return 401;
+}
+
+int rrdpush_receiver_too_busy_now(struct web_client *w) {
+ // we always respond with the same message and error code
+ // to prevent an attacker from gaining info about the error
+ buffer_flush(w->response.data);
+ buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
+ return 503;
+}
+
+void *rrdpush_receiver_thread(void *ptr);
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
+ info("clients wants to STREAM metrics.");
+
+ char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
+ int update_every = default_rrd_update_every;
+ uint32_t stream_version = UINT_MAX;
+ char buf[GUID_LEN + 1];
+
+ struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
+
+ while(url) {
+ char *value = mystrsep(&url, "&");
+ if(!value || !*value) continue;
+
+ char *name = mystrsep(&value, "=");
+ if(!name || !*name) continue;
+ if(!value || !*value) continue;
+
+ if(!strcmp(name, "key"))
+ key = value;
+ else if(!strcmp(name, "hostname"))
+ hostname = value;
+ else if(!strcmp(name, "registry_hostname"))
+ registry_hostname = value;
+ else if(!strcmp(name, "machine_guid"))
+ machine_guid = value;
+ else if(!strcmp(name, "update_every"))
+ update_every = (int)strtoul(value, NULL, 0);
+ else if(!strcmp(name, "os"))
+ os = value;
+ else if(!strcmp(name, "timezone"))
+ timezone = value;
+ else if(!strcmp(name, "tags"))
+ tags = value;
+ else if(!strcmp(name, "ver"))
+ stream_version = MIN((uint32_t) strtoul(value, NULL, 0), STREAMING_PROTOCOL_CURRENT_VERSION);
+ else {
+ // An old Netdata child does not have a compatible streaming protocol, map to something sane.
+ if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
+ name = "NETDATA_HOST_OS_NAME";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
+ name = "NETDATA_HOST_OS_ID";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
+ name = "NETDATA_HOST_OS_ID_LIKE";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
+ name = "NETDATA_HOST_OS_VERSION";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
+ name = "NETDATA_HOST_OS_VERSION_ID";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
+ name = "NETDATA_HOST_OS_DETECTION";
+ else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) {
+ stream_version = 1;
+ }
+
+ if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
+ info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
+ w->client_ip, w->client_port, name, value);
+ }
+ }
+ }
+
+ if (stream_version == UINT_MAX)
+ stream_version = 0;
+
+ if(!key || !*key) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY");
+ error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!hostname || !*hostname) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO HOSTNAME");
+ error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!machine_guid || !*machine_guid) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO MACHINE GUID");
+ error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(regenerate_guid(key, buf) == -1) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID KEY");
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(regenerate_guid(machine_guid, buf) == -1) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID MACHINE GUID");
+ error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - KEY NOT ENABLED");
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ {
+ SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
+ if(key_allow_from) {
+ if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
+ simple_pattern_free(key_allow_from);
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP");
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key);
+ return rrdpush_receiver_permission_denied(w);
+ }
+ simple_pattern_free(key_allow_from);
+ }
+ }
+
+ if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - MACHINE GUID NOT ENABLED");
+ error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ {
+ SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
+ if(machine_allow_from) {
+ if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
+ simple_pattern_free(machine_allow_from);
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP");
+ error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ return rrdpush_receiver_permission_denied(w);
+ }
+ simple_pattern_free(machine_allow_from);
+ }
+ }
+
+ if(unlikely(web_client_streaming_rate_t > 0)) {
+ static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER;
+ static volatile time_t last_stream_accepted_t = 0;
+
+ netdata_mutex_lock(&stream_rate_mutex);
+ time_t now = now_realtime_sec();
+
+ if(unlikely(last_stream_accepted_t == 0))
+ last_stream_accepted_t = now;
+
+ if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
+ netdata_mutex_unlock(&stream_rate_mutex);
+ rrdhost_system_info_free(system_info);
+ error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
+ return rrdpush_receiver_too_busy_now(w);
+ }
+
+ last_stream_accepted_t = now;
+ netdata_mutex_unlock(&stream_rate_mutex);
+ }
+
+ /*
+ * Quick path for rejecting multiple connections. The lock taken is fine-grained - it only protects the receiver
+ * pointer within the host (if a host exists). This protects against multiple concurrent web requests hitting
+ * separate threads within the web-server and landing here. The lock guards the thread-shutdown sequence that
+ * detaches the receiver from the host. If the host is being created (first time-access) then we also use the
+ * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a
+ * lookup to the now-attached structure).
+ */
+ struct receiver_state *rpt = callocz(1, sizeof(*rpt));
+
+ rrd_rdlock();
+ RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
+ if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
+ host = NULL;
+ if (host) {
+ rrdhost_wrlock(host);
+ netdata_mutex_lock(&host->receiver_lock);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ host->senders_disconnected_time = 0;
+ if (host->receiver != NULL) {
+ time_t age = now_realtime_sec() - host->receiver->last_msg_t;
+ if (age > 30) {
+ host->receiver->shutdown = 1;
+ shutdown(host->receiver->fd, SHUT_RDWR);
+ host->receiver = NULL; // Thread holds reference to structure
+ info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is dead (%ld sec), accepting new connection.", host->hostname, w->client_ip, w->client_port, age);
+ }
+ else {
+ netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
+ rrd_unlock();
+ log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname,
+ "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - existing connection is active (within last %ld sec), rejecting new connection.", host->hostname, w->client_ip, w->client_port, age);
+ // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
+ buffer_flush(w->response.data);
+ buffer_strcat(w->response.data, "This GUID is already streaming to this server");
+ freez(rpt);
+ return 409;
+ }
+ }
+ host->receiver = rpt;
+ netdata_mutex_unlock(&host->receiver_lock);
+ rrdhost_unlock(host);
+ }
+ rrd_unlock();
+
+ rpt->last_msg_t = now_realtime_sec();
+
+ rpt->host = host;
+ rpt->fd = w->ifd;
+ rpt->key = strdupz(key);
+ rpt->hostname = strdupz(hostname);
+ rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
+ rpt->machine_guid = strdupz(machine_guid);
+ rpt->os = strdupz(os);
+ rpt->timezone = strdupz(timezone);
+ rpt->tags = (tags)?strdupz(tags):NULL;
+ rpt->client_ip = strdupz(w->client_ip);
+ rpt->client_port = strdupz(w->client_port);
+ rpt->update_every = update_every;
+ rpt->system_info = system_info;
+ rpt->stream_version = stream_version;
+#ifdef ENABLE_HTTPS
+ rpt->ssl.conn = w->ssl.conn;
+ rpt->ssl.flags = w->ssl.flags;
+
+ w->ssl.conn = NULL;
+ w->ssl.flags = NETDATA_SSL_START;
+#endif
+
+ if(w->user_agent && w->user_agent[0]) {
+ char *t = strchr(w->user_agent, '/');
+ if(t && *t) {
+ *t = '\0';
+ t++;
+ }
+
+ rpt->program_name = strdupz(w->user_agent);
+ if(t && *t) rpt->program_version = strdupz(t);
+ }
+
+
+
+ debug(D_SYSTEM, "starting STREAM receive thread.");
+
+ char tag[FILENAME_MAX + 1];
+ snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
+
+ if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
+ error("Failed to create new STREAM receive thread for client.");
+
+ // prevent the caller from closing the streaming socket
+ if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
+ web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
+ }
+ else {
+ if(w->ifd == w->ofd)
+ w->ifd = w->ofd = -1;
+ else
+ w->ifd = -1;
+ }
+
+ buffer_flush(w->response.data);
+ return 200;
+}
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
new file mode 100644
index 0000000..225d0c2
--- /dev/null
+++ b/streaming/rrdpush.h
@@ -0,0 +1,117 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDPUSH_H
+#define NETDATA_RRDPUSH_H 1
+
+#include "../database/rrd.h"
+#include "../libnetdata/libnetdata.h"
+#include "web/server/web_client.h"
+#include "daemon/common.h"
+
+#define CONNECTED_TO_SIZE 100
+
+// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3
+#define VERSION_GAP_FILLING 4
+#define STREAM_VERSION_CLAIM 3
+
+#define STREAMING_PROTOCOL_VERSION "1.1"
+#define START_STREAMING_PROMPT "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
+#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
+
+#define HTTP_HEADER_SIZE 8192
+
+typedef enum {
+ RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
+ RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
+} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
+
+typedef struct {
+ char *os_name;
+ char *os_id;
+ char *os_version;
+ char *kernel_name;
+ char *kernel_version;
+} stream_encoded_t;
+
+// Thread-local storage
+ // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+
+struct sender_state {
+ RRDHOST *host;
+ pid_t task_id;
+ unsigned int overflow:1;
+ int timeout, default_port;
+ usec_t reconnect_delay;
+ char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
+ size_t begin;
+ size_t reconnects_counter;
+ size_t sent_bytes;
+ size_t sent_bytes_on_this_connection;
+ size_t send_attempts;
+ time_t last_sent_t;
+ size_t not_connected_loops;
+ // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
+ // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
+ netdata_mutex_t mutex;
+ struct circular_buffer *buffer;
+ BUFFER *build;
+ char read_buffer[512];
+ int read_len;
+ int32_t version;
+};
+
+struct receiver_state {
+ RRDHOST *host;
+ netdata_thread_t thread;
+ int fd;
+ char *key;
+ char *hostname;
+ char *registry_hostname;
+ char *machine_guid;
+ char *os;
+ char *timezone; // Unused?
+ char *tags;
+ char *client_ip; // Duplicated in pluginsd
+ char *client_port; // Duplicated in pluginsd
+ char *program_name; // Duplicated in pluginsd
+ char *program_version;
+ struct rrdhost_system_info *system_info;
+ int update_every;
+ uint32_t stream_version;
+ time_t last_msg_t;
+ char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
+ int read_len;
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl ssl;
+#endif
+ unsigned int shutdown:1; // Tell the thread to exit
+ unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
+};
+
+
+extern unsigned int default_rrdpush_enabled;
+extern char *default_rrdpush_destination;
+extern char *default_rrdpush_api_key;
+extern char *default_rrdpush_send_charts_matching;
+extern unsigned int remote_clock_resync_iterations;
+
+extern void sender_init(struct sender_state *s, RRDHOST *parent);
+void sender_start(struct sender_state *s);
+void sender_commit(struct sender_state *s);
+extern int rrdpush_init();
+extern int configured_as_parent();
+extern void rrdset_done_push(RRDSET *st);
+extern void rrdset_push_chart_definition_now(RRDSET *st);
+extern void *rrdpush_sender_thread(void *ptr);
+extern void rrdpush_send_labels(RRDHOST *host);
+extern void rrdpush_claimed_id(RRDHOST *host);
+
+extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
+extern void rrdpush_sender_thread_stop(RRDHOST *host);
+
+extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
+extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
+
+#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
new file mode 100644
index 0000000..d55a420
--- /dev/null
+++ b/streaming/sender.c
@@ -0,0 +1,723 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+
+extern struct config stream_config;
+extern int netdata_use_ssl_on_stream;
+extern char *netdata_ssl_ca_path;
+extern char *netdata_ssl_ca_file;
+
+// Collector thread starting a transmission
+void sender_start(struct sender_state *s) {
+ netdata_mutex_lock(&s->mutex);
+ buffer_flush(s->build);
+}
+
+// Collector thread finishing a transmission
+void sender_commit(struct sender_state *s) {
+ if(cbuffer_add_unsafe(s->host->sender->buffer, buffer_tostring(s->host->sender->build),
+ s->host->sender->build->len))
+ s->overflow = 1;
+ buffer_flush(s->build);
+ netdata_mutex_unlock(&s->mutex);
+}
+
+
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+ host->rrdpush_sender_connected = 0;
+
+ if(host->rrdpush_sender_socket != -1) {
+ close(host->rrdpush_sender_socket);
+ host->rrdpush_sender_socket = -1;
+ }
+}
+
+static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) {
+ calculated_number *value = (calculated_number *)rv->value;
+
+ buffer_sprintf(
+ host->sender->build
+ , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n"
+ , rv->name
+ , *value
+ );
+
+ debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value);
+}
+
+void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
+ if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) {
+ sender_start(host->sender);
+ rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
+ sender_commit(host->sender);
+ }
+}
+
+
+static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) {
+ RRDVAR *rv = (RRDVAR *)rrdvar_ptr;
+ RRDHOST *host = (RRDHOST *)host_ptr;
+
+ if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) {
+ rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
+
+ // return 1, so that the traversal will return the number of variables sent
+ return 1;
+ }
+
+ // returning a negative number will break the traversal
+ return 0;
+}
+
+static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
+ sender_start(host->sender);
+ int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host);
+ (void)ret;
+ sender_commit(host->sender);
+
+ debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+}
+
+// resets all the chart, so that their definitions
+// will be resent to the central netdata
+static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
+ rrdhost_rdlock(host);
+
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+
+ st->upstream_resync_time = 0;
+
+ rrdset_rdlock(st);
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st)
+ rd->exposed = 0;
+
+ rrdset_unlock(st);
+ }
+
+ rrdhost_unlock(host);
+}
+
+static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
+ netdata_mutex_lock(&host->sender->mutex);
+
+ size_t len = cbuffer_next_unsafe(host->sender->buffer, NULL);
+ if (len)
+ error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, len);
+
+ cbuffer_remove_unsafe(host->sender->buffer, len);
+ netdata_mutex_unlock(&host->sender->mutex);
+
+ rrdpush_sender_thread_reset_all_charts(host);
+ rrdpush_sender_thread_send_custom_host_variables(host);
+}
+
+static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) {
+ host->labels.labels_flag |= LABEL_FLAG_UPDATE_STREAM;
+ host->labels.labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+}
+
+void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
+{
+ se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):"";
+ se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):"";
+ se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):"";
+ se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):"";
+ se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):"";
+}
+
+void rrdpush_clean_encoded(stream_encoded_t *se)
+{
+ if (se->os_name)
+ freez(se->os_name);
+
+ if (se->os_id)
+ freez(se->os_id);
+
+ if (se->os_version)
+ freez(se->os_version);
+
+ if (se->kernel_name)
+ freez(se->kernel_name);
+
+ if (se->kernel_version)
+ freez(se->kernel_version);
+}
+
+static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout,
+ struct sender_state *s) {
+
+ struct timeval tv = {
+ .tv_sec = timeout,
+ .tv_usec = 0
+ };
+
+ // make sure the socket is closed
+ rrdpush_sender_thread_close_socket(host);
+
+ debug(D_STREAM, "STREAM: Attempting to connect...");
+ info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination);
+
+ host->rrdpush_sender_socket = connect_to_one_of(
+ host->rrdpush_send_destination
+ , default_port
+ , &tv
+ , &s->reconnects_counter
+ , s->connected_to
+ , sizeof(s->connected_to)-1
+ );
+
+ if(unlikely(host->rrdpush_sender_socket == -1)) {
+ error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination);
+ return 0;
+ }
+
+ info("STREAM %s [send to %s]: initializing communication...", host->hostname, s->connected_to);
+
+#ifdef ENABLE_HTTPS
+ if( netdata_client_ctx ){
+ host->ssl.flags = NETDATA_SSL_START;
+ if (!host->ssl.conn){
+ host->ssl.conn = SSL_new(netdata_client_ctx);
+ if(!host->ssl.conn){
+ error("Failed to allocate SSL structure.");
+ host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+ }
+ else{
+ SSL_clear(host->ssl.conn);
+ }
+
+ if (host->ssl.conn)
+ {
+ if (SSL_set_fd(host->ssl.conn, host->rrdpush_sender_socket) != 1) {
+ error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
+ host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ } else{
+ host->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
+ }
+ }
+ }
+ else {
+ host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+#endif
+
+ /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
+ version negotiation resulted in a high enough version.
+ */
+ stream_encoded_t se;
+ rrdpush_encode_variable(&se, host);
+
+ char http[HTTP_HEADER_SIZE + 1];
+ int eol = snprintfz(http, HTTP_HEADER_SIZE,
+ "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s&ver=%u"
+ "&NETDATA_SYSTEM_OS_NAME=%s"
+ "&NETDATA_SYSTEM_OS_ID=%s"
+ "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
+ "&NETDATA_SYSTEM_OS_VERSION=%s"
+ "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
+ "&NETDATA_SYSTEM_OS_DETECTION=%s"
+ "&NETDATA_HOST_IS_K8S_NODE=%s"
+ "&NETDATA_SYSTEM_KERNEL_NAME=%s"
+ "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
+ "&NETDATA_SYSTEM_ARCHITECTURE=%s"
+ "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
+ "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
+ "&NETDATA_SYSTEM_CONTAINER=%s"
+ "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
+ "&NETDATA_CONTAINER_OS_NAME=%s"
+ "&NETDATA_CONTAINER_OS_ID=%s"
+ "&NETDATA_CONTAINER_OS_ID_LIKE=%s"
+ "&NETDATA_CONTAINER_OS_VERSION=%s"
+ "&NETDATA_CONTAINER_OS_VERSION_ID=%s"
+ "&NETDATA_CONTAINER_OS_DETECTION=%s"
+ "&NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT=%s"
+ "&NETDATA_SYSTEM_CPU_FREQ=%s"
+ "&NETDATA_SYSTEM_TOTAL_RAM=%s"
+ "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
+ "&NETDATA_PROTOCOL_VERSION=%s"
+ " HTTP/1.1\r\n"
+ "User-Agent: %s/%s\r\n"
+ "Accept: */*\r\n\r\n"
+ , host->rrdpush_send_api_key
+ , host->hostname
+ , host->registry_hostname
+ , host->machine_guid
+ , default_rrd_update_every
+ , host->os
+ , host->timezone
+ , (host->tags) ? host->tags : ""
+ , STREAMING_PROTOCOL_CURRENT_VERSION
+ , se.os_name
+ , se.os_id
+ , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
+ , se.os_version
+ , (host->system_info->host_os_version_id) ? host->system_info->host_os_version_id : ""
+ , (host->system_info->host_os_detection) ? host->system_info->host_os_detection : ""
+ , (host->system_info->is_k8s_node) ? host->system_info->is_k8s_node : ""
+ , se.kernel_name
+ , se.kernel_version
+ , (host->system_info->architecture) ? host->system_info->architecture : ""
+ , (host->system_info->virtualization) ? host->system_info->virtualization : ""
+ , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
+ , (host->system_info->container) ? host->system_info->container : ""
+ , (host->system_info->container_detection) ? host->system_info->container_detection : ""
+ , (host->system_info->container_os_name) ? host->system_info->container_os_name : ""
+ , (host->system_info->container_os_id) ? host->system_info->container_os_id : ""
+ , (host->system_info->container_os_id_like) ? host->system_info->container_os_id_like : ""
+ , (host->system_info->container_os_version) ? host->system_info->container_os_version : ""
+ , (host->system_info->container_os_version_id) ? host->system_info->container_os_version_id : ""
+ , (host->system_info->container_os_detection) ? host->system_info->container_os_detection : ""
+ , (host->system_info->host_cores) ? host->system_info->host_cores : ""
+ , (host->system_info->host_cpu_freq) ? host->system_info->host_cpu_freq : ""
+ , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
+ , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
+ , STREAMING_PROTOCOL_VERSION
+ , host->program_name
+ , host->program_version
+ );
+ http[eol] = 0x00;
+ rrdpush_clean_encoded(&se);
+
+#ifdef ENABLE_HTTPS
+ if (!host->ssl.flags) {
+ ERR_clear_error();
+ SSL_set_connect_state(host->ssl.conn);
+ int err = SSL_connect(host->ssl.conn);
+ if (err != 1){
+ err = SSL_get_error(host->ssl.conn, err);
+ error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->ssl.conn,err),NULL));
+ if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }else {
+ host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ }
+ }
+ else {
+ if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
+ if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
+ if ( security_test_certificate(host->ssl.conn)) {
+ error("Closing the stream connection, because the server SSL certificate is not valid.");
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }
+ }
+ }
+ }
+ }
+ if(send_timeout(&host->ssl,host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
+#else
+ if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
+#endif
+ error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }
+
+ info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, s->connected_to);
+
+ ssize_t received;
+#ifdef ENABLE_HTTPS
+ received = recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
+ if(received == -1) {
+#else
+ received = recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
+ if(received == -1) {
+#endif
+ error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }
+
+ http[received] = '\0';
+ debug(D_STREAM, "Response to sender from far end: %s", http);
+ int answer = -1;
+ char *version_start = strchr(http, '=');
+ int32_t version = -1;
+ if(version_start) {
+ version_start++;
+ version = (int32_t)strtol(version_start, NULL, 10);
+ answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(version_start - http));
+ if(!answer) {
+ rrdpush_set_flags_to_newest_stream(host);
+ }
+ } else {
+ answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
+ if(!answer) {
+ version = 1;
+ rrdpush_set_flags_to_newest_stream(host);
+ }
+ else {
+ answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
+ if(!answer) {
+ version = 0;
+ host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
+ host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ }
+ }
+ }
+
+ if(version == -1) {
+ error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(host);
+ return 0;
+ }
+ s->version = version;
+
+ info("STREAM %s [send to %s]: established communication with a parent using protocol version %d - ready to send metrics..."
+ , host->hostname
+ , s->connected_to
+ , version);
+
+ if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, s->connected_to);
+
+ if(sock_enlarge_out(host->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, s->connected_to);
+
+ debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket);
+
+ return 1;
+}
+
+static void attempt_to_connect(struct sender_state *state)
+{
+ state->send_attempts = 0;
+
+ if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
+ state->last_sent_t = now_monotonic_sec();
+
+ // reset the buffer, to properly send charts and metrics
+ rrdpush_sender_thread_data_flush(state->host);
+
+ // send from the beginning
+ state->begin = 0;
+
+ // make sure the next reconnection will be immediate
+ state->not_connected_loops = 0;
+
+ // reset the bytes we have sent for this session
+ state->sent_bytes_on_this_connection = 0;
+
+ // let the data collection threads know we are ready
+ state->host->rrdpush_sender_connected = 1;
+ }
+ else {
+ // increase the failed connections counter
+ state->not_connected_loops++;
+
+ // reset the number of bytes sent
+ state->sent_bytes_on_this_connection = 0;
+
+ // slow re-connection on repeating errors
+ sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds
+ }
+}
+
+// TCP window is open and we have data to transmit.
+void attempt_to_send(struct sender_state *s) {
+
+ rrdpush_send_labels(s->host);
+
+ struct circular_buffer *cb = s->buffer;
+
+ netdata_thread_disable_cancelability();
+ netdata_mutex_lock(&s->mutex);
+ char *chunk;
+ size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
+ debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
+ ssize_t ret;
+#ifdef ENABLE_HTTPS
+ SSL *conn = s->host->ssl.conn ;
+ if(conn && !s->host->ssl.flags) {
+ ret = SSL_write(conn, chunk, outstanding);
+ } else {
+ ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
+ }
+#else
+ ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
+#endif
+ if (likely(ret > 0)) {
+ cbuffer_remove_unsafe(s->buffer, ret);
+ s->sent_bytes_on_this_connection += ret;
+ s->sent_bytes += ret;
+ debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret);
+ s->last_sent_t = now_monotonic_sec();
+ }
+ else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
+ debug(D_STREAM, "STREAM %s [send to %s]: unavailable aftering polling POLLOUT", s->host->hostname,
+ s->connected_to);
+ else if (ret == -1) {
+ debug(D_STREAM, "STREAM: Send failed - closing socket...");
+ error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", s->host->hostname, s->connected_to, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+ else {
+ debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
+ }
+
+ netdata_mutex_unlock(&s->mutex);
+ netdata_thread_enable_cancelability();
+}
+
+void attempt_read(struct sender_state *s) {
+int ret;
+#ifdef ENABLE_HTTPS
+ if (s->host->ssl.conn && !s->host->stream_ssl.flags) {
+ ERR_clear_error();
+ int desired = sizeof(s->read_buffer) - s->read_len - 1;
+ ret = SSL_read(s->host->ssl.conn, s->read_buffer, desired);
+ if (ret > 0 ) {
+ s->read_len += ret;
+ return;
+ }
+ int sslerrno = SSL_get_error(s->host->ssl.conn, desired);
+ if (sslerrno == SSL_ERROR_WANT_READ || sslerrno == SSL_ERROR_WANT_WRITE)
+ return;
+ u_long err;
+ char buf[256];
+ while ((err = ERR_get_error()) != 0) {
+ ERR_error_string_n(err, buf, sizeof(buf));
+ error("STREAM %s [send to %s] ssl error: %s", s->host->hostname, s->connected_to, buf);
+ }
+ error("Restarting connection");
+ rrdpush_sender_thread_close_socket(s->host);
+ return;
+ }
+#endif
+ ret = recv(s->host->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,
+ MSG_DONTWAIT);
+ if (ret>0) {
+ s->read_len += ret;
+ return;
+ }
+ debug(D_STREAM, "Socket was POLLIN, but req %zu bytes gave %d", sizeof(s->read_buffer) - s->read_len - 1, ret);
+ if (ret<0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
+ return;
+ if (ret==0)
+ error("STREAM %s [send to %s]: connection closed by far end. Restarting connection", s->host->hostname, s->connected_to);
+ else
+ error("STREAM %s [send to %s]: error during read (%d). Restarting connection", s->host->hostname, s->connected_to,
+ ret);
+ rrdpush_sender_thread_close_socket(s->host);
+}
+
+// This is just a placeholder until the gap filling state machine is inserted
+void execute_commands(struct sender_state *s) {
+ char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
+ *end = 0;
+ while( start<end && (newline=strchr(start, '\n')) ) {
+ *newline = 0;
+ info("STREAM %s [send to %s] received command over connection: %s", s->host->hostname, s->connected_to, start);
+ start = newline+1;
+ }
+ if (start<end) {
+ memmove(s->read_buffer, start, end-start);
+ s->read_len = end-start;
+ }
+}
+
+
+static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
+ RRDHOST *host = (RRDHOST *)ptr;
+
+ netdata_mutex_lock(&host->sender->mutex);
+
+ info("STREAM %s [send]: sending thread cleans up...", host->hostname);
+
+ rrdpush_sender_thread_close_socket(host);
+
+ // close the pipe
+ if(host->rrdpush_sender_pipe[PIPE_READ] != -1) {
+ close(host->rrdpush_sender_pipe[PIPE_READ]);
+ host->rrdpush_sender_pipe[PIPE_READ] = -1;
+ }
+
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1) {
+ close(host->rrdpush_sender_pipe[PIPE_WRITE]);
+ host->rrdpush_sender_pipe[PIPE_WRITE] = -1;
+ }
+
+ if(!host->rrdpush_sender_join) {
+ info("STREAM %s [send]: sending thread detaches itself.", host->hostname);
+ netdata_thread_detach(netdata_thread_self());
+ }
+
+ host->rrdpush_sender_spawn = 0;
+
+ info("STREAM %s [send]: sending thread now exits.", host->hostname);
+
+ netdata_mutex_unlock(&host->sender->mutex);
+}
+
+void sender_init(struct sender_state *s, RRDHOST *parent) {
+ memset(s, 0, sizeof(*s));
+ s->host = parent;
+ s->buffer = cbuffer_new(1024, 1024*1024);
+ s->build = buffer_create(1);
+ netdata_mutex_init(&s->mutex);
+}
+
+void *rrdpush_sender_thread(void *ptr) {
+ struct sender_state *s = ptr;
+ s->task_id = gettid();
+
+ if(!s->host->rrdpush_send_enabled || !s->host->rrdpush_send_destination ||
+ !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
+ !*s->host->rrdpush_send_api_key) {
+ error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
+ s->host->hostname, s->task_id);
+ return NULL;
+ }
+
+#ifdef ENABLE_HTTPS
+ if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){
+ security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
+ security_location_for_context(netdata_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
+ }
+#endif
+
+ info("STREAM %s [send]: thread created (task id %d)", s->host->hostname, s->task_id);
+
+ s->timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
+ s->default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
+ s->buffer->max_size =
+ (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
+ s->reconnect_delay =
+ (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
+ remote_clock_resync_iterations = (unsigned int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM,
+ "initial clock resync iterations",
+ remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
+
+ // initialize rrdpush globals
+ s->host->rrdpush_sender_connected = 0;
+ if(pipe(s->host->rrdpush_sender_pipe) == -1) {
+ error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
+ return NULL;
+ }
+
+ enum {
+ Collector,
+ Socket
+ };
+ struct pollfd fds[2];
+ fds[Collector].fd = s->host->rrdpush_sender_pipe[PIPE_READ];
+ fds[Collector].events = POLLIN;
+
+ netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, s->host);
+ for(; s->host->rrdpush_send_enabled && !netdata_exit ;) {
+ // check for outstanding cancellation requests
+ netdata_thread_testcancel();
+
+ // The connection attempt blocks (after which we use the socket in nonblocking)
+ if(unlikely(s->host->rrdpush_sender_socket == -1)) {
+ s->overflow = 0;
+ s->read_len = 0;
+ s->buffer->read = 0;
+ s->buffer->write = 0;
+ attempt_to_connect(s);
+ if (s->version >= VERSION_GAP_FILLING) {
+ time_t now = now_realtime_sec();
+ sender_start(s);
+ buffer_sprintf(s->build, "TIMESTAMP %ld", now);
+ sender_commit(s);
+ }
+ rrdpush_claimed_id(s->host);
+ continue;
+ }
+
+ // If the TCP window never opened then something is wrong, restart connection
+ if(unlikely(now_monotonic_sec() - s->last_sent_t > s->timeout)) {
+ error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", s->host->hostname, s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
+ rrdpush_sender_thread_close_socket(s->host);
+ continue;
+ }
+
+ // Wait until buffer opens in the socket or a rrdset_done_push wakes us
+ fds[Collector].revents = 0;
+ fds[Socket].revents = 0;
+ fds[Socket].fd = s->host->rrdpush_sender_socket;
+
+ netdata_mutex_lock(&s->mutex);
+ char *chunk;
+ size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk);
+ chunk = NULL; // Do not cache pointer outside of region - could be invalidated
+ netdata_mutex_unlock(&s->mutex);
+ if(outstanding) {
+ s->send_attempts++;
+ fds[Socket].events = POLLIN | POLLOUT;
+ }
+ else {
+ fds[Socket].events = POLLIN;
+ }
+
+ int retval = poll(fds, 2, 1000);
+ debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
+ fds[Collector].revents, fds[Socket].revents, outstanding);
+ if(unlikely(netdata_exit)) break;
+
+ // Spurious wake-ups without error - loop again
+ if (retval == 0 || ((retval == -1) && (errno == EAGAIN || errno == EINTR)))
+ {
+ debug(D_STREAM, "Spurious wakeup");
+ continue;
+ }
+ // Only errors from poll() are internal, but try restarting the connection
+ if(unlikely(retval == -1)) {
+ error("STREAM %s [send to %s]: failed to poll(). Closing socket.", s->host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(s->host);
+ continue;
+ }
+
+ // If the collector woke us up then empty the pipe to remove the signal
+ if (fds[Collector].revents & POLLIN || fds[Collector].revents & POLLPRI) {
+ debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
+
+ char buffer[1000 + 1];
+ if (read(s->host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1)
+ error("STREAM %s [send to %s]: cannot read from internal pipe.", s->host->hostname, s->connected_to);
+ }
+
+ // Read as much as possible to fill the buffer, split into full lines for execution.
+ if (fds[Socket].revents & POLLIN)
+ attempt_read(s);
+ execute_commands(s);
+
+ // If we have data and have seen the TCP window open then try to close it by a transmission.
+ if (outstanding && fds[Socket].revents & POLLOUT)
+ attempt_to_send(s);
+
+ // TODO-GAPS - why do we only check this on the socket, not the pipe?
+ if (outstanding) {
+ char *error = NULL;
+ if (unlikely(fds[Socket].revents & POLLERR))
+ error = "socket reports errors (POLLERR)";
+ else if (unlikely(fds[Socket].revents & POLLHUP))
+ error = "connection closed by remote end (POLLHUP)";
+ else if (unlikely(fds[Socket].revents & POLLNVAL))
+ error = "connection is invalid (POLLNVAL)";
+ if(unlikely(error)) {
+ error("STREAM %s [send to %s]: restart stream because %s - %zu bytes transmitted.", s->host->hostname,
+ s->connected_to, error, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+ }
+
+ // protection from overflow
+ if (s->overflow) {
+ errno = 0;
+ error("STREAM %s [send to %s]: buffer full (%zu-bytes) after %zu bytes. Restarting connection",
+ s->host->hostname, s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
+ rrdpush_sender_thread_close_socket(s->host);
+ }
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
diff --git a/streaming/stream.conf b/streaming/stream.conf
new file mode 100644
index 0000000..b514263
--- /dev/null
+++ b/streaming/stream.conf
@@ -0,0 +1,205 @@
+# netdata configuration for aggregating data from remote hosts
+#
+# API keys authorize a pair of sending-receiving netdata servers.
+# Once their communication is authorized, they can exchange metrics for any
+# number of hosts.
+#
+# You can generate API keys, with the linux command: uuidgen
+
+
+# -----------------------------------------------------------------------------
+# 1. ON CHILD NETDATA - THE ONE THAT WILL BE SENDING METRICS
+
+[stream]
+ # Enable this on child nodes, to have them send metrics.
+ enabled = no
+
+ # Where is the receiving netdata?
+ # A space separated list of:
+ #
+ # [PROTOCOL:]HOST[%INTERFACE][:PORT][:SSL]
+ #
+ # If many are given, the first available will get the metrics.
+ #
+ # PROTOCOL = tcp, udp, or unix (only tcp and unix are supported by parent nodes)
+ # HOST = an IPv4, IPv6 IP, or a hostname, or a unix domain socket path.
+ # IPv6 IPs should be given with brackets [ip:address]
+ # INTERFACE = the network interface to use (only for IPv6)
+ # PORT = the port number or service name (/etc/services)
+ # SSL = when this word appear at the end of the destination string
+ # the Netdata will encrypt the connection with the parent.
+ #
+ # This communication is not HTTP (it cannot be proxied by web proxies).
+ destination =
+
+ # Skip Certificate verification?
+ #
+ # The netdata child is configurated to avoid invalid SSL/TLS certificate,
+ # so certificates that are self-signed or expired will stop the streaming.
+ # Case the server certificate is not valid, you can enable the use of
+ # 'bad' certificates setting the next option as 'yes'.
+ #
+ #ssl skip certificate verification = yes
+
+ # Certificate Authority Path
+ #
+ # OpenSSL has a default directory where the known certificates are stored,
+ # case it is necessary it is possible to change this rule using the variable
+ # "CApath"
+ #
+ #CApath = /etc/ssl/certs/
+
+ # Certificate Authority file
+ #
+ # When the Netdata parent has certificate, that is not recognized as valid,
+ # we can add this certificate in the list of known certificates in CApath
+ # and give for Netdata as argument.
+ #
+ #CAfile = /etc/ssl/certs/cert.pem
+
+ # The API_KEY to use (as the sender)
+ api key =
+
+ # The timeout to connect and send metrics
+ timeout seconds = 60
+
+ # If the destination line above does not specify a port, use this
+ default port = 19999
+
+ # filter the charts to be streamed
+ # netdata SIMPLE PATTERN:
+ # - space separated list of patterns (use \ to include spaces in patterns)
+ # - use * as wildcard, any number of times within each pattern
+ # - prefix a pattern with ! for a negative match (ie not stream the charts it matches)
+ # - the order of patterns is important (left to right)
+ # To send all except a few, use: !this !that * (ie append a wildcard pattern)
+ send charts matching = *
+
+ # The buffer to use for sending metrics.
+ # 1MB is good for 10-20 seconds of data, so increase this if you expect latencies.
+ # The buffer is flushed on reconnects (this will not prevent gaps at the charts).
+ buffer size bytes = 1048576
+
+ # If the connection fails, or it disconnects,
+ # retry after that many seconds.
+ reconnect delay seconds = 5
+
+ # Sync the clock of the charts for that many iterations, when starting.
+ initial clock resync iterations = 60
+
+# -----------------------------------------------------------------------------
+# 2. ON PARENT NETDATA - THE ONE THAT WILL BE RECEIVING METRICS
+
+# You can have one API key per child,
+# or the same API key for all child nodes.
+#
+# netdata searches for options in this order:
+#
+# a) parent netdata settings (netdata.conf)
+# b) [stream] section (above)
+# c) [API_KEY] section (below, settings for the API key)
+# d) [MACHINE_GUID] section (below, settings for each machine)
+#
+# You can combine the above (the more specific setting will be used).
+
+# API key authentication
+# If the key is not listed here, it will not be able to push metrics.
+
+# [API_KEY] is [YOUR-API-KEY], i.e [11111111-2222-3333-4444-555555555555]
+[API_KEY]
+ # Default settings for this API key
+
+ # You can disable the API key, by setting this to: no
+ # The default (for unknown API keys) is: no
+ enabled = no
+
+ # A list of simple patterns matching the IPs of the servers that
+ # will be pushing metrics using this API key.
+ # The metrics are received via the API port, so the same IPs
+ # should also be matched at netdata.conf [web].allow connections from
+ allow from = *
+
+ # The default history in entries, for all hosts using this API key.
+ # You can also set it per host below.
+ # If you don't set it here, the history size of the central netdata
+ # will be used.
+ default history = 3600
+
+ # The default memory mode to be used for all hosts using this API key.
+ # You can also set it per host below.
+ # If you don't set it here, the memory mode of netdata.conf will be used.
+ # Valid modes:
+ # save save on exit, load on start
+ # map like swap (continuously syncing to disks - you need SSD)
+ # ram keep it in RAM, don't touch the disk
+ # none no database at all (use this on headless proxies)
+ # dbengine like a traditional database
+ default memory mode = ram
+
+ # Shall we enable health monitoring for the hosts using this API key?
+ # 3 possible values:
+ # yes enable alarms
+ # no do not enable alarms
+ # auto enable alarms, only when the sending netdata is connected. For ephemeral child nodes or child system restarts,
+ # ensure that the netdata process on the child is gracefully stopped, to prevent invalid last_collected alarms
+ # You can also set it per host, below.
+ # The default is taken from [health].enabled of netdata.conf
+ health enabled by default = auto
+
+ # postpone alarms for a short period after the sender is connected
+ default postpone alarms on connect seconds = 60
+
+ # need to route metrics differently? set these.
+ # the defaults are the ones at the [stream] section (above)
+ #default proxy enabled = yes | no
+ #default proxy destination = IP:PORT IP:PORT ...
+ #default proxy api key = API_KEY
+ #default proxy send charts matching = *
+
+
+# -----------------------------------------------------------------------------
+# 3. PER SENDING HOST SETTINGS, ON PARENT NETDATA
+# THIS IS OPTIONAL - YOU DON'T HAVE TO CONFIGURE IT
+
+# This section exists to give you finer control of the parent settings for each
+# child host, when the same API key is used by many netdata child nodes / proxies.
+#
+# Each netdata has a unique GUID - generated the first time netdata starts.
+# You can find it at /var/lib/netdata/registry/netdata.public.unique.id
+# (at the child).
+#
+# The host sending data will have one. If the host is not ephemeral,
+# you can give settings for each sending host here.
+
+[MACHINE_GUID]
+ # enable this host: yes | no
+ # When disabled, the parent will not receive metrics for this host.
+ # THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID.
+ # Use only the API key for security.
+ enabled = no
+
+ # A list of simple patterns matching the IPs of the servers that
+ # will be pushing metrics using this MACHINE GUID.
+ # The metrics are received via the API port, so the same IPs
+ # should also be matched at netdata.conf [web].allow connections from
+ # and at stream.conf [API_KEY].allow from
+ allow from = *
+
+ # The number of entries in the database
+ history = 3600
+
+ # The memory mode of the database: save | map | ram | none | dbengine
+ memory mode = save
+
+ # Health / alarms control: yes | no | auto
+ health enabled = yes
+
+ # postpone alarms when the sender connects
+ postpone alarms on connect seconds = 60
+
+ # need to route metrics differently?
+ # the defaults are the ones at the [API KEY] section
+ #proxy enabled = yes | no
+ #proxy destination = IP:PORT IP:PORT ...
+ #proxy api key = API_KEY
+ #proxy send charts matching = *