Diffstat (limited to 'streaming')
-rw-r--r--  streaming/README.md      | 656
-rw-r--r--  streaming/receiver.c     |  26
-rw-r--r--  streaming/replication.c  | 340
-rw-r--r--  streaming/rrdpush.c      | 142
-rw-r--r--  streaming/rrdpush.h      |  33
-rw-r--r--  streaming/sender.c       |  31
6 files changed, 643 insertions, 585 deletions
diff --git a/streaming/README.md b/streaming/README.md
index 37d2c261e..bf11f32e4 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -1,152 +1,160 @@
----
-title: "Streaming and replication"
-description: "Replicate and mirror Netdata's metrics through real-time streaming from child to parent nodes. Then combine, correlate, and export."
-custom_edit_url: https://github.com/netdata/netdata/edit/master/streaming/README.md
----
+# Streaming and replication reference
+This document covers advanced streaming options and suggested deployment configurations for production.
+If you haven't already done so, we suggest you first go through the
+[quick introduction to streaming](https://github.com/netdata/netdata/blob/master/docs/metrics-storage-management/enable-streaming.md)
+for your first, basic parent-child setup.
-Each Netdata node is able to replicate/mirror its database to another Netdata node, by streaming the collected
-metrics in real-time. This is quite different to [data archiving to third party time-series
-databases](https://github.com/netdata/netdata/blob/master/exporting/README.md).
-The nodes that send metrics are called **child** nodes, and the nodes that receive metrics are called **parent** nodes.
-
-There are also **proxy** nodes, which collect metrics from a child and sends it to a parent.
-
-When one Netdata node streams metrics another, the receiving instance can use the data for all features of a typical Netdata node, for example:
-
-- Visualize metrics with a dashboard
-- Run health checks that trigger alarms and send alarm notifications
-- Export metrics to an external time-series database
-
-
-
-
-## Supported configurations
-
-### Netdata without a database or web API (headless collector)
-
-A local Netdata Agent (child), **without any database or alarms**, collects metrics and sends them to another Netdata node
-(parent).
-The same parent can collect data for any number of child nodes and serves alerts for each child.
-
-The node menu shows a list of all "databases streamed to" the parent. Clicking one of those links allows the user to
-view the full dashboard of the child node. The URL has the form
-`http://parent-host:parent-port/host/child-host/`.
+## Configuration
+There are two files responsible for configuring Netdata's streaming capabilities: `stream.conf` and `netdata.conf`.
-In a headless setup, the child acts as a plain data collector. It spawns all external plugins, but instead of maintaining a
-local database and accepting dashboard requests, it streams all metrics to the parent.
+From within your Netdata config directory (typically `/etc/netdata`), [use `edit-config`](https://github.com/netdata/netdata/blob/master/docs/configure/nodes.md) to
+open either `stream.conf` or `netdata.conf`.
-This setup works great to reduce the memory footprint. Depending on the enabled plugins, memory usage is between 6 MiB and 40 MiB. To reduce the memory usage as much as
-possible, refer to the [performance optimization guide](https://github.com/netdata/netdata/blob/master/docs/guides/configure/performance.md).
+```
+sudo ./edit-config stream.conf
+sudo ./edit-config netdata.conf
+```
+### `stream.conf`
-### Database Replication
+The `stream.conf` file contains three sections. The `[stream]` section is for configuring child nodes.
-The local Netdata Agent (child), **with a local database (and possibly alarms)**, collects metrics and
-sends them to another Netdata node (parent).
+The `[API_KEY]` and `[MACHINE_GUID]` sections are both for configuring parent nodes, and share the same settings.
+`[API_KEY]` settings affect every child node using that key, whereas `[MACHINE_GUID]` settings affect only the child
+node with a matching GUID.
-The user can use all the functions **at both** `http://child-ip:child-port/` and
-`http://parent-host:parent-port/host/child-host/`.
+The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random GUID that **uniquely identifies each
+node**. This file is automatically generated by Netdata the first time it is started and remains unaltered forever.
-The child and the parent may have different data retention policies for the same metrics.
+#### `[stream]` section
-Alerts for the child are triggered by **both** the child and the parent.
-It is possible to enable different alert configurations on the parent and the child.
+| Setting | Default | Description |
+| :---------------------------------------------- | :------------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `enabled` | `no` | Whether this node streams metrics to any parent. Change to `yes` to enable streaming. |
+| [`destination`](#destination) | ` ` | A space-separated list of parent nodes to attempt to stream to, with the first available parent receiving metrics, using the following format: `[PROTOCOL:]HOST[%INTERFACE][:PORT][:SSL]`. [Read more →](#destination) |
+| `ssl skip certificate verification` | `yes` | If you want to accept self-signed or expired certificates, set to `yes` and uncomment. |
+| `CApath` | `/etc/ssl/certs/` | The directory where known certificates are found. Defaults to OpenSSL's default path. |
+| `CAfile` | `/etc/ssl/certs/cert.pem` | Add a parent node certificate to the list of known certificates in `CAPath`. |
+| `api key` | ` ` | The `API_KEY` to use as the child node. |
+| `timeout seconds` | `60` | The timeout to connect and send metrics to a parent. |
+| `default port` | `19999` | The port to use if `destination` does not specify one. |
+| [`send charts matching`](#send-charts-matching) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) to filter which charts are streamed. [Read more →](#send-charts-matching) |
+| `buffer size bytes` | `10485760` | The size of the buffer to use when sending metrics. The default `10485760` equals a buffer of 10MB, which is good for 60 seconds of data. Increase this if you expect latencies higher than that. The buffer is flushed on reconnect. |
+| `reconnect delay seconds` | `5` | How long to wait until retrying to connect to the parent node. |
+| `initial clock resync iterations` | `60` | The number of iterations during which to sync the clocks of the charts when streaming starts. |
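+
+As a sketch of how these settings fit together, a child that streams to a single parent and buffers roughly two
+minutes of metrics might use something like the following (the parent address and API key are placeholders):
+
+```conf
+[stream]
+    enabled = yes
+    destination = 203.0.113.0:19999
+    api key = 11111111-2222-3333-4444-555555555555
+    timeout seconds = 60
+    # 20 MiB, roughly double the default 60 seconds of buffered data
+    buffer size bytes = 20971520
+    reconnect delay seconds = 5
+```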
-In order for custom chart names on the child to work correctly, follow the form `type.name`. The parent will truncate the `type` part and substitute the original chart `type` to store the name in the database.
+### `[API_KEY]` and `[MACHINE_GUID]` sections
-### Netdata proxies
+| Setting | Default | Description |
+| :---------------------------------------------- | :------------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `enabled` | `no` | Whether this API key is enabled or disabled. |
+| [`allow from`](#allow-from) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) matching the IPs of nodes that will stream metrics using this API key. [Read more →](#allow-from) |
+| `default history` | `3600` | The default amount of child metrics history to retain when using the `save`, `map`, or `ram` memory modes. |
+| [`default memory mode`](#default-memory-mode) | `ram` | The [database](https://github.com/netdata/netdata/blob/master/database/README.md) to use for all nodes using this `API_KEY`. Valid settings are `dbengine`, `map`, `save`, `ram`, or `none`. [Read more →](#default-memory-mode) |
+| `health enabled by default` | `auto` | Whether alarms and notifications should be enabled for nodes using this `API_KEY`. `auto` enables alarms when the child is connected. `yes` enables alarms always, and `no` disables alarms. |
+| `default postpone alarms on connect seconds` | `60` | Postpone alarms and notifications for a period of time after the child connects. |
+| `default proxy enabled` | ` ` | Route metrics through a proxy. |
+| `default proxy destination` | ` ` | Space-separated list of `IP:PORT` for proxies. |
+| `default proxy api key` | ` ` | The `API_KEY` of the proxy. |
+| `default send charts matching` | `*` | See [`send charts matching`](#send-charts-matching). |
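+
+As an illustrative sketch (the key and IP range below are placeholders), a parent-side section for one API key could
+look like this:
+
+```conf
+[11111111-2222-3333-4444-555555555555]
+    enabled = yes
+    allow from = 10.*
+    default memory mode = dbengine
+    health enabled by default = auto
+    default postpone alarms on connect seconds = 60
+```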
-The local Netdata Agent(child), with or without a database, collects metrics and sends them to another
-Netdata node(**proxy**), which may or may not maintain a database, which forwards them to another
-Netdata (parent).
+#### `destination`
-Alerts for the child can be triggered by any of the involved hosts that maintains a database.
+A space-separated list of parent nodes to attempt to stream to, with the first available parent receiving metrics, using
+the following format: `[PROTOCOL:]HOST[%INTERFACE][:PORT][:SSL]`.
-You can daisy-chain any number of Netdata, each with or without a database and
-with or without alerts for the child metrics.
+- `PROTOCOL`: `tcp`, `udp`, or `unix` (only `tcp` and `unix` are supported by parent nodes).
+- `HOST`: An IPv4 or IPv6 address, a hostname, or a Unix domain socket path. IPv6 addresses must be enclosed in
+  brackets (`[ip:address]`).
+- `INTERFACE` (IPv6 only): The network interface to use.
+- `PORT`: The port number or service name (`/etc/services`) to use.
+- `SSL`: Append to enable TLS/SSL encryption of the streaming connection.
-### Mix and match with exporting engine
+To enable TCP streaming to a parent node at `203.0.113.0` on port `20000` and with TLS/SSL encryption:
-All nodes that maintain a database can also send their data to an external database.
-This allows quite complex setups.
+```conf
+[stream]
+ destination = tcp:203.0.113.0:20000:SSL
+```
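+
+The same setting accepts a space-separated failover list using any of the formats above. A hedged example (the socket
+path and hosts are placeholders) mixing a Unix domain socket, a plain TCP parent, and a TLS-encrypted parent:
+
+```conf
+[stream]
+    # destinations are tried from left to right until one accepts the connection
+    destination = unix:/tmp/netdata-parent.sock 203.0.113.0 tcp:203.0.113.1:20000:SSL
+```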
-Example:
+#### `send charts matching`
-1. Netdata nodes `A` and `B` do not maintain a database and stream metrics to Netdata node `C`(live streaming functionality).
-2. Netdata node `C` maintains a database for `A`, `B`, `C` and archives all metrics to `graphite` with 10 second detail (exporting functionality).
-3. Netdata node `C` also streams data for `A`, `B`, `C` to Netdata `D`, which also collects data from `E`, `F` and `G` from another DMZ (live streaming functionality).
-4. Netdata node `D` is just a proxy, without a database, that streams all data to a remote site at Netdata `H`.
-5. Netdata node `H` maintains a database for `A`, `B`, `C`, `D`, `E`, `F`, `G`, `H` and sends all data to `opentsdb` with 5 seconds detail (exporting functionality)
-6. Alerts are triggered by `H` for all hosts.
-7. Users can use all Netdata nodes that maintain a database to view metrics (i.e. at `H` all hosts can be viewed).
+A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) to filter which charts are streamed.
-## Configuration
+The default is a single wildcard `*`, which streams all charts.
-The following options affect how Netdata streams:
+To send only a few charts, list them explicitly, or list a group using a wildcard. To send _only_ the `apps.cpu` chart
+and charts with contexts beginning with `system.`:
-```
-[global]
- memory mode = none | ram | save | map | dbengine
+```conf
+[stream]
+ send charts matching = apps.cpu system.*
```
-`[global].memory mode = none` disables the database at this host. This also disables health
-monitoring because a node can't have health monitoring without a database.
+To send all but a few charts, use `!` to create a negative match. To send _all_ charts _but_ `apps.cpu`:
+```conf
+[stream]
+ send charts matching = !apps.cpu *
```
-[web]
- mode = none | static-threaded
- accept a streaming request every seconds = 0
-```
-
-`[web].mode = none` disables the API (Netdata will not listen to any ports).
-This also disables the registry (there cannot be a registry without an API).
-`accept a streaming request every seconds` can be used to set a limit on how often a parent node will accept streaming
-requests from its child nodes. 0 sets no limit, 1 means maximum once every second. If this is set, you may see error log
-entries "... too busy to accept new streaming request. Will be allowed in X secs".
+#### `allow from`
-You can [use](https://github.com/netdata/netdata/blob/master/exporting/README.md#configuration) the exporting engine to configure data archiving to an external database (it archives all databases maintained on
-this host).
+A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) matching the IPs of nodes that
+will stream metrics using this API key. The order is important: patterns are evaluated left to right, and the first positive or negative match is used.
-### Streaming configuration
+The default is `*`, which accepts requests from any IP presenting this `API_KEY`.
-The new file `stream.conf` contains streaming configuration for a sending and a receiving Netdata node.
+To allow from only a specific IP address:
-To configure streaming on your system:
-1. Generate an API key using `uuidgen`. Note: API keys are just random GUIDs. You can use the same API key on all your Netdata, or use a different API key for any pair of sending-receiving Netdata nodes.
+```conf
+[API_KEY]
+ allow from = 203.0.113.10
+```
-2. Authorize the communication between a pair of sending-receiving Netdata nodes using the generated API key.
-Once the communication is authorized, the sending Netdata node can push metrics for any number of hosts.
+To allow all IPs starting with `10.*`, except `10.1.2.3`:
-3. To edit `stream.conf`, run `/etc/netdata/edit-config stream.conf`
+```conf
+[API_KEY]
+ allow from = !10.1.2.3 10.*
+```
-The following sections describe how you can configure sending and receiving Netdata nodes.
+> If you set specific IP addresses here, and also use the `allow connections from` setting in the `[web]` section of
+> `netdata.conf`, be sure to add the IP address there so that it can access the API port.
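+
+For example, assuming the child's IP is `203.0.113.10`, the parent's `netdata.conf` would also need something like:
+
+```conf
+[web]
+    allow connections from = localhost 203.0.113.10
+```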
+#### `default memory mode`
-
+The [database](https://github.com/netdata/netdata/blob/master/database/README.md) to use for all nodes using this `API_KEY`. Valid settings are `dbengine`, `ram`,
+`save`, `map`, or `none`.
+- `dbengine`: The default, recommended time-series database (TSDB) for Netdata. Stores recent metrics in memory, then
+ efficiently spills them to disk for long-term storage.
+- `ram`: Stores metrics _only_ in memory, which means metrics are lost when Netdata stops or restarts. Ideal for
+ streaming configurations that use ephemeral nodes.
+- `save`: Stores metrics in memory, but saves metrics to disk when Netdata stops or restarts, and loads historical
+ metrics on start.
+- `map`: Stores metrics in memory-mapped files, similar to swap, with constant disk writes.
+- `none`: No database.
-##### Options for the sending node
+When using `default memory mode = dbengine`, the parent node creates a separate instance of the TSDB to store metrics
+from child nodes. The [size of _each_ instance is configurable](https://github.com/netdata/netdata/blob/master/docs/store/change-metrics-storage.md) with the `page
+cache size` and `dbengine multihost disk space` settings in the `[global]` section in `netdata.conf`.
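+
+For example, to give the parent a larger page cache and cap the shared (multihost) dbengine instance at roughly 2 GiB
+on disk, you could set the following in `netdata.conf` (the values are illustrative, both in MiB, not recommendations):
+
+```conf
+[global]
+    page cache size = 64
+    dbengine multihost disk space = 2048
+```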
-This is the section for the sending Netdata node. On the receiving node, `[stream].enabled` can be `no`.
-If it is `yes`, the receiving node will also stream the metrics to another node (i.e. it will be
-a proxy).
+### `netdata.conf`
-```
-[stream]
- enabled = yes | no
- destination = IP:PORT[:SSL] ...
- api key = XXXXXXXXXXX
+| Setting | Default | Description |
+| :----------------------------------------- | :---------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| **`[global]` section** | | |
+| `memory mode` | `dbengine` | Determines the [database type](https://github.com/netdata/netdata/blob/master/database/README.md) to be used on this node. Other options include `none`, `ram`, `save`, and `map`. `none` disables the database on this host, which also disables alarms and notifications, as those can't run without a database. |
+| **`[web]` section** | | |
+| `mode` | `static-threaded` | Determines the [web server](https://github.com/netdata/netdata/blob/master/web/server/README.md) type. The other option is `none`, which disables the dashboard, API, and registry. |
+| `accept a streaming request every seconds` | `0` | Set a limit on how often a parent node accepts streaming requests from child nodes. `0` equals no limit. If this is set, you may see `... too busy to accept new streaming request. Will be allowed in X secs` in Netdata's `error.log`. |
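+
+For example, a parent that wants to throttle how quickly disconnected children may reconnect could set the following
+sketch (not a recommendation) in its `netdata.conf`:
+
+```conf
+[web]
+    mode = static-threaded
+    accept a streaming request every seconds = 5
+```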
-[API_KEY]
- enabled = yes | no
+### Basic use cases
-[MACHINE_GUID]
- enabled = yes | no
-```
-This is an overview of how these options can be combined:
+This is an overview of how the main options can be combined:
| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|exporting|alarms|dashboard|
|------|:-------------:|:----------:|:----------------:|:-----:|:----:|:-------:|
@@ -155,171 +163,26 @@ This is an overview of how these options can be combined:
| proxy with db|not `none`|not `none`|`yes`|possible|possible|yes|
| central netdata|not `none`|not `none`|`no`|possible|possible|yes|
-For the options to encrypt the data stream between the child and the parent, refer to [securing the communication](#securing-streaming-communications)
-
+### Per-child settings
-##### Options for the receiving node
+While the `[API_KEY]` section applies settings for any child node using that key, you can also use per-child settings
+with the `[MACHINE_GUID]` section.
-For a receiving Netdata node, the `stream.conf` looks like this:
+For example, with the configuration below, metrics from the child node matching `MACHINE_GUID` are stored with the
+`save` memory mode instead of the default `dbengine` specified by the `API_KEY`, and alarms are disabled for that child.
-```sh
-# replace API_KEY with your uuidgen generated GUID
+```conf
[API_KEY]
enabled = yes
- default history = 3600
- default memory mode = save
+ default memory mode = dbengine
health enabled by default = auto
allow from = *
-```
-
-You can add many such sections, one for each API key. The above are used as default values for
-all hosts pushed with this API key.
-
-You can also add sections like this:
-```sh
-# replace MACHINE_GUID with the child /var/lib/netdata/registry/netdata.public.unique.id
[MACHINE_GUID]
enabled = yes
- history = 3600
memory mode = save
- health enabled = yes
- allow from = *
-```
-
-The above is the parent configuration of a single host, at the parent end. `MACHINE_GUID` is
-the unique id the Netdata generating the metrics (i.e. the Netdata that originally collects
-them `/var/lib/netdata/registry/netdata.unique.id`). So, metrics for Netdata `A` that pass through
-any number of other Netdata, will have the same `MACHINE_GUID`.
-
-You can also use `default memory mode = dbengine` for an API key or `memory mode = dbengine` for
- a single host. The additional `page cache size` and `dbengine multihost disk space` configuration options
- are inherited from the global Netdata configuration.
-
-##### Allow from
-
-`allow from` settings are [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md): string matches
-that use `*` as wildcard (any number of times) and a `!` prefix for a negative match.
-So: `allow from = !10.1.2.3 10.*` will allow all IPs in `10.*` except `10.1.2.3`. The order is
-important: left to right, the first positive or negative match is used.
-
-
-##### Tracing
-
-When a child is trying to push metrics to a parent or proxy, it logs entries like these:
-
+ health enabled = no
```
-2017-02-25 01:57:44: netdata: ERROR: Failed to connect to '10.11.12.1', port '19999' (errno 111, Connection refused)
-2017-02-25 01:57:44: netdata: ERROR: STREAM costa-pc [send to 10.11.12.1:19999]: failed to connect
-2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: initializing communication...
-2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: waiting response from remote netdata...
-2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [send to 10.11.12.1:19999]: established communication - sending metrics...
-2017-02-25 01:58:14: netdata: ERROR: STREAM costa-pc [send]: discarding 1900 bytes of metrics already in the buffer.
-2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [send]: ready - sending metrics...
-```
-
-The receiving end (proxy or parent) logs entries like these:
-
-```
-2017-02-25 01:58:04: netdata: INFO : STREAM [receive from [10.11.12.11]:33554]: new client connection.
-2017-02-25 01:58:04: netdata: INFO : STREAM costa-pc [10.11.12.11]:33554: receive thread created (task id 7698)
-2017-02-25 01:58:14: netdata: INFO : Host 'costa-pc' with guid '12345678-b5a6-11e6-8a50-00508db7e9c9' initialized, os: linux, update every: 1, memory mode: ram, history entries: 3600, streaming: disabled, health: enabled, cache_dir: '/var/cache/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9', varlib_dir: '/var/lib/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9', health_log: '/var/lib/netdata/12345678-b5a6-11e6-8a50-00508db7e9c9/health/health-log.db', alarms default handler: '/usr/libexec/netdata/plugins.d/alarm-notify.sh', alarms default recipient: 'root'
-2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [receive from [10.11.12.11]:33554]: initializing communication...
-2017-02-25 01:58:14: netdata: INFO : STREAM costa-pc [receive from [10.11.12.11]:33554]: receiving metrics...
-```
-
-For Netdata v1.9+, streaming can also be monitored via `access.log`.
-
-### Securing streaming communications
-
-Netdata does not activate TLS encryption by default. To encrypt streaming connections:
-1. On the parent node (receiving node), [enable TLS support](https://github.com/netdata/netdata/blob/master/web/server/README.md#enabling-tls-support).
-2. On the child's `stream.conf`, configure the destination as follows:
-
-```
-[stream]
- destination = host:port:SSL
-```
-
-The word `SSL` appended to the end of the destination tells the child that connections must be encrypted.
-
-> While Netdata uses Transport Layer Security (TLS) 1.2 to encrypt communications rather than the obsolete SSL protocol,
-> it's still common practice to refer to encrypted web connections as `SSL`. Many vendors, like Nginx and even Netdata
-> itself, use `SSL` in configuration files, whereas documentation will always refer to encrypted communications as `TLS`
-> or `TLS/SSL`.
-
-#### Certificate verification
-
-When TLS/SSL is enabled on the child, the default behavior will be to not connect with the parent unless the server's certificate can be verified via the default chain. In case you want to avoid this check, add the following to the child's `stream.conf` file:
-
-```
-[stream]
- ssl skip certificate verification = yes
-```
-
-#### Trusted certificate
-
-If you've enabled [certificate verification](#certificate-verification), you might see errors from the OpenSSL library when there's a problem with checking the certificate chain (`X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY`). More importantly, OpenSSL will reject self-signed certificates.
-
-Given these known issues, you have two options. If you trust your certificate, you can set the options `CApath` and `CAfile` to inform Netdata where your certificates, and the certificate trusted file, are stored.
-
-For more details about these options, you can read about [verify locations](https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_load_verify_locations.html).
-
-Before you changed your streaming configuration, you need to copy your trusted certificate to your child system and add the certificate to OpenSSL's list.
-
-On most Linux distributions, the `update-ca-certificates` command searches inside the `/usr/share/ca-certificates` directory for certificates. You should double-check by reading the `update-ca-certificate` manual (`man update-ca-certificate`), and then change the directory in the below commands if needed.
-
-If you have `sudo` configured on your child system, you can use that to run the following commands. If not, you'll have to log in as `root` to complete them.
-
-```
-# mkdir /usr/share/ca-certificates/netdata
-# cp parent_cert.pem /usr/share/ca-certificates/netdata/parent_cert.crt
-# chown -R netdata.netdata /usr/share/ca-certificates/netdata/
-```
-
-First, you create a new directory to store your certificates for Netdata. Next, you need to change the extension on your certificate from `.pem` to `.crt` so it's compatible with `update-ca-certificate`. Finally, you need to change permissions so the user that runs Netdata can access the directory where you copied in your certificate.
-
-Next, edit the file `/etc/ca-certificates.conf` and add the following line:
-
-```
-netdata/parent_cert.crt
-```
-
-Now you update the list of certificates running the following, again either as `sudo` or `root`:
-
-```
-# update-ca-certificates
-```
-
-> Some Linux distributions have different methods of updating the certificate list. For more details, please read this
-> guide on [adding trusted root certificates](https://github.com/Busindre/How-to-Add-trusted-root-certificates).
-
-Once you update your certificate list, you can set the stream parameters for Netdata to trust the parent certificate. Open `stream.conf` for editing and change the following lines:
-
-```
-[stream]
- CApath = /etc/ssl/certs/
- CAfile = /etc/ssl/certs/parent_cert.pem
-```
-
-With this configuration, the `CApath` option tells Netdata to search for trusted certificates inside `/etc/ssl/certs`. The `CAfile` option specifies the Netdata parent certificate is located at `/etc/ssl/certs/parent_cert.pem`. With this configuration, you can skip using the system's entire list of certificates and use Netdata's parent certificate instead.
-
-#### Expected behaviors
-
-With the introduction of TLS/SSL, the parent-child communication behaves as shown in the table below, depending on the following configurations:
-
-- **Parent TLS (Yes/No)**: Whether the `[web]` section in `netdata.conf` has `ssl key` and `ssl certificate`.
-- **Parent port TLS (-/force/optional)**: Depends on whether the `[web]` section `bind to` contains a `^SSL=force` or `^SSL=optional` directive on the port(s) used for streaming.
-- **Child TLS (Yes/No)**: Whether the destination in the child's `stream.conf` has `:SSL` at the end.
-- **Child TLS Verification (yes/no)**: Value of the child's `stream.conf` `ssl skip certificate verification` parameter (default is no).
-
-| Parent TLS enabled|Parent port SSL|Child TLS|Child SSL Ver.|Behavior|
-|:----------------:|:-------------:|:-------:|:------------:|:-------|
-| No|-|No|no|Legacy behavior. The parent-child stream is unencrypted.|
-| Yes|force|No|no|The parent rejects the child connection.|
-| Yes|-/optional|No|no|The parent-child stream is unencrypted (expected situation for legacy child nodes and newer parent nodes)|
-| Yes|-/force/optional|Yes|no|The parent-child stream is encrypted, provided that the parent has a valid TLS/SSL certificate. Otherwise, the child refuses to connect.|
-| Yes|-/force/optional|Yes|yes|The parent-child stream is encrypted.|
### Streaming compression
@@ -416,83 +279,147 @@ Same thing applies with the `[MACHINE_GUID]` configuration.
[MACHINE_GUID]
enable compression = yes | no
```
-## Viewing remote host dashboards, using mirrored databases
-On any receiving Netdata, that maintains remote databases and has its web server enabled,
-The node menu will include a list of the mirrored databases.
+### Securing streaming with TLS/SSL
-![image](https://cloud.githubusercontent.com/assets/2662304/24080824/24cd2d3c-0caf-11e7-909d-a8dd1dbb95d7.png)
+Netdata does not activate TLS encryption by default. To encrypt streaming connections, you first need to [enable TLS
+support](https://github.com/netdata/netdata/blob/master/web/server/README.md#enabling-tls-support) on the parent. With encryption enabled on the receiving side, you
+need to instruct the child to use TLS/SSL as well. On the child's `stream.conf`, configure the destination as follows:
-Selecting any of these, the server will offer a dashboard using the mirrored metrics.
+```
+[stream]
+ destination = host:port:SSL
+```
-## Monitoring ephemeral nodes
+The word `SSL` appended to the end of the destination tells the child that connections must be encrypted.
-Auto-scaling is probably the most trendy service deployment strategy these days.
+> While Netdata uses Transport Layer Security (TLS) 1.2 to encrypt communications rather than the obsolete SSL protocol,
+> it's still common practice to refer to encrypted web connections as `SSL`. Many vendors, like Nginx and even Netdata
+> itself, use `SSL` in configuration files, whereas documentation will always refer to encrypted communications as `TLS`
+> or `TLS/SSL`.
-Auto-scaling detects the need for additional resources and boots VMs on demand, based on a template. Soon after they start running the applications, a load balancer starts distributing traffic to them, allowing the service to grow horizontally to the scale needed to handle the load. When demands falls, auto-scaling starts shutting down VMs that are no longer needed.
+#### Certificate verification
-![Monitoring ephemeral nodes with Netdata](https://cloud.githubusercontent.com/assets/2662304/23627426/65a9074a-02b9-11e7-9664-cd8f258a00af.png)
+When TLS/SSL is enabled on the child, the default behavior is to refuse connecting to the parent unless the server's
+certificate can be verified via the default chain. If you want to skip this check, add the following to the
+child's `stream.conf` file:
-What a fantastic feature for controlling infrastructure costs! Pay only for what you need for the time you need it!
+```
+[stream]
+ ssl skip certificate verification = yes
+```
+
+#### Trusted certificate
+
+If you've enabled [certificate verification](#certificate-verification), you might see errors from the OpenSSL library
+when there's a problem with checking the certificate chain (`X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY`). More
+importantly, OpenSSL will reject self-signed certificates.
-In auto-scaling, all servers are ephemeral, they live for just a few hours. Every VM is a brand new instance of the application, that was automatically created based on a template.
+Given these known issues, you have two options. If you trust your certificate, you can set the options `CApath` and
+`CAfile` to inform Netdata where your trusted certificates and the certificate file are stored.
-So, how can we monitor them? How can we be sure that everything is working as expected on all of them?
+For more details about these options, you can read about [verify
+locations](https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_load_verify_locations.html).
-### The Netdata way
+Before you change your streaming configuration, you need to copy your trusted certificate to your child system and add
+the certificate to OpenSSL's list.
-We recently made a significant improvement at the core of Netdata to support monitoring such setups.
+On most Linux distributions, the `update-ca-certificates` command searches inside the `/usr/share/ca-certificates`
+directory for certificates. You should double-check by reading the `update-ca-certificates` manual (`man
+update-ca-certificates`), and then change the directory in the commands below if needed.
-Following the Netdata way of monitoring, we wanted:
+If you have `sudo` configured on your child system, you can use that to run the following commands. If not, you'll have
+to log in as `root` to complete them.
+
+```
+# mkdir /usr/share/ca-certificates/netdata
+# cp parent_cert.pem /usr/share/ca-certificates/netdata/parent_cert.crt
+# chown -R netdata.netdata /usr/share/ca-certificates/netdata/
+```
-1. **real-time performance monitoring**, collecting ***thousands of metrics per server per second***, visualized in interactive, automatically created dashboards.
-2. **real-time alarms**, for all nodes.
-3. **zero configuration**, all ephemeral servers should have exactly the same configuration, and nothing should be configured at any system for each of the ephemeral nodes. We shouldn't care if 10 or 100 servers are spawned to handle the load.
-4. **self-cleanup**, so that nothing needs to be done for cleaning up the monitoring infrastructure from the hundreds of nodes that may have been monitored through time.
+First, you create a new directory to store your certificates for Netdata. Next, you need to change the extension on your
+certificate from `.pem` to `.crt` so it's compatible with `update-ca-certificates`. Finally, you need to change
+permissions so the user that runs Netdata can access the directory where you copied in your certificate.
+
+Next, edit the file `/etc/ca-certificates.conf` and add the following line:
+
+```
+netdata/parent_cert.crt
+```
+
+Now, update the list of certificates by running the following, again either with `sudo` or as `root`:
+
+```
+# update-ca-certificates
+```
-### How it works
+> Some Linux distributions have different methods of updating the certificate list. For more details, please read this
+> guide on [adding trusted root certificates](https://github.com/Busindre/How-to-Add-trusted-root-certificates).
-All monitoring solutions, including Netdata, work like this:
+Once you update your certificate list, you can set the stream parameters for Netdata to trust the parent certificate.
+Open `stream.conf` for editing and change the following lines:
-1. Collect metrics from the system and the running applications
-2. Store metrics in a time-series database
-3. Examine metrics periodically, for triggering alarms and sending alarm notifications
-4. Visualize metrics so that users can see what exactly is happening
+```
+[stream]
+ CApath = /etc/ssl/certs/
+ CAfile = /etc/ssl/certs/parent_cert.pem
+```
-Netdata used to be self-contained, so that all these functions were handled entirely by each server. The changes we made, allow each Netdata to be configured independently for each function. So, each Netdata can now act as:
+With this configuration, the `CApath` option tells Netdata to search for trusted certificates inside `/etc/ssl/certs`.
+The `CAfile` option specifies that the Netdata parent certificate is located at `/etc/ssl/certs/parent_cert.pem`. This
+way, you can skip using the system's entire list of certificates and use Netdata's parent certificate instead.
-- A self-contained system, much like it used to be.
-- A data collector that collects metrics from a host and pushes them to another Netdata (with or without a local database and alarms).
-- A proxy, which receives metrics from other hosts and pushes them immediately to other Netdata servers. Netdata proxies can also be `store and forward proxies` meaning that they are able to maintain a local database for all metrics passing through them (with or without alarms).
-- A time-series database node, where data are kept, alarms are run and queries are served to visualise the metrics.
+#### Expected behaviors
-### Configuring an auto-scaling setup
+With the introduction of TLS/SSL, the parent-child communication behaves as shown in the table below, depending on the
+following configurations:
-![A diagram of an auto-scaling setup with Netdata](https://user-images.githubusercontent.com/1153921/84290043-0c1c1600-aaf8-11ea-9757-dd8dd8a8ec6c.png)
+- **Parent TLS (Yes/No)**: Whether the `[web]` section in `netdata.conf` has `ssl key` and `ssl certificate`.
+- **Parent port TLS (-/force/optional)**: Depends on whether the `[web]` section `bind to` contains a `^SSL=force` or
+ `^SSL=optional` directive on the port(s) used for streaming.
+- **Child TLS (Yes/No)**: Whether the destination in the child's `stream.conf` has `:SSL` at the end.
+- **Child TLS Verification (yes/no)**: Value of the child's `stream.conf` `ssl skip certificate verification`
+ parameter (default is no).
-You need a Netdata parent. This node should not be ephemeral. It will be the node where all ephemeral child
-nodes will send their metrics.
+| Parent TLS enabled | Parent port SSL | Child TLS | Child SSL Ver. | Behavior |
+| :----------------- | :--------------- | :-------- | :------------- | :--------------------------------------------------------------------------------------------------------------------------------------- |
+| No | - | No | no | Legacy behavior. The parent-child stream is unencrypted. |
+| Yes | force | No | no | The parent rejects the child connection. |
+| Yes | -/optional | No | no | The parent-child stream is unencrypted (expected situation for legacy child nodes and newer parent nodes) |
+| Yes | -/force/optional | Yes | no | The parent-child stream is encrypted, provided that the parent has a valid TLS/SSL certificate. Otherwise, the child refuses to connect. |
+| Yes | -/force/optional | Yes | yes | The parent-child stream is encrypted. |
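+
+For example, a parent that forces TLS on a dedicated streaming port might combine these settings in its `netdata.conf`
+as follows (the certificate paths and port are placeholders; see the web server documentation for the full `bind to`
+syntax):
+
+```conf
+[web]
+    ssl key = /etc/netdata/ssl/key.pem
+    ssl certificate = /etc/netdata/ssl/cert.pem
+    bind to = *:19999 *:20000=streaming^SSL=force
+```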
-The parent will need to authorize child nodes to receive their metrics. This is done with an API key.
+### Proxy
-#### API keys
+A proxy is a node that receives metrics from a child, then streams them onward to a parent. To configure a proxy,
+configure it as both a receiving and a sending Netdata node at the same time.
-API keys are just random GUIDs. Use the Linux command `uuidgen` to generate one. You can use the same API key for all your child nodes, or you can configure one API for each of them. This is entirely your decision.
+Netdata proxies may or may not maintain a database for the metrics passing through them. When they maintain a database,
+they can also run health checks (alarms and notifications) for the remote host that is streaming the metrics.
-We suggest to use the same API key for each ephemeral node template you have, so that all replicas of the same ephemeral node will have exactly the same configuration.
+In the following example, the proxy receives metrics from a child node using the `API_KEY` of
+`66666666-7777-8888-9999-000000000000`, then stores metrics using `dbengine`. It then uses the `API_KEY` of
+`11111111-2222-3333-4444-555555555555` to proxy those same metrics on to a parent node at `203.0.113.0`.
-I will use this API_KEY: `11111111-2222-3333-4444-555555555555`. Replace it with your own.
+```conf
+[stream]
+ enabled = yes
+ destination = 203.0.113.0
+ api key = 11111111-2222-3333-4444-555555555555
-#### Configuring the parent
+[66666666-7777-8888-9999-000000000000]
+ enabled = yes
+ default memory mode = dbengine
+```
-To configure the parent node:
+### Ephemeral nodes
-1. On the parent node, edit `stream.conf` by using the `edit-config` script:
-`/etc/netdata/edit-config stream.conf`
+Netdata can help you monitor ephemeral nodes, such as containers in an auto-scaling infrastructure, by always streaming
+metrics to any number of permanently-running parent nodes.
-2. Set the following parameters:
+On the parent, set the following in `stream.conf`:
-```bash
+```conf
[11111111-2222-3333-4444-555555555555]
# enable/disable this API key
enabled = yes
@@ -507,24 +434,7 @@ To configure the parent node:
health enabled by default = auto
```
-_`stream.conf` on the parent, to enable receiving metrics from its child nodes using the API key._
-
-If you used many API keys, you can add one such section for each API key.
-
-When done, restart Netdata on the parent node. It is now ready to receive metrics.
-
-Note that `health enabled by default = auto` will still trigger `last_collected` alarms, if a connected child does not exit gracefully. If the `netdata` process running on the child is
-stopped, it will close the connection to the parent, ensuring that no `last_collected` alarms are triggered. For example, a proper container restart would first terminate
-the `netdata` process, but a system power issue would leave the connection open on the parent side. In the second case, you will still receive alarms.
-
-#### Configuring the child nodes
-
-To configure the child node:
-
-1. On the child node, edit `stream.conf` by using the `edit-config` script:
-`/etc/netdata/edit-config stream.conf`
-
-2. Set the following parameters:
+On the child nodes, set the following in `stream.conf`:
```bash
[stream]
@@ -534,44 +444,26 @@ To configure the child node:
# the IP and PORT of the parent
destination = 10.11.12.13:19999
- # the API key to use
+ # the API key to use
api key = 11111111-2222-3333-4444-555555555555
```
-_`stream.conf` on child nodes, to enable pushing metrics to their parent at `10.11.12.13:19999`._
-
-Using just the above configuration, the child nodes will be pushing their metrics to the parent Netdata, but they will still maintain a local database of the metrics and run health checks. To disable them, edit `/etc/netdata/netdata.conf` and set:
+In addition, edit `netdata.conf` on each child node to disable the database and alarms.
```bash
[global]
# disable the local database
- memory mode = none
+ memory mode = none
[health]
# disable health checks
enabled = no
```
-_`netdata.conf` configuration on child nodes, to disable the local database and health checks._
-
-Keep in mind that setting `memory mode = none` will also force `[health].enabled = no` (health checks require access to a local database). But you can keep the database and disable health checks if you need to. You are however sending all the metrics to the parent node, which can handle the health checking (`[health].enabled = yes`)
-
-#### Netdata unique ID
-
-The file `/var/lib/netdata/registry/netdata.public.unique.id` contains a random GUID that **uniquely identifies each Netdata Agent**. This file is automatically generated, by Netdata, the first time it is started and remains unaltered forever.
-
-> If you are building an image to be used for automated provisioning of autoscaled VMs, it important to delete that file from the image, so that each instance of your image will generate its own.
-
-#### Troubleshooting metrics streaming
+## Troubleshooting
Both parent and child nodes log information at `/var/log/netdata/error.log`.
-To obtain the error logs, run the following on both the parent and child nodes:
-
-```
-tail -f /var/log/netdata/error.log | grep STREAM
-```
-
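+
+One way to follow only the streaming-related entries on either node (assuming the default log location) is:
+
+```
+tail -f /var/log/netdata/error.log | grep STREAM
+```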
If the child manages to connect to the parent you will see something like (on the parent):
```
@@ -591,53 +483,7 @@ and something like this on the child:
2017-03-09 09:38:28: netdata: INFO : STREAM xxx [send to box:19999]: established communication - sending metrics...
```
-### Archiving to a time-series database
-
-The parent Netdata node can also archive metrics, for all its child nodes, to a time-series database. At the time of
-this writing, Netdata supports:
-
-- graphite
-- opentsdb
-- prometheus
-- json document DBs
-- all the compatibles to the above (e.g. kairosdb, influxdb, etc)
-
-Check the Netdata [exporting documentation](https://github.com/netdata/netdata/blob/master/docs/export/external-databases.md) for configuring this.
-
-This is how such a solution will work:
-
-![Diagram showing an example configuration for archiving to a time-series
-database](https://user-images.githubusercontent.com/1153921/84291308-c2ccc600-aaf9-11ea-98a9-89ccbf3a62dd.png)
-
-### An advanced setup
-
-Netdata also supports `proxies` with and without a local database, and data retention can be different between all nodes.
-
-This means a setup like the following is also possible:
-
-<p align="center">
-<img src="https://cloud.githubusercontent.com/assets/2662304/23629551/bb1fd9c2-02c0-11e7-90f5-cab5a3ed4c53.png"/>
-</p>
-
-## Proxies
-
-A proxy is a Netdata node that is receiving metrics from a Netdata node, and streams them to another Netdata node.
-
-Netdata proxies may or may not maintain a database for the metrics passing through them.
-When they maintain a database, they can also run health checks (alarms and notifications)
-for the remote host that is streaming the metrics.
-
-To configure a proxy, configure it as a receiving and a sending Netdata at the same time,
-using `stream.conf`.
-
-The sending side of a Netdata proxy, connects and disconnects to the final destination of the
-metrics, following the same pattern of the receiving side.
-
-For a practical example see [Monitoring ephemeral nodes](#monitoring-ephemeral-nodes).
-
-## Troubleshooting streaming connections
-
-This section describes the most common issues you might encounter when connecting parent and child nodes.
+The following sections describe the most common issues you might encounter when connecting parent and child nodes.
### Slow connections between parent and child
@@ -655,8 +501,8 @@ On the parent side, you may see various error messages, most commonly the follow
netdata ERROR : STREAM_PARENT[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : read failed: end of file
```
-Another common problem in slow connections is the CHILD sending a partial message to the parent. In this case,
-the parent will write the following in its `error.log`:
+Another common problem in slow connections is the child sending a partial message to the parent. In this case, the
+parent will write the following to its `error.log`:
```
ERROR : STREAM_RECEIVER[CHILD HOSTNAME,[CHILD IP]:CHILD PORT] : sent command 'B' which is not known by netdata, for host 'HOSTNAME'. Disabling it.
@@ -681,11 +527,11 @@ down), you will see the following in the child's `error.log`.
ERROR : STREAM_SENDER[HOSTNAME] : Failed to connect to 'PARENT IP', port 'PARENT PORT' (errno 113, No route to host)
```
-### 'Is this a Netdata node?'
+### 'Is this a Netdata?'
This question can appear when Netdata starts the stream and receives an unexpected response. This error can appear when
the parent is using SSL and the child tries to connect using plain text. You will also see this message when
-Netdata connects to another server that isn't a Netdata node. The complete error message will look like this:
+Netdata connects to another server that isn't Netdata. The complete error message will look like this:
```
ERROR : STREAM_SENDER[CHILD HOSTNAME] : STREAM child HOSTNAME [send to PARENT HOSTNAME:PARENT PORT]: server is not replying properly (is it a netdata?).
@@ -710,15 +556,15 @@ STREAM [receive from [child HOSTNAME]:child IP]: `MESSAGE`. Forbidding access."
`MESSAGE` will have one of the following patterns:
-- `request without KEY` : The message received is incomplete and the KEY value can be API, hostname, machine GUID.
-- `API key 'VALUE' is not valid GUID`: The UUID received from child does not have the format defined in [RFC 4122]
- (https://tools.ietf.org/html/rfc4122)
-- `machine GUID 'VALUE' is not GUID.`: This error with machine GUID is like the previous one.
-- `API key 'VALUE' is not allowed`: This stream has a wrong API key.
-- `API key 'VALUE' is not permitted from this IP`: The IP is not allowed to use STREAM with this parent.
-- `machine GUID 'VALUE' is not allowed.`: The GUID that is trying to send stream is not allowed.
-- `Machine GUID 'VALUE' is not permitted from this IP. `: The IP does not match the pattern or IP allowed to connect
- to use stream.
+- `request without KEY`: The message received is incomplete; the missing KEY can be the API key, hostname, or machine GUID.
+- `API key 'VALUE' is not valid GUID`: The UUID received from the child does not have the format defined in [RFC
+  4122](https://tools.ietf.org/html/rfc4122).
+- `machine GUID 'VALUE' is not GUID.`: As above, but for the machine GUID.
+- `API key 'VALUE' is not allowed`: The stream is using a wrong API key.
+- `API key 'VALUE' is not permitted from this IP`: The IP is not allowed to use STREAM with this parent.
+- `machine GUID 'VALUE' is not allowed.`: The machine GUID that is trying to stream is not allowed.
+- `Machine GUID 'VALUE' is not permitted from this IP. `: The IP does not match any pattern allowed to stream for this
+  machine GUID.
### Netdata could not create a stream
@@ -730,5 +576,3 @@ file descriptor given is not a valid stream
```
After logging this error, Netdata will close the stream.
-
-
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 95652942e..ff7a95629 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-#include "parser/parser.h"
// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
@@ -340,10 +339,14 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
.host = rpt->host,
.opaque = rpt,
.cd = cd,
- .trust_durations = 1
+ .trust_durations = 1,
+ .capabilities = rpt->capabilities,
};
- PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
+ PARSER *parser = parser_init(&user, NULL, NULL, fd,
+ PARSER_INPUT_SPLIT, ssl);
+
+ pluginsd_keywords_init(parser, PARSER_INIT_STREAMING);
rrd_collector_started();
@@ -416,7 +419,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
}
done:
- result = user.count;
+ result = user.data_collections_count;
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -434,7 +437,7 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) {
rrdhost_receiver_replicating_charts_zero(host);
}
-bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
+static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
bool signal_rrdcontext = false;
bool set_this = false;
@@ -469,6 +472,7 @@ bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
rrdpush_receiver_replication_reset(host);
rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+ aclk_queue_node_info(rpt->host, true);
set_this = true;
}
@@ -689,6 +693,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
return 1;
}
+ if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
+ rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER");
+ close(rpt->fd);
+ return 1;
+ }
+
// system_info has been consumed by the host structure
rpt->system_info = NULL;
@@ -724,16 +734,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
struct plugind cd = {
.update_every = default_rrd_update_every,
- .serial_failures = 0,
- .successful_collections = 0,
.unsafe = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.running = true,
.enabled = true,
},
.started_t = now_realtime_sec(),
- .next = NULL,
- .capabilities = 0,
};
// put the client IP and port into the buffers used by plugins.d
@@ -804,8 +810,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED");
- cd.capabilities = rpt->capabilities;
-
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
diff --git a/streaming/replication.c b/streaming/replication.c
index 7c1f16b4c..a50913a1a 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -88,6 +88,7 @@ struct replication_query {
bool locked_data_collection;
bool execute;
bool interrupted;
+ STREAM_CAPABILITIES capabilities;
} query;
time_t wall_clock_time;
@@ -95,7 +96,7 @@ struct replication_query {
size_t points_read;
size_t points_generated;
- struct storage_engine_query_ops *ops;
+ STORAGE_ENGINE_BACKEND backend;
struct replication_request *rq;
size_t dimensions;
@@ -112,7 +113,8 @@ static struct replication_query *replication_query_prepare(
time_t query_after,
time_t query_before,
bool query_enable_streaming,
- time_t wall_clock_time
+ time_t wall_clock_time,
+ STREAM_CAPABILITIES capabilities
) {
size_t dimensions = rrdset_number_of_dimensions(st);
struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
@@ -131,6 +133,7 @@ static struct replication_query *replication_query_prepare(
q->query.after = query_after;
q->query.before = query_before;
q->query.enable_streaming = query_enable_streaming;
+ q->query.capabilities = capabilities;
q->wall_clock_time = wall_clock_time;
@@ -159,7 +162,7 @@ static struct replication_query *replication_query_prepare(
}
}
- q->ops = &st->rrdhost->db[0].eng->api.query_ops;
+ q->backend = st->rrdhost->db[0].eng->backend;
// prepare our array of dimensions
size_t count = 0;
@@ -181,7 +184,7 @@ static struct replication_query *replication_query_prepare(
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
- q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
+ storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
d->enabled = true;
d->skip = false;
@@ -209,32 +212,40 @@ static struct replication_query *replication_query_prepare(
return q;
}
-static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
+static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
+ NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(!rd->exposed) continue;
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
- rrddim_id(rd),
- (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
- rd->last_collected_value,
- rd->last_calculated_value,
- rd->last_stored_value
- );
- }
+ rrddim_foreach_read(rd, st){
+ if (!rd->exposed) continue;
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
+ sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC +
+ (usec_t) rd->last_collected_time.tv_usec);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_int64_encoded(wb, encoding, rd->last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
rrddim_foreach_done(rd);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
- (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
- (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
- );
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
+ buffer_fast_strcat(wb, "\n", 1);
}
static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
size_t dimensions = q->dimensions;
if(wb && q->query.enable_streaming)
- replication_send_chart_collection_state(wb, q->st);
+ replication_send_chart_collection_state(wb, q->st, q->query.capabilities);
if(q->query.locked_data_collection) {
netdata_spinlock_unlock(&q->st->data_collection_lock);
@@ -249,7 +260,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
struct replication_dimension *d = &q->data[i];
if (unlikely(!d->enabled)) continue;
- q->ops->finalize(&d->handle);
+ storage_engine_query_finalize(&d->handle);
dictionary_acquired_item_release(d->dict, d->rda);
@@ -281,7 +292,7 @@ static void replication_query_align_to_optimal_before(struct replication_query *
struct replication_dimension *d = &q->data[i];
if(unlikely(!d->enabled)) continue;
- time_t new_before = q->ops->align_to_optimal_before(&d->handle);
+ time_t new_before = storage_engine_align_to_optimal_before(&d->handle);
if (!expanded_before || new_before < expanded_before)
expanded_before = new_before;
}
@@ -296,13 +307,14 @@ static void replication_query_align_to_optimal_before(struct replication_query *
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
replication_query_align_to_optimal_before(q);
+ NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
time_t after = q->query.after;
time_t before = q->query.before;
size_t dimensions = q->dimensions;
- struct storage_engine_query_ops *ops = q->ops;
time_t wall_clock_time = q->wall_clock_time;
- size_t points_read = q->points_read, points_generated = q->points_generated;
+ bool finished_with_gap = false;
+ size_t points_read = 0, points_generated = 0;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
time_t actual_after = 0, actual_before = 0;
@@ -318,8 +330,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
// fetch the first valid point for the dimension
int max_skip = 1000;
- while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
- d->sp = ops->next_metric(&d->handle);
+ while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) {
+ d->sp = storage_engine_query_next_metric(&d->handle);
points_read++;
}
@@ -328,9 +340,10 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
error_limit_static_global_var(erl, 1, 0);
error_limit(&erl,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
- rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
- (unsigned long long) now);
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
+ "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
+ (unsigned long long) now);
continue;
}
@@ -374,9 +387,10 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
else
fix_min_start_time = min_end_time - min_update_every;
+#ifdef NETDATA_INTERNAL_CHECKS
error_limit_static_global_var(erl, 1, 0);
error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
- "misaligned dimensions "
+ "misaligned dimensions, "
"update every (min: %ld, max: %ld), "
"start time (min: %ld, max: %ld), "
"end time (min %ld, max %ld), "
@@ -389,6 +403,7 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
now, last_end_time_in_buffer,
fix_min_start_time
);
+#endif
min_start_time = fix_min_start_time;
}
@@ -410,7 +425,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
q->query.before = last_end_time_in_buffer;
q->query.enable_streaming = false;
- internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. "
+ internal_error(true, "REPLICATION: current buffer size %zu is more than the "
+ "max message size %zu for chart '%s' of host '%s'. "
"Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
q->request.after, q->request.before, q->request.enable_streaming?"true":"false",
@@ -422,11 +438,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
}
last_end_time_in_buffer = min_end_time;
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
- (unsigned long long) min_start_time,
- (unsigned long long) min_end_time,
- (unsigned long long) wall_clock_time
- );
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4);
+ buffer_print_uint64_encoded(wb, encoding, min_start_time);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, min_end_time);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
// output the replay values for this time
for (size_t i = 0; i < dimensions; i++) {
@@ -438,8 +456,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
!storage_point_is_unset(d->sp) &&
!storage_point_is_gap(d->sp))) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
- rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
+ buffer_fast_strcat(wb, "\" ", 2);
+ buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
+ buffer_fast_strcat(wb, "\n", 1);
points_generated++;
}
@@ -450,9 +473,16 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
else if(unlikely(min_end_time < now))
// the query does not progress
break;
- else
+ else {
// we have gap - all points are in the future
now = min_start_time;
+
+ if(min_start_time > before && !points_generated) {
+ before = q->query.before = min_start_time - 1;
+ finished_with_gap = true;
+ break;
+ }
+ }
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -462,28 +492,33 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
(unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
(unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
}
else
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
(unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
- q->points_read = points_read;
- q->points_generated = points_generated;
+ q->points_read += points_read;
+ q->points_generated += points_generated;
- bool finished_with_gap = false;
if(last_end_time_in_buffer < before - q->st->update_every)
finished_with_gap = true;
return finished_with_gap;
}
-static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
+static struct replication_query *replication_response_prepare(
+ RRDSET *st,
+ bool requested_enable_streaming,
+ time_t requested_after,
+ time_t requested_before,
+ STREAM_CAPABILITIES capabilities
+ ) {
time_t wall_clock_time = now_realtime_sec();
if(requested_after > requested_before) {
@@ -509,7 +544,8 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r
bool query_enable_streaming = requested_enable_streaming;
time_t db_first_entry = 0, db_last_entry = 0;
- rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
+ rrdset_get_retention_of_tier_for_collected_chart(
+ st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
// no data requested - just enable streaming
@@ -543,7 +579,7 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r
db_first_entry, db_last_entry,
requested_after, requested_before, requested_enable_streaming,
query_after, query_before, query_enable_streaming,
- wall_clock_time);
+ wall_clock_time, capabilities);
}
void replication_response_cancel_and_finalize(struct replication_query *q) {
@@ -553,6 +589,7 @@ void replication_response_cancel_and_finalize(struct replication_query *q) {
static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
+ NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
struct replication_request *rq = q->rq;
RRDSET *st = q->st;
RRDHOST *host = st->rrdhost;
@@ -562,7 +599,11 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// holding the host's buffer lock for too long
BUFFER *wb = sender_start(host->sender);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "'\n", 2);
+
+// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
bool locked_data_collection = q->query.locked_data_collection;
q->query.locked_data_collection = false;
@@ -585,23 +626,22 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// end with first/last entries we have, and the first start time and
// last end time of the data we sent
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
- // current chart update every
- (int)st->update_every
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
+ buffer_print_int64_encoded(wb, encoding, st->update_every);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, db_first_entry);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, db_last_entry);
- // child first db time, child end db time
- , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
+ buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7);
- // start streaming boolean
- , enable_streaming ? "true" : "false"
-
- // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
- , (unsigned long long)after, (unsigned long long)before
-
- // child world clock time
- , (unsigned long long)wall_clock_time
- );
+ buffer_print_uint64_encoded(wb, encoding, after);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, before);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
sender_commit(host->sender, wb);
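
The REPLAY_END response keeps the field order documented by the removed comments (chart update every, child db first/last entry, the start-streaming flag, the effective after/before window, and the child's wall-clock time); only the rendering changed. A hedged sketch of the line layout, using decimal rendering and a placeholder keyword literal (the real one is whatever PLUGINSD_KEYWORD_REPLAY_END expands to, and numbers may be base64-encoded when IEEE754 was negotiated):

```
#include <stdio.h>
#include <time.h>

#define REPLAY_END_KEYWORD "REPLAY_END"   /* placeholder for PLUGINSD_KEYWORD_REPLAY_END */

static void print_replay_end(int update_every,
                             time_t db_first_entry, time_t db_last_entry,
                             int enable_streaming,
                             time_t after, time_t before,
                             time_t wall_clock_time) {
    printf(REPLAY_END_KEYWORD " %d %lld %lld %s %lld %lld %lld\n",
           update_every,
           (long long)db_first_entry, (long long)db_last_entry,
           enable_streaming ? "true" : "false",
           (long long)after, (long long)before,
           (long long)wall_clock_time);
}

int main(void) {
    time_t now = 1670000000;
    print_replay_end(1, now - 3600, now, 1, now - 3600, now, now);
    return 0;
}
```
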
@@ -733,9 +773,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
, msg
, r->last_request.after, r->last_request.before
, r->child_db.first_entry_t, r->child_db.last_entry_t
- , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
+ , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
, r->local_db.first_entry_t, r->local_db.last_entry_t
- , r->local_db.now
+ , r->local_db.wall_clock_time
, r->gap.from, r->gap.to
, (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
, (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
@@ -928,11 +968,12 @@ struct replication_sort_entry {
// the global variables for the replication thread
static struct replication_thread {
+ ARAL *aral_rse;
+
SPINLOCK spinlock;
struct {
size_t pending; // number of requests pending in the queue
- Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
// statistics
size_t added; // number of requests added to the queue
@@ -951,6 +992,7 @@ static struct replication_thread {
} unsafe; // protected from replication_recursive_lock()
struct {
+ Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
size_t executed; // the number of replication requests executed
size_t latest_first_time; // the 'after' timestamp of the last request we executed
size_t memory; // the total memory allocated by replication
@@ -964,10 +1006,10 @@ static struct replication_thread {
} main_thread; // access is allowed only by the main thread
} replication_globals = {
+ .aral_rse = NULL,
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.unsafe = {
.pending = 0,
- .unique_id = 0,
.added = 0,
.removed = 0,
@@ -984,6 +1026,7 @@ static struct replication_thread {
},
},
.atomic = {
+ .unique_id = 0,
.executed = 0,
.latest_first_time = 0,
.memory = 0,
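
Moving `unique_id` from the spinlock-protected `unsafe` section to the `atomic` section lets request ids be assigned without holding the replication lock, using the same atomic increment the patch applies in `replication_sort_entry_create()`. A minimal illustration (names here are illustrative, not Netdata's):

```
#include <stdio.h>
#include <stdint.h>

static uint64_t unique_id = 0;         /* would live in replication_globals.atomic */

static uint64_t next_unique_id(void) {
    /* same primitive the patch uses: __atomic_add_fetch(..., __ATOMIC_SEQ_CST) */
    return __atomic_add_fetch(&unique_id, 1, __ATOMIC_SEQ_CST);
}

int main(void) {
    printf("%llu %llu\n",
           (unsigned long long)next_unique_id(),
           (unsigned long long)next_unique_id());   /* prints "1 2" */
    return 0;
}
```
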
@@ -1047,17 +1090,15 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) {
// ----------------------------------------------------------------------------
// replication sort entry management
-static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
- fatal_when_replication_is_not_locked_for_me();
-
- struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+ struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
__atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
// copy the request
rse->rq = rq;
- rse->unique_id = ++replication_globals.unsafe.unique_id;
+ rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST);
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
@@ -1068,26 +1109,30 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
}
static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
- freez(rse);
+ aral_freez(replication_globals.aral_rse, rse);
__atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
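
Sort entries are now served from an ARAL pool (created in `replication_initialize()` further down in this patch) instead of a `mallocz`/`freez` pair per request. The following is a conceptual stand-in for that allocation pattern, not Netdata's ARAL implementation: same-sized entries come from a preallocated slab and are recycled through a free list.

```
#include <stdio.h>
#include <stddef.h>

#define POOL_ENTRIES 1024

struct entry { struct entry *next_free; char payload[56]; };

static struct entry pool[POOL_ENTRIES];
static struct entry *free_list = NULL;

static void pool_init(void) {
    for (size_t i = 0; i < POOL_ENTRIES; i++) {
        pool[i].next_free = free_list;
        free_list = &pool[i];
    }
}

static struct entry *pool_alloc(void) {
    struct entry *e = free_list;
    if (e) free_list = e->next_free;
    return e;                              /* NULL when the pool is exhausted */
}

static void pool_free(struct entry *e) {
    e->next_free = free_list;
    free_list = e;
}

int main(void) {
    pool_init();
    struct entry *e = pool_alloc();
    printf("got entry at %p\n", (void *)e);
    pool_free(e);
    return 0;
}
```
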
static void replication_sort_entry_add(struct replication_request *rq) {
- replication_recursive_lock();
-
if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = true;
rq->not_indexed_preprocessing = false;
+ replication_recursive_lock();
replication_globals.unsafe.pending_no_room++;
replication_recursive_unlock();
return;
}
- if(rq->not_indexed_buffer_full)
- replication_globals.unsafe.pending_no_room--;
+ // cache this, because it will be changed
+ bool decrement_no_room = rq->not_indexed_buffer_full;
+
+ struct replication_sort_entry *rse = replication_sort_entry_create(rq);
+
+ replication_recursive_lock();
- struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
+ if(decrement_no_room)
+ replication_globals.unsafe.pending_no_room--;
// if(rq->after < (time_t)replication_globals.protected.queue.after &&
// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
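
The reordering above narrows the critical section: the sort entry is created and `not_indexed_buffer_full` is cached before the replication spinlock is taken, so only the queue bookkeeping happens under the lock. A small sketch of that pattern with illustrative names and a pthread mutex standing in for the spinlock:

```
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static size_t pending_no_room = 1;

struct request { int not_indexed_buffer_full; };
struct sort_entry { struct request *rq; };

static struct sort_entry *queue_add(struct request *rq) {
    int decrement_no_room = rq->not_indexed_buffer_full;    /* cache before it changes */

    struct sort_entry *se = malloc(sizeof(*se));             /* allocate outside the lock */
    se->rq = rq;

    pthread_mutex_lock(&queue_lock);                          /* short critical section */
    if (decrement_no_room)
        pending_no_room--;
    /* ... indexing 'se' into the sorted queue elided ... */
    pthread_mutex_unlock(&queue_lock);

    return se;
}

int main(void) {
    struct request rq = { .not_indexed_buffer_full = 1 };
    free(queue_add(&rq));
    printf("pending_no_room = %zu\n", pending_no_room);      /* 0 */
    return 0;
}
```
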
@@ -1371,7 +1416,12 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
if(likely(workers))
worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
+ rq->q = replication_response_prepare(
+ rq->st,
+ rq->start_streaming,
+ rq->after,
+ rq->before,
+ rq->sender->capabilities);
}
if(likely(workers))
@@ -1580,67 +1630,85 @@ static void replication_initialize_workers(bool master) {
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)
-static int replication_execute_next_pending_request(bool cancel) {
- static __thread int max_requests_ahead = 0;
- static __thread struct replication_request *rqs = NULL;
- static __thread int rqs_last_executed = 0, rqs_last_prepared = 0;
- static __thread size_t queue_rounds = 0; (void)queue_rounds;
+static __thread struct replication_thread_pipeline {
+ int max_requests_ahead;
+ struct replication_request *rqs;
+ int rqs_last_executed, rqs_last_prepared;
+ size_t queue_rounds;
+} rtp = {
+ .max_requests_ahead = 0,
+ .rqs = NULL,
+ .rqs_last_executed = 0,
+ .rqs_last_prepared = 0,
+ .queue_rounds = 0,
+};
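
The per-thread static locals become a single thread-local pipeline struct, shared by the cancel/cleanup path and the execute path: a small ring of prefetched requests with separate "last prepared" and "last executed" cursors. A minimal sketch of that two-cursor ring, with illustrative types instead of `struct replication_request`:

```
#include <stdio.h>

#define SLOTS 4

struct pipeline {
    int items[SLOTS];
    int last_prepared;
    int last_executed;
};

static void prepare(struct pipeline *p, int value) {
    p->last_prepared = (p->last_prepared + 1) % SLOTS;   /* producer cursor walks ahead */
    p->items[p->last_prepared] = value;
}

static int execute_next(struct pipeline *p) {
    p->last_executed = (p->last_executed + 1) % SLOTS;   /* consumer cursor follows */
    return p->items[p->last_executed];
}

int main(void) {
    struct pipeline p = { .last_prepared = 0, .last_executed = 0 };
    prepare(&p, 10);                        /* queries can be prepared ahead ... */
    prepare(&p, 20);
    printf("%d %d\n", execute_next(&p), execute_next(&p));   /* ... and executed later */
    return 0;
}
```
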
+
+static void replication_pipeline_cancel_and_cleanup(void) {
+ if(!rtp.rqs)
+ return;
+
struct replication_request *rq;
+ size_t cancelled = 0;
+
+ do {
+ if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
+ rtp.rqs_last_executed = 0;
- if(unlikely(cancel)) {
- if(rqs) {
- size_t cancelled = 0;
- do {
- if (++rqs_last_executed >= max_requests_ahead)
- rqs_last_executed = 0;
+ rq = &rtp.rqs[rtp.rqs_last_executed];
- rq = &rqs[rqs_last_executed];
+ if (rq->q) {
+ internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
+ internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
- if (rq->q) {
- internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
- internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
+ replication_response_cancel_and_finalize(rq->q);
+ rq->q = NULL;
+ cancelled++;
+ }
- replication_response_cancel_and_finalize(rq->q);
- rq->q = NULL;
- cancelled++;
- }
+ rq->executed = true;
+ rq->found = false;
- rq->executed = true;
- rq->found = false;
+ } while (rtp.rqs_last_executed != rtp.rqs_last_prepared);
- } while (rqs_last_executed != rqs_last_prepared);
+ internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
- internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
- }
- return REQUEST_QUEUE_EMPTY;
- }
+ freez(rtp.rqs);
+ rtp.rqs = NULL;
+ rtp.max_requests_ahead = 0;
+ rtp.rqs_last_executed = 0;
+ rtp.rqs_last_prepared = 0;
+ rtp.queue_rounds = 0;
+}
+
+static int replication_pipeline_execute_next(void) {
+ struct replication_request *rq;
- if(unlikely(!rqs)) {
- max_requests_ahead = get_netdata_cpus() / 2;
+ if(unlikely(!rtp.rqs)) {
+ rtp.max_requests_ahead = (int)get_netdata_cpus() / 2;
- if(max_requests_ahead > libuv_worker_threads * 2)
- max_requests_ahead = libuv_worker_threads * 2;
+ if(rtp.max_requests_ahead > libuv_worker_threads * 2)
+ rtp.max_requests_ahead = libuv_worker_threads * 2;
- if(max_requests_ahead < 2)
- max_requests_ahead = 2;
+ if(rtp.max_requests_ahead < 2)
+ rtp.max_requests_ahead = 2;
- rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
- __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
+ rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request));
+ __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
}
// fill the queue
do {
- if(++rqs_last_prepared >= max_requests_ahead) {
- rqs_last_prepared = 0;
- queue_rounds++;
+ if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) {
+ rtp.rqs_last_prepared = 0;
+ rtp.queue_rounds++;
}
- internal_fatal(rqs[rqs_last_prepared].q,
+ internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q,
"REPLAY FATAL: slot is used by query that has not been executed!");
worker_is_busy(WORKER_JOB_FIND_NEXT);
- rqs[rqs_last_prepared] = replication_request_get_first_available();
- rq = &rqs[rqs_last_prepared];
+ rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available();
+ rq = &rtp.rqs[rtp.rqs_last_prepared];
if(rq->found) {
if (!rq->st) {
@@ -1650,20 +1718,25 @@ static int replication_execute_next_pending_request(bool cancel) {
if (rq->st && !rq->q) {
worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
+ rq->q = replication_response_prepare(
+ rq->st,
+ rq->start_streaming,
+ rq->after,
+ rq->before,
+ rq->sender->capabilities);
}
rq->executed = false;
}
- } while(rq->found && rqs_last_prepared != rqs_last_executed);
+ } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed);
// pick the first usable
do {
- if (++rqs_last_executed >= max_requests_ahead)
- rqs_last_executed = 0;
+ if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
+ rtp.rqs_last_executed = 0;
- rq = &rqs[rqs_last_executed];
+ rq = &rtp.rqs[rtp.rqs_last_executed];
if(rq->found) {
internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
@@ -1696,7 +1769,7 @@ static int replication_execute_next_pending_request(bool cancel) {
else
internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");
- } while(!rq->found && rqs_last_executed != rqs_last_prepared);
+ } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared);
if(unlikely(!rq->found)) {
worker_is_idle();
@@ -1720,7 +1793,7 @@ static int replication_execute_next_pending_request(bool cancel) {
}
static void replication_worker_cleanup(void *ptr __maybe_unused) {
- replication_execute_next_pending_request(true);
+ replication_pipeline_cancel_and_cleanup();
worker_unregister();
}
@@ -1730,7 +1803,7 @@ static void *replication_worker_thread(void *ptr) {
netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
while(service_running(SERVICE_REPLICATION)) {
- if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
sender_thread_buffer_free();
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
@@ -1746,7 +1819,7 @@ static void replication_main_cleanup(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- replication_execute_next_pending_request(true);
+ replication_pipeline_cancel_and_cleanup();
int threads = (int)replication_globals.main_thread.threads;
for(int i = 0; i < threads ;i++) {
@@ -1758,12 +1831,21 @@ static void replication_main_cleanup(void *ptr) {
replication_globals.main_thread.threads_ptrs = NULL;
__atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
+ aral_destroy(replication_globals.aral_rse);
+ replication_globals.aral_rse = NULL;
+
// custom code
worker_unregister();
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
+void replication_initialize(void) {
+ replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry),
+ 0, 65536, aral_by_size_statistics(),
+ NULL, NULL, false, false);
+}
+
void *replication_thread_main(void *ptr __maybe_unused) {
replication_initialize_workers(true);
@@ -1863,7 +1945,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_is_idle();
}
- if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
worker_is_busy(WORKER_JOB_WAIT);
replication_recursive_lock();
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 256fa8282..62b537f0c 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-#include "parser/parser.h"
/*
* rrdpush
@@ -69,6 +68,23 @@ static void load_stream_conf() {
freez(filename);
}
+STREAM_CAPABILITIES stream_our_capabilities() {
+ return STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_HAS_COMPRESSION |
+ (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
+ 0;
+}
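
Replacing the compile-time STREAM_OUR_CAPABILITIES macro with a function means the advertised set can depend on runtime detection, such as `ieee754_doubles` gating STREAM_CAP_IEEE754, and the handshake keeps only the intersection with the peer. A small sketch of that idea (the bit values below mirror the additions in rrdpush.h; the detection flag is a stand-in for the global `ieee754_doubles`):

```
#include <stdint.h>
#include <stdio.h>

#define CAP_REPLICATION  (1u << 12)
#define CAP_BINARY       (1u << 13)
#define CAP_INTERPOLATED (1u << 14)
#define CAP_IEEE754      (1u << 15)

static uint32_t our_capabilities(int ieee754_doubles) {
    return CAP_REPLICATION |
           CAP_BINARY |
           CAP_INTERPOLATED |
           (ieee754_doubles ? CAP_IEEE754 : 0);
}

int main(void) {
    uint32_t mine  = our_capabilities(1);
    uint32_t peers = CAP_REPLICATION | CAP_BINARY;      /* an older peer */
    uint32_t use   = mine & peers;                      /* negotiate the intersection */
    printf("negotiated: 0x%x\n", use);
    return 0;
}
```
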
+
bool rrdpush_receiver_needs_dbengine() {
struct section *co;
@@ -174,8 +190,8 @@ static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
else
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
}
- else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
- simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st)))
+ else if(simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->id) ||
+ simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->name))
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
else
@@ -305,9 +321,11 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
(unsigned long long)db_last_time_t,
(unsigned long long)now);
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
- rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ }
replication_progress = true;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -327,7 +345,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, "\" ", 2);
if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
- buffer_print_llu(wb, st->usec_since_last_update);
+ buffer_print_uint64(wb, st->usec_since_last_update);
else
buffer_fast_strcat(wb, "0", 1);
@@ -342,7 +360,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
- buffer_print_ll(wb, rd->collected_value);
+ buffer_print_int64(wb, rd->collected_value);
buffer_fast_strcat(wb, "\n", 1);
}
else {
@@ -378,7 +396,74 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
return true;
}
-void rrdset_done_push(RRDSET *st) {
+void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+ rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags);
+}
+
+void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
+ if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
+ return;
+
+ NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ BUFFER *wb = rsb->wb;
+ time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
+ if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) {
+
+ if(unlikely(rsb->begin_v2_added))
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
+ buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s);
+ buffer_fast_strcat(wb, " ", 1);
+ if(point_end_time_s == rsb->wall_clock_time)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
+
+ rsb->last_point_end_time_s = point_end_time_s;
+ rsb->begin_v2_added = true;
+ }
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+
+ if((NETDATA_DOUBLE)rd->last_collected_value == n)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
+
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, flags, true);
+ buffer_fast_strcat(wb, "\n", 1);
+}
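
The interpolated (v2) point format opens one BEGIN-v2 line per chart/time slot, emits one SET-v2 line per dimension, and closes with an END-v2 line, using `#` as a "same as the previous field" shorthand. A rough sketch of the shape of those lines; the keyword literals, the SN flags field, and the hex/base64 number encodings are elided or replaced with placeholders here, since the real ones come from the PLUGINSD_KEYWORD_*_V2 macros and the negotiated NUMBER_ENCODING:

```
#include <stdio.h>
#include <time.h>

static void begin_v2(const char *chart, int update_every,
                     time_t end_time, time_t wall_clock) {
    printf("BEGIN_V2 '%s' %d %lld ", chart, update_every, (long long)end_time);
    if (end_time == wall_clock)
        printf("#\n");                       /* '#' = same as the point end time */
    else
        printf("%lld\n", (long long)wall_clock);
}

static void set_v2(const char *dim, long long collected, double calculated) {
    printf("SET_V2 '%s' %lld ", dim, collected);
    if ((double)collected == calculated)
        printf("# ");                        /* '#' = same as the collected value */
    else
        printf("%0.7f ", calculated);
    printf("''\n");                          /* SN flags field elided in this sketch */
}

int main(void) {
    time_t now = 1670000000;
    begin_v2("system.cpu", 1, now, now);
    set_v2("user", 25, 25.0);
    printf("END_V2\n");
    return 0;
}
```
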
+
+void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
+ if(!rsb->wb)
+ return;
+
+ if(rsb->v2 && rsb->begin_v2_added) {
+ if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, rsb->wb);
+
+ buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+ }
+
+ sender_commit(st->rrdhost->sender, rsb->wb);
+
+ *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
+}
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
// fetch the flags we need to check with one atomic operation
@@ -395,7 +480,7 @@ void rrdset_done_push(RRDSET *st) {
error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
}
- return;
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
}
else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
@@ -408,17 +493,24 @@ void rrdset_done_push(RRDSET *st) {
if(unlikely((exposed_upstream && replication_in_progress) ||
!should_send_chart_matching(st, rrdset_flags)))
- return;
-
- BUFFER *wb = sender_start(host->sender);
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
- if(unlikely(!exposed_upstream))
+ if(unlikely(!exposed_upstream)) {
+ BUFFER *wb = sender_start(host->sender);
replication_in_progress = rrdpush_send_chart_definition(wb, st);
+ sender_commit(host->sender, wb);
+ }
- if (likely(!replication_in_progress))
- rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags);
+ if(replication_in_progress)
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
- sender_commit(host->sender, wb);
+ return (RRDSET_STREAM_BUFFER) {
+ .capabilities = host->sender->capabilities,
+ .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED),
+ .rrdset_flags = rrdset_flags,
+ .wb = sender_start(host->sender),
+ .wall_clock_time = wall_clock_time,
+ };
}
// labels
@@ -633,7 +725,7 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) {
}
void *rrdpush_receiver_thread(void *ptr);
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
if(!service_running(ABILITY_STREAMING_CONNECTIONS))
return rrdpush_receiver_too_busy_now(w);
@@ -665,11 +757,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
// parse the parameters and fill rpt and rpt->system_info
- while(url) {
- char *value = mystrsep(&url, "&");
+ while(decoded_query_string) {
+ char *value = strsep_skip_consecutive_separators(&decoded_query_string, "&");
if(!value || !*value) continue;
- char *name = mystrsep(&value, "=");
+ char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@@ -851,7 +943,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
{
SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
appconfig_get(&stream_config, rpt->key, "allow from", "*"),
- NULL, SIMPLE_PATTERN_EXACT);
+ NULL, SIMPLE_PATTERN_EXACT, true);
if(key_allow_from) {
if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
@@ -898,7 +990,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
{
SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
- NULL, SIMPLE_PATTERN_EXACT);
+ NULL, SIMPLE_PATTERN_EXACT, true);
if(machine_allow_from) {
if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
@@ -1077,6 +1169,8 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS ");
if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
+ if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED ");
+ if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 ");
}
void log_receiver_capabilities(struct receiver_state *rpt) {
@@ -1118,7 +1212,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
if(caps & STREAM_CAP_V2)
caps &= ~(STREAM_CAP_V1);
- return caps & STREAM_OUR_CAPABILITIES;
+ return caps & stream_our_capabilities();
}
int32_t stream_capabilities_to_vn(uint32_t caps) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 94c1320e7..ff8958440 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -41,6 +41,8 @@ typedef enum {
STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
+ STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
+ STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -53,12 +55,9 @@ typedef enum {
#define STREAM_HAS_COMPRESSION 0
#endif // ENABLE_COMPRESSION
-#define STREAM_OUR_CAPABILITIES ( \
- STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \
- STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \
- STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY )
+STREAM_CAPABILITIES stream_our_capabilities();
-#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
+#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
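
The macro now compares against the full mask because a "capability" may cover more than one bit: `&` alone is true when *any* bit matches, while `(caps & cap) == cap` requires *all* of them. A tiny illustration with made-up values:

```
#include <stdio.h>

#define CAP_A  (1u << 1)
#define CAP_B  (1u << 2)
#define CAP_AB (CAP_A | CAP_B)

int main(void) {
    unsigned caps = CAP_A;                                        /* peer has only CAP_A */
    printf("any-bit test:  %d\n", (caps & CAP_AB) != 0);          /* 1 - misleading */
    printf("all-bits test: %d\n", (caps & CAP_AB) == CAP_AB);     /* 0 - correct */
    return 0;
}
```
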
// ----------------------------------------------------------------------------
// stream handshake
@@ -187,12 +186,17 @@ struct sender_state {
} replication;
struct {
+ bool pending_data;
size_t buffer_used_percentage; // the current utilization of the sending buffer
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
};
+#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
+
#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
@@ -303,7 +307,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
-void rrdset_done_push(RRDSET *st);
+
+typedef struct rrdset_stream_buffer {
+ STREAM_CAPABILITIES capabilities;
+ bool v2;
+ bool begin_v2_added;
+ time_t wall_clock_time;
+ uint64_t rrdset_flags; // RRDSET_FLAGS
+ time_t last_point_end_time_s;
+ BUFFER *wb;
+} RRDSET_STREAM_BUFFER;
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
+void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
+void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
+void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
+
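
The old single `rrdset_done_push()` entry point is replaced by a three-step lifecycle: initialize a per-chart stream buffer (a `wb == NULL` result means "do not stream"), push per-dimension lines, then finish and commit. The sketch below mimics that shape with stand-in types; the real API takes RRDSET/RRDDIM and commits to the host's sender, as declared above.

```
#include <stdio.h>
#include <stddef.h>
#include <time.h>

struct stream_buffer { FILE *wb; time_t wall_clock_time; };

static struct stream_buffer push_initialize(int streaming_enabled, time_t now) {
    if (!streaming_enabled)
        return (struct stream_buffer){ .wb = NULL };      /* caller must check wb */
    return (struct stream_buffer){ .wb = stdout, .wall_clock_time = now };
}

static void push_dimension(struct stream_buffer *sb, const char *dim, long long value) {
    if (!sb->wb) return;                                   /* disabled: silently skip */
    fprintf(sb->wb, "SET '%s' %lld\n", dim, value);
}

static void push_finished(struct stream_buffer *sb) {
    if (!sb->wb) return;
    fflush(sb->wb);                                        /* the real code commits to the sender */
    *sb = (struct stream_buffer){ .wb = NULL };
}

int main(void) {
    struct stream_buffer sb = push_initialize(1, time(NULL));
    push_dimension(&sb, "user", 25);
    push_finished(&sb);
    return 0;
}
```
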
bool rrdset_push_chart_definition_now(RRDSET *st);
void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
@@ -312,7 +331,7 @@ void rrdpush_claimed_id(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
diff --git a/streaming/sender.c b/streaming/sender.c
index 854b57fc5..179c2dc60 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-#include "parser/parser.h"
#define WORKER_SENDER_JOB_CONNECT 0
#define WORKER_SENDER_JOB_PIPE_READ 1
@@ -104,6 +103,14 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
+// FILE *fp = fopen("/tmp/stream.txt", "a");
+// fprintf(fp,
+// "\n--- SEND BEGIN: %s ----\n"
+// "%s"
+// "--- SEND END ----------------------------------------\n"
+// , rrdhost_hostname(s->host), src);
+// fclose(fp);
+
if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
@@ -171,8 +178,16 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
replication_recalculate_buffer_used_ratio_unsafe(s);
+ bool signal_sender = false;
+ if(!rrdpush_sender_pipe_has_pending_data(s)) {
+ rrdpush_sender_pipe_set_pending_data(s);
+ signal_sender = true;
+ }
+
netdata_mutex_unlock(&s->mutex);
- rrdpush_signal_sender_to_wake_up(s);
+
+ if(signal_sender)
+ rrdpush_signal_sender_to_wake_up(s);
}
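
With the new atomic `pending_data` flag, only the first commit after the buffer was drained signals the sender pipe; the sender thread clears the flag once the buffer is empty (see the thread-loop hunk further down). An illustrative sketch of that wake-up de-duplication with C11 atomics; the real code tests and sets the flag under the sender mutex rather than with an exchange:

```
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_bool pending_data = false;

static void commit_data(void) {
    bool was_pending = atomic_exchange(&pending_data, true);
    if (!was_pending)
        printf("signal sender thread\n");      /* first commit since last drain */
    else
        printf("sender already signalled\n");  /* avoid a redundant pipe write */
}

static void sender_drained(void) {
    atomic_store(&pending_data, false);         /* the next commit will signal again */
}

int main(void) {
    commit_data();      /* signals */
    commit_data();      /* does not signal */
    sender_drained();
    commit_data();      /* signals again */
    return 0;
}
```
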
static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
@@ -522,7 +537,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
#endif
// reset our capabilities to default
- s->capabilities = STREAM_OUR_CAPABILITIES;
+ s->capabilities = stream_our_capabilities();
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
@@ -894,7 +909,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
pluginsd_function_result_begin_to_buffer(wb
, string2str(tmp->transaction)
, code
- , functions_content_type_to_format(func_wb->contenttype)
+ , functions_content_type_to_format(func_wb->content_type)
, func_wb->expires);
buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
@@ -929,7 +944,7 @@ void execute_commands(struct sender_state *s) {
// internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+ size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS);
const char *keyword = get_word(words, num_words, 0);
@@ -1009,7 +1024,6 @@ void execute_commands(struct sender_state *s) {
}
struct rrdpush_sender_thread_data {
- struct sender_state *sender_state;
RRDHOST *host;
char *pipe_buffer;
};
@@ -1242,7 +1256,6 @@ void *rrdpush_sender_thread(void *ptr) {
struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
thread_data->pipe_buffer = mallocz(pipe_buffer_size);
- thread_data->sender_state = s;
thread_data->host = s->host;
netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
@@ -1298,8 +1311,10 @@ void *rrdpush_sender_thread(void *ptr) {
netdata_mutex_lock(&s->mutex);
size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
size_t available = cbuffer_available_size_unsafe(s->buffer);
- if (unlikely(!outstanding))
+ if (unlikely(!outstanding)) {
+ rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
+ }
netdata_mutex_unlock(&s->mutex);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);