summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/Makefile.am11
-rw-r--r--collectors/plugins.d/README.md599
-rw-r--r--collectors/plugins.d/plugins_d.c290
-rw-r--r--collectors/plugins.d/plugins_d.h103
-rw-r--r--collectors/plugins.d/pluginsd_parser.c1360
-rw-r--r--collectors/plugins.d/pluginsd_parser.h39
6 files changed, 2402 insertions, 0 deletions
diff --git a/collectors/plugins.d/Makefile.am b/collectors/plugins.d/Makefile.am
new file mode 100644
index 0000000..59250a9
--- /dev/null
+++ b/collectors/plugins.d/Makefile.am
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+SUBDIRS = \
+ $(NULL)
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md
new file mode 100644
index 0000000..2ecf233
--- /dev/null
+++ b/collectors/plugins.d/README.md
@@ -0,0 +1,599 @@
+<!--
+title: "External plugins overview"
+custom_edit_url: https://github.com/netdata/netdata/edit/master/collectors/plugins.d/README.md
+-->
+
+# External plugins overview
+
+`plugins.d` is the Netdata internal plugin that collects metrics
+from external processes, thus allowing Netdata to use **external plugins**.
+
+## Provided External Plugins
+
+|plugin|language|O/S|description|
+|:----:|:------:|:-:|:----------|
+|[apps.plugin](/collectors/apps.plugin/README.md)|`C`|linux, freebsd|monitors the whole process tree on Linux and FreeBSD and breaks down system resource usage by **process**, **user** and **user group**.|
+|[charts.d.plugin](/collectors/charts.d.plugin/README.md)|`BASH`|all|a **plugin orchestrator** for data collection modules written in `BASH` v4+.|
+|[cups.plugin](/collectors/cups.plugin/README.md)|`C`|all|monitors **CUPS**|
+|[fping.plugin](/collectors/fping.plugin/README.md)|`C`|all|measures network latency, jitter and packet loss between the monitored node and any number of remote network end points.|
+|[ioping.plugin](/collectors/ioping.plugin/README.md)|`C`|all|measures disk latency.|
+|[freeipmi.plugin](/collectors/freeipmi.plugin/README.md)|`C`|linux|collects metrics from enterprise hardware sensors, on Linux servers.|
+|[nfacct.plugin](/collectors/nfacct.plugin/README.md)|`C`|linux|collects netfilter firewall, connection tracker and accounting metrics using `libmnl` and `libnetfilter_acct`.|
+|[xenstat.plugin](/collectors/xenstat.plugin/README.md)|`C`|linux|collects XenServer and XCP-ng metrics using `lxenstat`.|
+|[perf.plugin](/collectors/perf.plugin/README.md)|`C`|linux|collects CPU performance metrics using performance monitoring units (PMU).|
+|[python.d.plugin](/collectors/python.d.plugin/README.md)|`python`|all|a **plugin orchestrator** for data collection modules written in `python` v2 or v3 (both are supported).|
+|[slabinfo.plugin](/collectors/slabinfo.plugin/README.md)|`C`|linux|collects kernel internal cache objects (SLAB) metrics.|
+
+Plugin orchestrators may also be described as **modular plugins**. They are modular since they accept custom made modules to be included. Writing modules for these plugins is easier than accessing the native Netdata API directly. You will find modules already available for each orchestrator under the directory of the particular modular plugin (e.g. under python.d.plugin for the python orchestrator).
+Each of these modular plugins has each own methods for defining modules. Please check the examples and their documentation.
+
+## Motivation
+
+This plugin allows Netdata to use **external plugins** for data collection:
+
+1. external data collection plugins may be written in any computer language.
+
+2. external data collection plugins may use O/S capabilities or `setuid` to
+ run with escalated privileges (compared to the `netdata` daemon).
+ The communication between the external plugin and Netdata is unidirectional
+ (from the plugin to Netdata), so that Netdata cannot manipulate an external
+ plugin running with escalated privileges.
+
+## Operation
+
+Each of the external plugins is expected to run forever.
+Netdata will start it when it starts and stop it when it exits.
+
+If the external plugin exits or crashes, Netdata will log an error.
+If the external plugin exits or crashes without pushing metrics to Netdata, Netdata will not start it again.
+
+- Plugins that exit with any value other than zero, will be disabled. Plugins that exit with zero, will be restarted after some time.
+- Plugins may also be disabled by Netdata if they output things that Netdata does not understand.
+
+The `stdout` of external plugins is connected to Netdata to receive metrics,
+with the API defined below.
+
+The `stderr` of external plugins is connected to Netdata's `error.log`.
+
+Plugins can create any number of charts with any number of dimensions each. Each chart can have its own characteristics independently of the others generated by the same plugin. For example, one chart may have an update frequency of 1 second, another may have 5 seconds and a third may have 10 seconds.
+
+## Configuration
+
+Netdata will supply the environment variables `NETDATA_USER_CONFIG_DIR` (for user supplied) and `NETDATA_STOCK_CONFIG_DIR` (for Netdata supplied) configuration files to identify the directory where configuration files are stored. It is up to the plugin to read the configuration it needs.
+
+The `netdata.conf` section `[plugins]` section contains a list of all the plugins found at the system where Netdata runs, with a boolean setting to enable them or not.
+
+Example:
+
+```
+[plugins]
+ # enable running new plugins = yes
+ # check for new plugins every = 60
+
+ # charts.d = yes
+ # fping = yes
+ # ioping = yes
+ # python.d = yes
+```
+
+The setting `enable running new plugins` sets the default behavior for all external plugins. It can be
+overridden for distinct plugins by modifying the appropriate plugin value configuration to either `yes` or `no`.
+
+The setting `check for new plugins every` sets the interval between scans of the directory
+`/usr/libexec/netdata/plugins.d`. New plugins can be added any time, and Netdata will detect them in a timely manner.
+
+For each of the external plugins enabled, another `netdata.conf` section
+is created, in the form of `[plugin:NAME]`, where `NAME` is the name of the external plugin.
+This section allows controlling the update frequency of the plugin and provide
+additional command line arguments to it.
+
+For example, for `apps.plugin` the following section is available:
+
+```
+[plugin:apps]
+ # update every = 1
+ # command options =
+```
+
+- `update every` controls the granularity of the external plugin.
+- `command options` allows giving additional command line options to the plugin.
+
+Netdata will provide to the external plugins the environment variable `NETDATA_UPDATE_EVERY`, in seconds (the default is 1). This is the **minimum update frequency** for all charts. A plugin that is updating values more frequently than this, is just wasting resources.
+
+Netdata will call the plugin with just one command line parameter: the number of seconds the user requested this plugin to update its data (by default is also 1).
+
+Other than the above, the plugin configuration is up to the plugin.
+
+Keep in mind, that the user may use Netdata configuration to overwrite chart and dimension parameters. This is transparent to the plugin.
+
+### Autoconfiguration
+
+Plugins should attempt to autoconfigure themselves when possible.
+
+For example, if your plugin wants to monitor `squid`, you can search for it on port `3128` or `8080`. If any succeeds, you can proceed. If it fails you can output an error (on stderr) saying that you cannot find `squid` running and giving instructions about the plugin configuration. Then you can stop (exit with non-zero value), so that Netdata will not attempt to start the plugin again.
+
+## External Plugins API
+
+Any program that can print a few values to its standard output can become a Netdata external plugin.
+
+Netdata parses lines starting with:
+
+- `CHART` - create or update a chart
+- `DIMENSION` - add or update a dimension to the chart just created
+- `VARIABLE` - define a variable (to be used in health calculations)
+- `CLABEL` - add a label to a chart
+- `CLABEL_COMMIT` - commit added labels to the chart
+- `FUNCTION` - define a function that can be called later to execute it
+- `BEGIN` - initialize data collection for a chart
+- `SET` - set the value of a dimension for the initialized chart
+- `END` - complete data collection for the initialized chart
+- `FLUSH` - ignore the last collected values
+- `DISABLE` - disable this plugin
+
+a single program can produce any number of charts with any number of dimensions each.
+
+Charts can be added any time (not just the beginning).
+
+### command line parameters
+
+The plugin **MUST** accept just **one** parameter: **the number of seconds it is
+expected to update the values for its charts**. The value passed by Netdata
+to the plugin is controlled via its configuration file (so there is no need
+for the plugin to handle this configuration option).
+
+The external plugin can overwrite the update frequency. For example, the server may
+request per second updates, but the plugin may ignore it and update its charts
+every 5 seconds.
+
+### environment variables
+
+There are a few environment variables that are set by `netdata` and are
+available for the plugin to use.
+
+|variable|description|
+|:------:|:----------|
+|`NETDATA_USER_CONFIG_DIR`|The directory where all Netdata-related user configuration should be stored. If the plugin requires custom user configuration, this is the place the user has saved it (normally under `/etc/netdata`).|
+|`NETDATA_STOCK_CONFIG_DIR`|The directory where all Netdata -related stock configuration should be stored. If the plugin is shipped with configuration files, this is the place they can be found (normally under `/usr/lib/netdata/conf.d`).|
+|`NETDATA_PLUGINS_DIR`|The directory where all Netdata plugins are stored.|
+|`NETDATA_USER_PLUGINS_DIRS`|The list of directories where custom plugins are stored.|
+|`NETDATA_WEB_DIR`|The directory where the web files of Netdata are saved.|
+|`NETDATA_CACHE_DIR`|The directory where the cache files of Netdata are stored. Use this directory if the plugin requires a place to store data. A new directory should be created for the plugin for this purpose, inside this directory.|
+|`NETDATA_LOG_DIR`|The directory where the log files are stored. By default the `stderr` output of the plugin will be saved in the `error.log` file of Netdata.|
+|`NETDATA_HOST_PREFIX`|This is used in environments where system directories like `/sys` and `/proc` have to be accessed at a different path.|
+|`NETDATA_DEBUG_FLAGS`|This is a number (probably in hex starting with `0x`), that enables certain Netdata debugging features. Check **\[[Tracing Options]]** for more information.|
+|`NETDATA_UPDATE_EVERY`|The minimum number of seconds between chart refreshes. This is like the **internal clock** of Netdata (it is user configurable, defaulting to `1`). There is no meaning for a plugin to update its values more frequently than this number of seconds.|
+
+### The output of the plugin
+
+The plugin should output instructions for Netdata to its output (`stdout`). Since this uses pipes, please make sure you flush stdout after every iteration.
+
+#### DISABLE
+
+`DISABLE` will disable this plugin. This will prevent Netdata from restarting the plugin. You can also exit with the value `1` to have the same effect.
+
+#### CHART
+
+`CHART` defines a new chart.
+
+the template is:
+
+> CHART type.id name title units \[family \[context \[charttype \[priority \[update_every \[options \[plugin [module]]]]]]]]
+
+ where:
+
+- `type.id`
+
+ uniquely identifies the chart,
+ this is what will be needed to add values to the chart
+
+ the `type` part controls the menu the charts will appear in
+
+- `name`
+
+ is the name that will be presented to the user instead of `id` in `type.id`. This means that only the `id` part of
+ `type.id` is changed. When a name has been given, the chart is indexed (and can be referred) as both `type.id` and
+ `type.name`. You can set name to `''`, or `null`, or `(null)` to disable it. If a chart with the same name already
+ exists, a serial number is automatically attached to the name to avoid naming collisions.
+
+- `title`
+
+ the text above the chart
+
+- `units`
+
+ the label of the vertical axis of the chart,
+ all dimensions added to a chart should have the same units
+ of measurement
+
+- `family`
+
+ is used to group charts together
+ (for example all eth0 charts should say: eth0),
+ if empty or missing, the `id` part of `type.id` will be used
+
+ this controls the sub-menu on the dashboard
+
+- `context`
+
+ the context is giving the template of the chart. For example, if multiple charts present the same information for a different family, they should have the same `context`
+
+ this is used for looking up rendering information for the chart (colors, sizes, informational texts) and also apply alarms to it
+
+- `charttype`
+
+ one of `line`, `area` or `stacked`,
+ if empty or missing, the `line` will be used
+
+- `priority`
+
+ is the relative priority of the charts as rendered on the web page,
+ lower numbers make the charts appear before the ones with higher numbers,
+ if empty or missing, `1000` will be used
+
+- `update_every`
+
+ overwrite the update frequency set by the server,
+ if empty or missing, the user configured value will be used
+
+- `options`
+
+ a space separated list of options, enclosed in quotes. 4 options are currently supported: `obsolete` to mark a chart as obsolete (Netdata will hide it and delete it after some time), `detail` to mark a chart as insignificant (this may be used by dashboards to make the charts smaller, or somehow visualize properly a less important chart), `store_first` to make Netdata store the first collected value, assuming there was an invisible previous value set to zero (this is used by statsd charts - if the first data collected value of incremental dimensions is not zero based, unrealistic spikes will appear with this option set) and `hidden` to perform all operations on a chart, but do not offer it on dashboards (the chart will be send to external databases). `CHART` options have been added in Netdata v1.7 and the `hidden` option was added in 1.10.
+
+- `plugin` and `module`
+
+ both are just names that are used to let the user identify the plugin and the module that generated the chart. If `plugin` is unset or empty, Netdata will automatically set the filename of the plugin that generated the chart. `module` has not default.
+
+#### DIMENSION
+
+`DIMENSION` defines a new dimension for the chart
+
+the template is:
+
+> DIMENSION id \[name \[algorithm \[multiplier \[divisor [options]]]]]
+
+ where:
+
+- `id`
+
+ the `id` of this dimension (it is a text value, not numeric),
+ this will be needed later to add values to the dimension
+
+ We suggest to avoid using `.` in dimension ids. External databases expect metrics to be `.` separated and people will get confused if a dimension id contains a dot.
+
+- `name`
+
+ the name of the dimension as it will appear at the legend of the chart,
+ if empty or missing the `id` will be used
+
+- `algorithm`
+
+ one of:
+
+ - `absolute`
+
+ the value is to drawn as-is (interpolated to second boundary),
+ if `algorithm` is empty, invalid or missing, `absolute` is used
+
+ - `incremental`
+
+ the value increases over time,
+ the difference from the last value is presented in the chart,
+ the server interpolates the value and calculates a per second figure
+
+ - `percentage-of-absolute-row`
+
+ the % of this value compared to the total of all dimensions
+
+ - `percentage-of-incremental-row`
+
+ the % of this value compared to the incremental total of
+ all dimensions
+
+- `multiplier`
+
+ an integer value to multiply the collected value,
+ if empty or missing, `1` is used
+
+- `divisor`
+
+ an integer value to divide the collected value,
+ if empty or missing, `1` is used
+
+- `options`
+
+ a space separated list of options, enclosed in quotes. Options supported: `obsolete` to mark a dimension as obsolete (Netdata will delete it after some time) and `hidden` to make this dimension hidden, it will take part in the calculations but will not be presented in the chart.
+
+#### VARIABLE
+
+> VARIABLE [SCOPE] name = value
+
+`VARIABLE` defines a variable that can be used in alarms. This is to used for setting constants (like the max connections a server may accept).
+
+Variables support 2 scopes:
+
+- `GLOBAL` or `HOST` to define the variable at the host level.
+- `LOCAL` or `CHART` to define the variable at the chart level. Use chart-local variables when the same variable may exist for different charts (i.e. Netdata monitors 2 mysql servers, and you need to set the `max_connections` each server accepts). Using chart-local variables is the ideal to build alarm templates.
+
+The position of the `VARIABLE` line, sets its default scope (in case you do not specify a scope). So, defining a `VARIABLE` before any `CHART`, or between `END` and `BEGIN` (outside any chart), sets `GLOBAL` scope, while defining a `VARIABLE` just after a `CHART` or a `DIMENSION`, or within the `BEGIN` - `END` block of a chart, sets `LOCAL` scope.
+
+These variables can be set and updated at any point.
+
+Variable names should use alphanumeric characters, the `.` and the `_`.
+
+The `value` is floating point (Netdata used `long double`).
+
+Variables are transferred to upstream Netdata servers (streaming and database replication).
+
+#### CLABEL
+
+> CLABEL name value source
+
+`CLABEL` defines a label used to organize and identify a chart.
+
+Name and value accept characters according to the following table:
+
+| Character | Symbol | Label Name | Label Value |
+|---------------------|:------:|:----------:|:-----------:|
+| UTF-8 character | UTF-8 | _ | keep |
+| Lower case letter | [a-z] | keep | keep |
+| Upper case letter | [A-Z] | keep | [a-z] |
+| Digit | [0-9] | keep | keep |
+| Underscore | _ | keep | keep |
+| Minus | - | keep | keep |
+| Plus | + | _ | keep |
+| Colon | : | _ | keep |
+| Semicolon | ; | _ | : |
+| Equal | = | _ | : |
+| Period | . | keep | keep |
+| Comma | , | . | . |
+| Slash | / | keep | keep |
+| Backslash | \ | / | / |
+| At | @ | _ | keep |
+| Space | ' ' | _ | keep |
+| Opening parenthesis | ( | _ | keep |
+| Closing parenthesis | ) | _ | keep |
+| Anything else | | _ | _ |
+
+The `source` is an integer field that can have the following values:
+- `1`: The value was set automatically.
+- `2`: The value was set manually.
+- `4`: This is a K8 label.
+- `8`: This is a label defined using `netdata` agent cloud link.
+
+#### CLABEL_COMMIT
+
+`CLABEL_COMMIT` indicates that all labels were defined and the chart can be updated.
+
+#### FUNCTION
+
+> FUNCTION [GLOBAL] "name and parameters of the function" timeout "help string for users"
+
+A function can be used by users to ask for more information from the collector. Netdata maintains a registry of functions in 2 levels:
+
+- per node
+- per chart
+
+Both node and chart functions are exactly the same, but chart functions allow Netdata to relate functions with charts and therefore present a context sensitive menu of functions related to the chart the user is using.
+
+A function is identified by a string. The allowed characters in the function definition are:
+
+| Character | Symbol | In Functions |
+|-------------------|:------:|:------------:|
+| UTF-8 character | UTF-8 | keep |
+| Lower case letter | [a-z] | keep |
+| Upper case letter | [A-Z] | keep |
+| Digit | [0-9] | keep |
+| Underscore | _ | keep |
+| Comma | , | keep |
+| Minus | - | keep |
+| Period | . | keep |
+| Colon | : | keep |
+| Slash | / | keep |
+| Space | ' ' | keep |
+| Semicolon | ; | : |
+| Equal | = | : |
+| Backslash | \ | / |
+| Anything else | | _ |
+
+Uses can get a list of all the registered functions using the `/api/v1/functions` end point of Netdata.
+
+Users can call functions using the `/api/v1/function` end point of Netdata.
+Once a function is called, the plugin will receive at its standard input a command that looks like this:
+
+> FUNCTION transaction_id timeout "name and parameters of the function"
+
+The plugin is expected to parse and validate `name and parameters of the function`. Netdata allows users to edit this string, append more parameters or even change the ones the plugin originally exposed. To minimize the security risk, Netdata guarantees that only the characters shown above are accepted in function definitions, but still the plugin should carefully inspect the `name and parameters of the function` to ensure that it is valid and not harmful.
+
+If the plugin rejects the request, it should respond with this:
+
+```
+FUNCTION_RESULT_BEGIN transaction_id 400 application/json
+{
+ "status": 400,
+ "error_message": "description of the rejection reasons"
+}
+FUNCTION_RESULT_END
+```
+
+If the plugin prepares a response, it should send (via its standard output, together with the collected data, but not interleaved with them):
+
+> FUNCTION_RESULT_BEGIN transaction_id http_error_code content_type expiration
+
+Where:
+
+ - `transaction_id` is the transaction id that Netdata sent for this function execution
+ - `http_error` is the http error code Netdata should respond with, 200 is the "ok" response
+ - `content_type` is the content type of the response
+ - `expiration` is the absolute timestamp (number, unix epoch) this response expires
+
+Immediately after this, all text is assumed to be the response content.
+The content is text and line oriented. The maximum line length accepted is 15kb. Longer lines will be truncated.
+The type of the context itself depends on the plugin and the UI.
+
+To terminate the message, Netdata seeks a line with just this:
+
+> FUNCTION_RESULT_END
+
+This defines the end of the message. `FUNCTION_RESULT_END` should appear in a line alone, without any other text, so it is wise to add `\n` before and after it.
+
+After this line, Netdata resumes processing collected metrics from the plugin.
+
+## Data collection
+
+data collection is defined as a series of `BEGIN` -> `SET` -> `END` lines
+
+> BEGIN type.id [microseconds]
+
+- `type.id`
+
+ is the unique identification of the chart (as given in `CHART`)
+
+- `microseconds`
+
+ is the number of microseconds since the last update of the chart. It is optional.
+
+ Under heavy system load, the system may have some latency transferring
+ data from the plugins to Netdata via the pipe. This number improves
+ accuracy significantly, since the plugin is able to calculate the
+ duration between its iterations better than Netdata.
+
+ The first time the plugin is started, no microseconds should be given
+ to Netdata.
+
+> SET id = value
+
+- `id`
+
+ is the unique identification of the dimension (of the chart just began)
+
+- `value`
+
+ is the collected value, only integer values are collected. If you want to push fractional values, multiply this value by 100 or 1000 and set the `DIMENSION` divider to 1000.
+
+> END
+
+ END does not take any parameters, it commits the collected values for all dimensions to the chart. If a dimensions was not `SET`, its value will be empty for this commit.
+
+More `SET` lines may appear to update all the dimensions of the chart.
+All of them in one `BEGIN` -> `END` block.
+
+All `SET` lines within a single `BEGIN` -> `END` block have to refer to the
+same chart.
+
+If more charts need to be updated, each chart should have its own
+`BEGIN` -> `SET` -> `END` block.
+
+If, for any reason, a plugin has issued a `BEGIN` but wants to cancel it,
+it can issue a `FLUSH`. The `FLUSH` command will instruct Netdata to ignore
+all the values collected since the last `BEGIN` command.
+
+If a plugin does not behave properly (outputs invalid lines, or does not
+follow these guidelines), will be disabled by Netdata.
+
+### collected values
+
+Netdata will collect any **signed** value in the 64bit range:
+`-9.223.372.036.854.775.808` to `+9.223.372.036.854.775.807`
+
+If a value is not collected, leave it empty, like this:
+
+`SET id =`
+
+or do not output the line at all.
+
+## Modular Plugins
+
+1. **python**, use `python.d.plugin`, there are many examples in the [python.d
+ directory](/collectors/python.d.plugin/README.md)
+
+ python is ideal for Netdata plugins. It is a simple, yet powerful way to collect data, it has a very small memory footprint, although it is not the most CPU efficient way to do it.
+
+2. **BASH**, use `charts.d.plugin`, there are many examples in the [charts.d
+ directory](/collectors/charts.d.plugin/README.md)
+
+ BASH is the simplest scripting language for collecting values. It is the less efficient though in terms of CPU resources. You can use it to collect data quickly, but extensive use of it might use a lot of system resources.
+
+3. **C**
+
+ Of course, C is the most efficient way of collecting data. This is why Netdata itself is written in C.
+
+## Writing Plugins Properly
+
+There are a few rules for writing plugins properly:
+
+1. Respect system resources
+
+ Pay special attention to efficiency:
+
+ - Initialize everything once, at the beginning. Initialization is not an expensive operation. Your plugin will most probably be started once and run forever. So, do whatever heavy operation is needed at the beginning, just once.
+ - Do the absolutely minimum while iterating to collect values repeatedly.
+ - If you need to connect to another server to collect values, avoid re-connects if possible. Connect just once, with keep-alive (for HTTP) enabled and collect values using the same connection.
+ - Avoid any CPU or memory heavy operation while collecting data. If you control memory allocation, avoid any memory allocation while iterating to collect values.
+ - Avoid running external commands when possible. If you are writing shell scripts avoid especially pipes (each pipe is another fork, a very expensive operation).
+
+2. The best way to iterate at a constant pace is this pseudo code:
+
+```js
+ var update_every = argv[1] * 1000; /* seconds * 1000 = milliseconds */
+
+ readConfiguration();
+
+ if(!verifyWeCanCollectValues()) {
+ print("DISABLE");
+ exit(1);
+ }
+
+ createCharts(); /* print CHART and DIMENSION statements */
+
+ var loops = 0;
+ var last_run = 0;
+ var next_run = 0;
+ var dt_since_last_run = 0;
+ var now = 0;
+
+ while(true) {
+ /* find the current time in milliseconds */
+ now = currentTimeStampInMilliseconds();
+
+ /*
+ * find the time of the next loop
+ * this makes sure we are always aligned
+ * with the Netdata daemon
+ */
+ next_run = now - (now % update_every) + update_every;
+
+ /*
+ * wait until it is time
+ * it is important to do it in a loop
+ * since many wait functions can be interrupted
+ */
+ while( now < next_run ) {
+ sleepMilliseconds(next_run - now);
+ now = currentTimeStampInMilliseconds();
+ }
+
+ /* calculate the time passed since the last run */
+ if ( loops > 0 )
+ dt_since_last_run = (now - last_run) * 1000; /* in microseconds */
+
+ /* prepare for the next loop */
+ last_run = now;
+ loops++;
+
+ /* do your magic here to collect values */
+ collectValues();
+
+ /* send the collected data to Netdata */
+ printValues(dt_since_last_run); /* print BEGIN, SET, END statements */
+ }
+```
+
+ Using the above procedure, your plugin will be synchronized to start data collection on steps of `update_every`. There will be no need to keep track of latencies in data collection.
+
+ Netdata interpolates values to second boundaries, so even if your plugin is not perfectly aligned it does not matter. Netdata will find out. When your plugin works in increments of `update_every`, there will be no gaps in the charts due to the possible cumulative micro-delays in data collection. Gaps will only appear if the data collection is really delayed.
+
+3. If you are not sure of memory leaks, exit every one hour. Netdata will re-start your process.
+
+4. If possible, try to autodetect if your plugin should be enabled, without any configuration.
+
+
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
new file mode 100644
index 0000000..79abc70
--- /dev/null
+++ b/collectors/plugins.d/plugins_d.c
@@ -0,0 +1,290 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "plugins_d.h"
+#include "pluginsd_parser.h"
+
+char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL };
+struct plugind *pluginsd_root = NULL;
+
+inline size_t pluginsd_initialize_plugin_directories()
+{
+ char plugins_dirs[(FILENAME_MAX * 2) + 1];
+ static char *plugins_dir_list = NULL;
+
+ // Get the configuration entry
+ if (likely(!plugins_dir_list)) {
+ snprintfz(plugins_dirs, FILENAME_MAX * 2, "\"%s\" \"%s/custom-plugins.d\"", PLUGINS_DIR, CONFIG_DIR);
+ plugins_dir_list = strdupz(config_get(CONFIG_SECTION_DIRECTORIES, "plugins", plugins_dirs));
+ }
+
+ // Parse it and store it to plugin directories
+ return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0);
+}
+
+static void pluginsd_worker_thread_cleanup(void *arg)
+{
+ struct plugind *cd = (struct plugind *)arg;
+
+ if (cd->enabled && !cd->obsolete) {
+ cd->obsolete = 1;
+
+ info("data collection thread exiting");
+
+ if (cd->pid) {
+ siginfo_t info;
+ info("killing child process pid %d", cd->pid);
+ if (killpid(cd->pid) != -1) {
+ info("waiting for child process pid %d to exit...", cd->pid);
+ waitid(P_PID, (id_t)cd->pid, &info, WEXITED);
+ }
+ cd->pid = 0;
+ }
+ }
+}
+
+#define SERIAL_FAILURES_THRESHOLD 10
+static void pluginsd_worker_thread_handle_success(struct plugind *cd)
+{
+ if (likely(cd->successful_collections)) {
+ sleep((unsigned int)cd->update_every);
+ return;
+ }
+
+ if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) {
+ info(
+ "'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
+ cd->fullfilename, cd->pid,
+ cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
+ sleep((unsigned int)(cd->update_every * 10));
+ return;
+ }
+
+ if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
+ error(
+ "'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)."
+ "We have tried to collect something %zu times - unsuccessfully. Disabling it.",
+ cd->fullfilename, cd->pid, cd->serial_failures);
+ cd->enabled = 0;
+ return;
+ }
+
+ return;
+}
+
+static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code)
+{
+ if (worker_ret_code == -1) {
+ info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->pid);
+ cd->enabled = 0;
+ return;
+ }
+
+ if (!cd->successful_collections) {
+ error(
+ "'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename,
+ cd->pid, worker_ret_code);
+ cd->enabled = 0;
+ return;
+ }
+
+ if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) {
+ error(
+ "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
+ cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections,
+ cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
+ sleep((unsigned int)(cd->update_every * 10));
+ return;
+ }
+
+ if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
+ error(
+ "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
+ "We tried to restart it %zu times, but it failed to generate data. Disabling it.",
+ cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, cd->serial_failures);
+ cd->enabled = 0;
+ return;
+ }
+
+ return;
+}
+#undef SERIAL_FAILURES_THRESHOLD
+
+void *pluginsd_worker_thread(void *arg)
+{
+ worker_register("PLUGINSD");
+
+ netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg);
+
+ struct plugind *cd = (struct plugind *)arg;
+
+ cd->obsolete = 0;
+ size_t count = 0;
+
+ while (!netdata_exit) {
+ FILE *fp_child_input = NULL;
+ FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input);
+ if (unlikely(!fp_child_input || !fp_child_output)) {
+ error("Cannot popen(\"%s\", \"r\").", cd->cmd);
+ break;
+ }
+
+ info("connected to '%s' running on pid %d", cd->fullfilename, cd->pid);
+ count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0);
+ error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid, count);
+ killpid(cd->pid);
+
+ int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->pid);
+
+ if (likely(worker_ret_code == 0))
+ pluginsd_worker_thread_handle_success(cd);
+ else
+ pluginsd_worker_thread_handle_error(cd, worker_ret_code);
+
+ cd->pid = 0;
+ if (unlikely(!cd->enabled))
+ break;
+ }
+ worker_unregister();
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
+static void pluginsd_main_cleanup(void *data)
+{
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+ info("cleaning up...");
+
+ struct plugind *cd;
+ for (cd = pluginsd_root; cd; cd = cd->next) {
+ if (cd->enabled && !cd->obsolete) {
+ info("stopping plugin thread: %s", cd->id);
+ netdata_thread_cancel(cd->thread);
+ }
+ }
+
+ info("cleanup completed.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+
+ worker_unregister();
+}
+
+void *pluginsd_main(void *ptr)
+{
+ netdata_thread_cleanup_push(pluginsd_main_cleanup, ptr);
+
+ int automatic_run = config_get_boolean(CONFIG_SECTION_PLUGINS, "enable running new plugins", 1);
+ int scan_frequency = (int)config_get_number(CONFIG_SECTION_PLUGINS, "check for new plugins every", 60);
+ if (scan_frequency < 1)
+ scan_frequency = 1;
+
+ // disable some plugins by default
+ config_get_boolean(CONFIG_SECTION_PLUGINS, "slabinfo", CONFIG_BOOLEAN_NO);
+
+ // store the errno for each plugins directory
+ // so that we don't log broken directories on each loop
+ int directory_errors[PLUGINSD_MAX_DIRECTORIES] = { 0 };
+
+ while (!netdata_exit) {
+ int idx;
+ const char *directory_name;
+
+ for (idx = 0; idx < PLUGINSD_MAX_DIRECTORIES && (directory_name = plugin_directories[idx]); idx++) {
+ if (unlikely(netdata_exit))
+ break;
+
+ errno = 0;
+ DIR *dir = opendir(directory_name);
+ if (unlikely(!dir)) {
+ if (directory_errors[idx] != errno) {
+ directory_errors[idx] = errno;
+ error("cannot open plugins directory '%s'", directory_name);
+ }
+ continue;
+ }
+
+ struct dirent *file = NULL;
+ while (likely((file = readdir(dir)))) {
+ if (unlikely(netdata_exit))
+ break;
+
+ debug(D_PLUGINSD, "examining file '%s'", file->d_name);
+
+ if (unlikely(strcmp(file->d_name, ".") == 0 || strcmp(file->d_name, "..") == 0))
+ continue;
+
+ int len = (int)strlen(file->d_name);
+ if (unlikely(len <= (int)PLUGINSD_FILE_SUFFIX_LEN))
+ continue;
+ if (unlikely(strcmp(PLUGINSD_FILE_SUFFIX, &file->d_name[len - (int)PLUGINSD_FILE_SUFFIX_LEN]) != 0)) {
+ debug(D_PLUGINSD, "file '%s' does not end in '%s'", file->d_name, PLUGINSD_FILE_SUFFIX);
+ continue;
+ }
+
+ char pluginname[CONFIG_MAX_NAME + 1];
+ snprintfz(pluginname, CONFIG_MAX_NAME, "%.*s", (int)(len - PLUGINSD_FILE_SUFFIX_LEN), file->d_name);
+ int enabled = config_get_boolean(CONFIG_SECTION_PLUGINS, pluginname, automatic_run);
+
+ if (unlikely(!enabled)) {
+ debug(D_PLUGINSD, "plugin '%s' is not enabled", file->d_name);
+ continue;
+ }
+
+ // check if it runs already
+ struct plugind *cd;
+ for (cd = pluginsd_root; cd; cd = cd->next)
+ if (unlikely(strcmp(cd->filename, file->d_name) == 0))
+ break;
+
+ if (likely(cd && !cd->obsolete)) {
+ debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename);
+ continue;
+ }
+
+ // it is not running
+ // allocate a new one, or use the obsolete one
+ if (unlikely(!cd)) {
+ cd = callocz(sizeof(struct plugind), 1);
+
+ snprintfz(cd->id, CONFIG_MAX_NAME, "plugin:%s", pluginname);
+
+ strncpyz(cd->filename, file->d_name, FILENAME_MAX);
+ snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename);
+
+ cd->enabled = enabled;
+ cd->update_every = (int)config_get_number(cd->id, "update every", localhost->rrd_update_every);
+ cd->started_t = now_realtime_sec();
+
+ char *def = "";
+ snprintfz(
+ cd->cmd, PLUGINSD_CMD_MAX, "exec %s %d %s", cd->fullfilename, cd->update_every,
+ config_get(cd->id, "command options", def));
+
+ // link it
+ if (likely(pluginsd_root))
+ cd->next = pluginsd_root;
+ pluginsd_root = cd;
+
+ // it is not currently running
+ cd->obsolete = 1;
+
+ if (cd->enabled) {
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PLUGINSD[%s]", pluginname);
+ // spawn a new thread for it
+ netdata_thread_create(
+ &cd->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, pluginsd_worker_thread, cd);
+ }
+ }
+ }
+
+ closedir(dir);
+ }
+
+ sleep((unsigned int)scan_frequency);
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
new file mode 100644
index 0000000..a8acf03
--- /dev/null
+++ b/collectors/plugins.d/plugins_d.h
@@ -0,0 +1,103 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINS_D_H
+#define NETDATA_PLUGINS_D_H 1
+
+#include "daemon/common.h"
+
+#define PLUGINSD_FILE_SUFFIX ".plugin"
+#define PLUGINSD_FILE_SUFFIX_LEN strlen(PLUGINSD_FILE_SUFFIX)
+#define PLUGINSD_CMD_MAX (FILENAME_MAX*2)
+#define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0
+
+#define PLUGINSD_KEYWORD_CHART "CHART"
+#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END"
+#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION"
+#define PLUGINSD_KEYWORD_BEGIN "BEGIN"
+#define PLUGINSD_KEYWORD_SET "SET"
+#define PLUGINSD_KEYWORD_END "END"
+#define PLUGINSD_KEYWORD_FLUSH "FLUSH"
+#define PLUGINSD_KEYWORD_DISABLE "DISABLE"
+#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE"
+#define PLUGINSD_KEYWORD_LABEL "LABEL"
+#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE"
+#define PLUGINSD_KEYWORD_CLABEL "CLABEL"
+#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT"
+#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION"
+#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN"
+#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END"
+
+#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART"
+#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN"
+#define PLUGINSD_KEYWORD_REPLAY_SET "RSET"
+#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_END "REND"
+
+#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
+
+#define PLUGINSD_LINE_MAX_SSL_READ 512
+
+#define PLUGINSD_MAX_WORDS 20
+
+#define PLUGINSD_MAX_DIRECTORIES 20
+extern char *plugin_directories[PLUGINSD_MAX_DIRECTORIES];
+
+struct plugind {
+ char id[CONFIG_MAX_NAME+1]; // config node id
+
+ char filename[FILENAME_MAX+1]; // just the filename
+ char fullfilename[FILENAME_MAX+1]; // with path
+ char cmd[PLUGINSD_CMD_MAX+1]; // the command that it executes
+
+ volatile pid_t pid;
+ netdata_thread_t thread;
+
+ size_t successful_collections; // the number of times we have seen
+ // values collected from this plugin
+
+ size_t serial_failures; // the number of times the plugin started
+ // without collecting values
+
+ int update_every; // the plugin default data collection frequency
+ volatile sig_atomic_t obsolete; // do not touch this structure after setting this to 1
+ volatile sig_atomic_t enabled; // if this is enabled or not
+
+ time_t started_t;
+ uint32_t capabilities; // follows the same principles as streaming capabilities
+ struct plugind *next;
+};
+
+extern struct plugind *pluginsd_root;
+
+size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations);
+
+size_t pluginsd_initialize_plugin_directories();
+
+
+
+#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \
+ buffer_sprintf(wb \
+ , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
+ , (transaction) ? (transaction) : "" \
+ , (int)(code) \
+ , (content_type) ? (content_type) : "" \
+ , (long int)(expires) \
+ )
+
+#define pluginsd_function_result_end_to_buffer(wb) \
+ buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
+
+#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \
+ fprintf(stdout \
+ , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
+ , (transaction) ? (transaction) : "" \
+ , (int)(code) \
+ , (content_type) ? (content_type) : "" \
+ , (long int)(expires) \
+ )
+
+#define pluginsd_function_result_end_to_stdout() \
+ fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
+
+#endif /* NETDATA_PLUGINS_D_H */
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
new file mode 100644
index 0000000..5501c12
--- /dev/null
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -0,0 +1,1360 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "pluginsd_parser.h"
+
+#define LOG_FUNCTIONS false
+
+static int send_to_plugin(const char *txt, void *data) {
+ PARSER *parser = data;
+
+ if(!txt || !*txt)
+ return 0;
+
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl *ssl = parser->ssl_output;
+ if(ssl) {
+ if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt));
+
+ error("PLUGINSD: cannot send command (SSL)");
+ return -1;
+ }
+#endif
+
+ if(parser->fp_output) {
+ int bytes = fprintf(parser->fp_output, "%s", txt);
+ if(bytes <= 0) {
+ error("PLUGINSD: cannot send command (FILE)");
+ return -2;
+ }
+ fflush(parser->fp_output);
+ return bytes;
+ }
+
+ if(parser->fd != -1) {
+ size_t bytes = 0;
+ size_t total = strlen(txt);
+ ssize_t sent;
+
+ do {
+ sent = write(parser->fd, &txt[bytes], total - bytes);
+ if(sent <= 0) {
+ error("PLUGINSD: cannot send command (fd)");
+ return -3;
+ }
+ bytes += sent;
+ }
+ while(bytes < total);
+
+ return (int)bytes;
+ }
+
+ error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
+ return -4;
+}
+
+static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) {
+ RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+
+ if(unlikely(!host))
+ error("PLUGINSD: command %s requires a host, but is not set.", cmd);
+
+ return host;
+}
+
+static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) {
+ RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+
+ if(unlikely(!st))
+ error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
+
+ return st;
+}
+
+static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
+ if (unlikely(!dimension || !*dimension)) {
+ error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
+ return NULL;
+ }
+
+ RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
+
+ if (unlikely(!rda))
+ error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
+ rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
+
+ return rda;
+}
+
+static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
+ if (unlikely(!chart || !*chart)) {
+ error("PLUGINSD: 'host:%s' got a %s without a chart id.",
+ rrdhost_hostname(host), cmd);
+ return NULL;
+ }
+
+ RRDSET *st = rrdset_find(host, chart);
+ if (unlikely(!st))
+ error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
+ rrdhost_hostname(host), chart, cmd);
+
+ return st;
+}
+
+static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) {
+ ((PARSER_USER_OBJECT *) user)->enabled = 0;
+ return PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
+{
+ char *dimension = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
+ if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+
+ if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+ debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
+ rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
+
+ if (value && *value)
+ rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
+
+ rrddim_acquired_release(rda);
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
+{
+ char *id = get_word(words, num_words, 1);
+ char *microseconds_txt = get_word(words, num_words, 2);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ ((PARSER_USER_OBJECT *)user)->st = st;
+
+ usec_t microseconds = 0;
+ if (microseconds_txt && *microseconds_txt)
+ microseconds = str2ull(microseconds_txt);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ if(st->replay.log_next_data_collection) {
+ st->replay.log_next_data_collection = false;
+
+ internal_error(true,
+ "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
+ rrdhost_hostname(host), rrdset_id(st),
+ st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
+ st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
+ microseconds
+ );
+ }
+#endif
+
+ if (likely(st->counter_done)) {
+ if (likely(microseconds)) {
+ if (((PARSER_USER_OBJECT *)user)->trust_durations)
+ rrdset_next_usec_unfiltered(st, microseconds);
+ else
+ rrdset_next_usec(st, microseconds);
+ }
+ else
+ rrdset_next(st);
+ }
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
+{
+ UNUSED(words);
+ UNUSED(num_words);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+ debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
+
+ ((PARSER_USER_OBJECT *) user)->st = NULL;
+ ((PARSER_USER_OBJECT *) user)->count++;
+
+ struct timeval now;
+ now_realtime_timeval(&now);
+ rrdset_timed_done(st, now, /* pending_rrdset_next = */ false);
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
+{
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ char *type = get_word(words, num_words, 1);
+ char *name = get_word(words, num_words, 2);
+ char *title = get_word(words, num_words, 3);
+ char *units = get_word(words, num_words, 4);
+ char *family = get_word(words, num_words, 5);
+ char *context = get_word(words, num_words, 6);
+ char *chart = get_word(words, num_words, 7);
+ char *priority_s = get_word(words, num_words, 8);
+ char *update_every_s = get_word(words, num_words, 9);
+ char *options = get_word(words, num_words, 10);
+ char *plugin = get_word(words, num_words, 11);
+ char *module = get_word(words, num_words, 12);
+
+ // parse the id from type
+ char *id = NULL;
+ if (likely(type && (id = strchr(type, '.')))) {
+ *id = '\0';
+ id++;
+ }
+
+ // make sure we have the required variables
+ if (unlikely((!type || !*type || !id || !*id))) {
+ error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.",
+ rrdhost_hostname(host));
+
+ ((PARSER_USER_OBJECT *) user)->enabled = 0;
+ return PARSER_RC_ERROR;
+ }
+
+ // parse the name, and make sure it does not include 'type.'
+ if (unlikely(name && *name)) {
+ // when data are streamed from child nodes
+ // name will be type.name
+ // so we have to remove 'type.' from name too
+ size_t len = strlen(type);
+ if (strncmp(type, name, len) == 0 && name[len] == '.')
+ name = &name[len + 1];
+
+ // if the name is the same with the id,
+ // or is just 'NULL', clear it.
+ if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
+ name = NULL;
+ }
+
+ int priority = 1000;
+ if (likely(priority_s && *priority_s))
+ priority = str2i(priority_s);
+
+ int update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
+ if (likely(update_every_s && *update_every_s))
+ update_every = str2i(update_every_s);
+ if (unlikely(!update_every))
+ update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
+
+ RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
+ if (unlikely(chart))
+ chart_type = rrdset_type_id(chart);
+
+ if (unlikely(name && !*name))
+ name = NULL;
+ if (unlikely(family && !*family))
+ family = NULL;
+ if (unlikely(context && !*context))
+ context = NULL;
+ if (unlikely(!title))
+ title = "";
+ if (unlikely(!units))
+ units = "unknown";
+
+ debug(
+ D_PLUGINSD,
+ "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
+ type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
+ priority, update_every);
+
+ RRDSET *st = NULL;
+
+ st = rrdset_create(
+ host, type, id, name, family, context, title, units,
+ (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename,
+ module, priority, update_every,
+ chart_type);
+
+ if (likely(st)) {
+ if (options && *options) {
+ if (strstr(options, "obsolete"))
+ rrdset_is_obsolete(st);
+ else
+ rrdset_isnot_obsolete(st);
+
+ if (strstr(options, "detail"))
+ rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
+
+ if (strstr(options, "hidden"))
+ rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
+
+ if (strstr(options, "store_first"))
+ rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
+ } else {
+ rrdset_isnot_obsolete(st);
+ rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
+ rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
+ }
+ }
+ ((PARSER_USER_OBJECT *)user)->st = st;
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user)
+{
+ const char *first_entry_txt = get_word(words, num_words, 1);
+ const char *last_entry_txt = get_word(words, num_words, 2);
+ const char *world_time_txt = get_word(words, num_words, 3);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
+ time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
+ time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec();
+
+ if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0))
+ error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).",
+ rrdhost_hostname(host), rrdset_id(st),
+ first_entry_child, last_entry_child, child_world_time);
+
+ bool ok = true;
+ if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
+
+ PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
+ ok = replicate_chart_request(send_to_plugin, parser, host, st,
+ first_entry_child, last_entry_child, child_world_time,
+ 0, 0);
+ }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ else {
+ internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ }
+#endif
+
+ return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
+{
+ char *id = get_word(words, num_words, 1);
+ char *name = get_word(words, num_words, 2);
+ char *algorithm = get_word(words, num_words, 3);
+ char *multiplier_s = get_word(words, num_words, 4);
+ char *divisor_s = get_word(words, num_words, 5);
+ char *options = get_word(words, num_words, 6);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ if (unlikely(!id)) {
+ error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.",
+ rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET");
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) {
+ error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.",
+ rrdhost_hostname(host));
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ long multiplier = 1;
+ if (multiplier_s && *multiplier_s) {
+ multiplier = strtol(multiplier_s, NULL, 0);
+ if (unlikely(!multiplier))
+ multiplier = 1;
+ }
+
+ long divisor = 1;
+ if (likely(divisor_s && *divisor_s)) {
+ divisor = strtol(divisor_s, NULL, 0);
+ if (unlikely(!divisor))
+ divisor = 1;
+ }
+
+ if (unlikely(!algorithm || !*algorithm))
+ algorithm = "absolute";
+
+ if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+ debug(
+ D_PLUGINSD,
+ "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
+ rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
+ options ? options : "");
+
+ RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
+ int unhide_dimension = 1;
+
+ rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ if (options && *options) {
+ if (strstr(options, "obsolete") != NULL)
+ rrddim_is_obsolete(st, rd);
+ else
+ rrddim_isnot_obsolete(st, rd);
+
+ unhide_dimension = !strstr(options, "hidden");
+
+ if (strstr(options, "noreset") != NULL)
+ rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ if (strstr(options, "nooverflow") != NULL)
+ rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ } else
+ rrddim_isnot_obsolete(st, rd);
+
+ if (likely(unhide_dimension)) {
+ rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
+ if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
+ rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
+ metaqueue_dimension_update_flags(rd);
+ }
+ }
+ else {
+ rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
+ if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
+ rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
+ metaqueue_dimension_update_flags(rd);
+ }
+ }
+
+ return PARSER_RC_OK;
+}
+
+// ----------------------------------------------------------------------------
+// execution of functions
+
+struct inflight_function {
+ int code;
+ int timeout;
+ BUFFER *destination_wb;
+ STRING *function;
+ void (*callback)(BUFFER *wb, int code, void *callback_data);
+ void *callback_data;
+ usec_t timeout_ut;
+ usec_t started_ut;
+ usec_t sent_ut;
+};
+
+static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
+ struct inflight_function *pf = func;
+
+ PARSER *parser = parser_ptr;
+
+ // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
+ pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
+
+ char buffer[2048 + 1];
+ snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n",
+ dictionary_acquired_item_name(item),
+ pf->timeout,
+ string2str(pf->function));
+
+ // send the command to the plugin
+ int ret = send_to_plugin(buffer, parser);
+
+ pf->sent_ut = now_realtime_usec();
+
+ if(ret < 0) {
+ error("FUNCTION: failed to send function to plugin, error %d", ret);
+ rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
+ }
+ else {
+ internal_error(LOG_FUNCTIONS,
+ "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
+ string2str(pf->function), dictionary_acquired_item_name(item), ret,
+ pf->sent_ut - pf->started_ut);
+ }
+}
+
+static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
+ struct inflight_function *pf = new_func;
+
+ error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
+ pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
+ pf->callback(pf->destination_wb, pf->code, pf->callback_data);
+ string_freez(pf->function);
+
+ return false;
+}
+
+static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
+ struct inflight_function *pf = func;
+
+ internal_error(LOG_FUNCTIONS,
+ "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)",
+ string2str(pf->function), dictionary_acquired_item_name(item),
+ buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
+
+ pf->callback(pf->destination_wb, pf->code, pf->callback_data);
+ string_freez(pf->function);
+}
+
+void inflight_functions_init(PARSER *parser) {
+ parser->inflight.functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
+ dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
+ dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
+}
+
+static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
+ parser->inflight.smaller_timeout = 0;
+ struct inflight_function *pf;
+ dfe_start_write(parser->inflight.functions, pf) {
+ if (pf->timeout_ut < now) {
+ internal_error(true,
+ "FUNCTION '%s' removing expired transaction '%s', after %llu usec.",
+ string2str(pf->function), pf_dfe.name, now - pf->started_ut);
+
+ if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK)
+ pf->code = rrd_call_function_error(pf->destination_wb,
+ "Timeout waiting for collector response.",
+ HTTP_RESP_GATEWAY_TIMEOUT);
+
+ dictionary_del(parser->inflight.functions, pf_dfe.name);
+ }
+
+ else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout)
+ parser->inflight.smaller_timeout = pf->timeout_ut;
+ }
+ dfe_done(pf);
+}
+
+// this is the function that is called from
+// rrd_call_function_and_wait() and rrd_call_function_async()
+static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) {
+ PARSER *parser = collector_data;
+
+ usec_t now = now_realtime_usec();
+
+ struct inflight_function tmp = {
+ .started_ut = now,
+ .timeout_ut = now + timeout * USEC_PER_SEC,
+ .destination_wb = destination_wb,
+ .timeout = timeout,
+ .function = string_strdupz(function),
+ .callback = callback,
+ .callback_data = callback_data,
+ };
+
+ uuid_t uuid;
+ uuid_generate_time(uuid);
+
+ char key[UUID_STR_LEN];
+ uuid_unparse_lower(uuid, key);
+
+ dictionary_write_lock(parser->inflight.functions);
+
+ // if there is any error, our dictionary callbacks will call the caller callback to notify
+ // the caller about the error - no need for error handling here.
+ dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
+
+ if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
+ parser->inflight.smaller_timeout = tmp.timeout_ut;
+
+ // garbage collect stale inflight functions
+ if(parser->inflight.smaller_timeout < now)
+ inflight_functions_garbage_collect(parser, now);
+
+ dictionary_write_unlock(parser->inflight.functions);
+
+ return HTTP_RESP_OK;
+}
+
+PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
+{
+ bool global = false;
+ size_t i = 1;
+ if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
+ i++;
+ global = true;
+ }
+
+ char *name = get_word(words, num_words, i++);
+ char *timeout_s = get_word(words, num_words, i++);
+ char *help = get_word(words, num_words, i++);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION);
+ if(!host) return PARSER_RC_ERROR;
+
+ RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
+ if(!st) global = true;
+
+ if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
+ error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
+ rrdhost_hostname(host),
+ st?rrdset_id(st):"(unset)",
+ global?"yes":"no",
+ name?name:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ help?help:"(unset)"
+ );
+ return PARSER_RC_ERROR;
+ }
+
+ int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+ if (timeout_s && *timeout_s) {
+ timeout = str2i(timeout_s);
+ if (unlikely(timeout <= 0))
+ timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+ }
+
+ PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
+ rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
+
+ return PARSER_RC_OK;
+}
+
+static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
+ STRING *key = action_data;
+ if(key)
+ dictionary_del(parser->inflight.functions, string2str(key));
+ string_freez(key);
+}
+
+PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user)
+{
+ char *key = get_word(words, num_words, 1);
+ char *status = get_word(words, num_words, 2);
+ char *format = get_word(words, num_words, 3);
+ char *expires = get_word(words, num_words, 4);
+
+ if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
+ error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
+ , key ? key : "(unset)"
+ , status ? status : "(unset)"
+ , format ? format : "(unset)"
+ , expires ? expires : "(unset)"
+ );
+ }
+
+ int code = (status && *status) ? str2i(status) : 0;
+ if (code <= 0)
+ code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
+
+ time_t expiration = (expires && *expires) ? str2l(expires) : 0;
+
+ PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
+
+ struct inflight_function *pf = NULL;
+
+ if(key && *key)
+ pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
+
+ if(!pf) {
+ error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
+ }
+ else {
+ if(format && *format)
+ pf->destination_wb->contenttype = functions_format_to_content_type(format);
+
+ pf->code = code;
+
+ pf->destination_wb->expires = expiration;
+ if(expiration <= now_realtime_sec())
+ buffer_no_cacheable(pf->destination_wb);
+ else
+ buffer_cacheable(pf->destination_wb);
+ }
+
+ parser->defer.response = (pf) ? pf->destination_wb : NULL;
+ parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
+ parser->defer.action = pluginsd_function_result_end;
+ parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
+ parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
+
+ return PARSER_RC_OK;
+}
+
+// ----------------------------------------------------------------------------
+
+PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
+{
+ char *name = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
+ NETDATA_DOUBLE v;
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+
+ int global = (st) ? 0 : 1;
+
+ if (name && *name) {
+ if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
+ global = 1;
+ name = get_word(words, num_words, 2);
+ value = get_word(words, num_words, 3);
+ } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
+ global = 0;
+ name = get_word(words, num_words, 2);
+ value = get_word(words, num_words, 3);
+ }
+ }
+
+ if (unlikely(!name || !*name)) {
+ error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.",
+ rrdhost_hostname(host), st ? rrdset_id(st):"UNSET");
+
+ ((PARSER_USER_OBJECT *)user)->enabled = 0;
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ if (unlikely(!value || !*value))
+ value = NULL;
+
+ if (unlikely(!value)) {
+ error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ (global) ? "HOST" : "CHART",
+ name);
+ return PARSER_RC_OK;
+ }
+
+ if (!global && !st) {
+ error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ name
+ );
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ char *endptr = NULL;
+ v = (NETDATA_DOUBLE)str2ndd(value, &endptr);
+ if (unlikely(endptr && *endptr)) {
+ if (endptr == value)
+ error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name);
+ else
+ error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name,
+ endptr);
+ }
+
+ if (global) {
+ const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
+ if (rva) {
+ rrdvar_custom_host_variable_set(host, rva, v);
+ rrdvar_custom_host_variable_release(host, rva);
+ }
+ else
+ error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
+ rrdhost_hostname(host),
+ name);
+ } else {
+ const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
+ if (rsa) {
+ rrdsetvar_custom_chart_variable_set(st, rsa, v);
+ rrdsetvar_custom_chart_variable_release(st, rsa);
+ }
+ else
+ error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
+ rrdhost_hostname(host), rrdset_id(st), name);
+ }
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
+{
+ debug(D_PLUGINSD, "requested a FLUSH");
+ ((PARSER_USER_OBJECT *) user)->st = NULL;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
+{
+ info("PLUGINSD: plugin called DISABLE. Disabling it.");
+ ((PARSER_USER_OBJECT *) user)->enabled = 0;
+ return PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
+{
+ const char *name = get_word(words, num_words, 1);
+ const char *label_source = get_word(words, num_words, 2);
+ const char *value = get_word(words, num_words, 3);
+
+ if (!name || !label_source || !value) {
+ error("PLUGINSD: ignoring malformed or empty LABEL command.");
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ char *store = (char *)value;
+ bool allocated_store = false;
+
+ if(unlikely(num_words > 4)) {
+ allocated_store = true;
+ store = mallocz(PLUGINSD_LINE_MAX + 1);
+ size_t remaining = PLUGINSD_LINE_MAX;
+ char *move = store;
+ char *word;
+ for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
+ if(i > 3) {
+ *move++ = ' ';
+ *move = '\0';
+ remaining--;
+ }
+
+ size_t length = strlen(word);
+ if (length > remaining)
+ length = remaining;
+
+ remaining -= length;
+ memcpy(move, word, length);
+ move += length;
+ *move = '\0';
+ }
+ }
+
+ if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
+ ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
+
+ rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels,
+ name,
+ store,
+ str2l(label_source));
+
+ if (allocated_store)
+ freez(store);
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
+{
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ debug(D_PLUGINSD, "requested to OVERWRITE host labels");
+
+ if(unlikely(!host->rrdlabels))
+ host->rrdlabels = rrdlabels_create();
+
+ rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
+ metaqueue_store_host_labels(host->machine_guid);
+
+ rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
+ ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
+ return PARSER_RC_OK;
+}
+
+
+PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
+{
+ const char *name = get_word(words, num_words, 1);
+ const char *value = get_word(words, num_words, 2);
+ const char *label_source = get_word(words, num_words, 3);
+
+ if (!name || !value || !*label_source) {
+ error("Ignoring malformed or empty CHART LABEL command.");
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
+ ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels;
+ rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
+ }
+
+ rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily,
+ name, value, str2l(label_source));
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
+{
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ debug(D_PLUGINSD, "requested to commit chart labels");
+
+ if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
+ error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.",
+ rrdhost_hostname(host));
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
+
+ rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
+ rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
+
+ ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL;
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user)
+{
+ char *id = get_word(words, num_words, 1);
+ char *start_time_str = get_word(words, num_words, 2);
+ char *end_time_str = get_word(words, num_words, 3);
+ char *child_now_str = get_word(words, num_words, 4);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st;
+ if (likely(!id || !*id))
+ st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ else
+ st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ ((PARSER_USER_OBJECT *) user)->st = st;
+
+ if(start_time_str && end_time_str) {
+ time_t start_time = (time_t)str2ul(start_time_str);
+ time_t end_time = (time_t)str2ul(end_time_str);
+
+ time_t wall_clock_time = 0, tolerance;
+ bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
+ if(child_now_str) {
+ wall_clock_time = (time_t)str2ul(child_now_str);
+ tolerance = st->update_every + 1;
+ wall_clock_comes_from_child = true;
+ }
+
+ if(wall_clock_time <= 0) {
+ wall_clock_time = now_realtime_sec();
+ tolerance = st->update_every + 5;
+ wall_clock_comes_from_child = false;
+ }
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(
+ (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
+ "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
+
+ internal_error(
+ true,
+ "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
+ st->replay.after, st->replay.before);
+#endif
+
+ if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
+ if (unlikely(end_time - start_time != st->update_every))
+ rrdset_set_update_every(st, end_time - start_time);
+
+ st->last_collected_time.tv_sec = end_time;
+ st->last_collected_time.tv_usec = 0;
+
+ st->last_updated.tv_sec = end_time;
+ st->last_updated.tv_usec = 0;
+
+ st->counter++;
+ st->counter_done++;
+
+ // these are only needed for db mode RAM, SAVE, MAP, ALLOC
+ st->current_entry++;
+ if(st->current_entry >= st->entries)
+ st->current_entry -= st->entries;
+
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true;
+
+ return PARSER_RC_OK;
+ }
+
+ error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
+ wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
+ }
+
+ // the child sends an RBEGIN without any parameters initially
+ // setting rset_enabled to false, means the RSET should not store any metrics
+ // to store metrics, the RBEGIN needs to have timestamps
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
+{
+ char *dimension = get_word(words, num_words, 1);
+ char *value_str = get_word(words, num_words, 2);
+ char *flags_str = get_word(words, num_words, 3);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) {
+ error_limit_static_thread_var(erl, 1, 0);
+ error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors",
+ rrdhost_hostname(host), rrdset_id(st));
+
+ // we have to return OK here
+ return PARSER_RC_OK;
+ }
+
+ RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
+ error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ rrdhost_hostname(host),
+ rrdset_id(st),
+ dimension,
+ ((PARSER_USER_OBJECT *) user)->replay.start_time,
+ ((PARSER_USER_OBJECT *) user)->replay.end_time);
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ if (unlikely(!value_str || !*value_str))
+ value_str = "NAN";
+
+ if(unlikely(!flags_str))
+ flags_str = "";
+
+ if (likely(value_str)) {
+ RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+
+ RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
+
+ if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
+ NETDATA_DOUBLE value = strtondd(value_str, NULL);
+ SN_FLAGS flags = SN_FLAG_NONE;
+
+ char c;
+ while ((c = *flags_str++)) {
+ switch (c) {
+ case 'R':
+ flags |= SN_FLAG_RESET;
+ break;
+
+ case 'E':
+ flags |= SN_EMPTY_SLOT;
+ value = NAN;
+ break;
+
+ default:
+ error("unknown flag '%c'", c);
+ break;
+ }
+ }
+
+ if (!netdata_double_isnumber(value)) {
+ value = NAN;
+ flags = SN_EMPTY_SLOT;
+ }
+
+ rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags);
+ rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+ rd->last_collected_time.tv_usec = 0;
+ rd->collections_counter++;
+ }
+ else {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
+ }
+ }
+
+ rrddim_acquired_release(rda);
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user)
+{
+ char *dimension = get_word(words, num_words, 1);
+ char *last_collected_ut_str = get_word(words, num_words, 2);
+ char *last_collected_value_str = get_word(words, num_words, 3);
+ char *last_calculated_value_str = get_word(words, num_words, 4);
+ char *last_stored_value_str = get_word(words, num_words, 5);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+ usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+ if(last_collected_ut > dim_last_collected_ut) {
+ rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
+ rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
+ }
+
+ rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0;
+ rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0;
+ rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
+ rrddim_acquired_release(rda);
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user)
+{
+ char *last_collected_ut_str = get_word(words, num_words, 1);
+ char *last_updated_ut_str = get_word(words, num_words, 2);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+ if(last_collected_ut > chart_last_collected_ut) {
+ st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
+ st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
+ }
+
+ usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
+ usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0;
+ if(last_updated_ut > chart_last_updated_ut) {
+ st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC;
+ st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC;
+ }
+
+ st->counter++;
+ st->counter_done++;
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
+{
+ if (num_words < 7) { // accepts 7, but the 7th is optional
+ error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
+ return PARSER_RC_ERROR;
+ }
+
+ const char *update_every_child_txt = get_word(words, num_words, 1);
+ const char *first_entry_child_txt = get_word(words, num_words, 2);
+ const char *last_entry_child_txt = get_word(words, num_words, 3);
+ const char *start_streaming_txt = get_word(words, num_words, 4);
+ const char *first_entry_requested_txt = get_word(words, num_words, 5);
+ const char *last_entry_requested_txt = get_word(words, num_words, 6);
+ const char *child_world_time_txt = get_word(words, num_words, 7); // optional
+
+ time_t update_every_child = (time_t)str2ul(update_every_child_txt);
+ time_t first_entry_child = (time_t)str2ul(first_entry_child_txt);
+ time_t last_entry_child = (time_t)str2ul(last_entry_child_txt);
+
+ bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
+ time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt);
+ time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt);
+
+ // the optional child world time
+ time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec();
+
+ PARSER_USER_OBJECT *user_object = user;
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true,
+ "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
+ rrdhost_hostname(host), rrdset_id(st),
+ (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
+ start_streaming?"true":"false",
+ (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
+ (unsigned long long)child_world_time
+ );
+#endif
+
+ ((PARSER_USER_OBJECT *) user)->st = NULL;
+ ((PARSER_USER_OBJECT *) user)->count++;
+
+ if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
+ time_t now = now_realtime_sec();
+ time_t started = st->rrdhost->receiver->replication_first_time_t;
+ time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started));
+ }
+
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+
+ st->counter++;
+ st->counter_done++;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+ if(start_streaming)
+ st->replay.log_next_data_collection = true;
+#endif
+
+ if (start_streaming) {
+ if (st->update_every != update_every_child)
+ rrdset_set_update_every(st, update_every_child);
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
+ rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
+ }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ else
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
+ rrdhost_hostname(host), rrdset_id(st));
+#endif
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
+
+ return PARSER_RC_OK;
+ }
+
+ rrdcontext_updated_retention_rrdset(st);
+
+ bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
+ first_entry_child, last_entry_child, child_world_time,
+ first_entry_requested, last_entry_requested);
+ return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
+}
+
+static void pluginsd_process_thread_cleanup(void *ptr) {
+ PARSER *parser = (PARSER *)ptr;
+ rrd_collector_finished();
+ parser_destroy(parser);
+}
+
+// New plugins.d parser
+
+inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
+{
+ int enabled = cd->enabled;
+
+ if (!fp_plugin_input || !fp_plugin_output || !enabled) {
+ cd->enabled = 0;
+ return 0;
+ }
+
+ if (unlikely(fileno(fp_plugin_input) == -1)) {
+ error("input file descriptor given is not a valid stream");
+ cd->serial_failures++;
+ return 0;
+ }
+
+ if (unlikely(fileno(fp_plugin_output) == -1)) {
+ error("output file descriptor given is not a valid stream");
+ cd->serial_failures++;
+ return 0;
+ }
+
+ clearerr(fp_plugin_input);
+ clearerr(fp_plugin_output);
+
+ PARSER_USER_OBJECT user = {
+ .enabled = cd->enabled,
+ .host = host,
+ .cd = cd,
+ .trust_durations = trust_durations
+ };
+
+ // fp_plugin_output = our input; fp_plugin_input = our output
+ PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
+
+ rrd_collector_started();
+
+ // this keeps the parser with its current value
+ // so, parser needs to be allocated before pushing it
+ netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
+
+ user.parser = parser;
+
+ while (likely(!parser_next(parser))) {
+ if (unlikely(netdata_exit || parser_action(parser, NULL)))
+ break;
+ }
+
+ // free parser with the pop function
+ netdata_thread_cleanup_pop(1);
+
+ cd->enabled = user.enabled;
+ size_t count = user.count;
+
+ if (likely(count)) {
+ cd->successful_collections += count;
+ cd->serial_failures = 0;
+ }
+ else
+ cd->serial_failures++;
+
+ return count;
+}
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
new file mode 100644
index 0000000..e18b43e
--- /dev/null
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINSD_PARSER_H
+#define NETDATA_PLUGINSD_PARSER_H
+
+#include "parser/parser.h"
+
+typedef struct parser_user_object {
+ PARSER *parser;
+ RRDSET *st;
+ RRDHOST *host;
+ void *opaque;
+ struct plugind *cd;
+ int trust_durations;
+ DICTIONARY *new_host_labels;
+ DICTIONARY *chart_rrdlabels_linked_temporarily;
+ size_t count;
+ int enabled;
+ uint8_t st_exists;
+ uint8_t host_exists;
+ void *private; // the user can set this for private use
+
+ struct {
+ time_t start_time;
+ time_t end_time;
+
+ usec_t start_time_ut;
+ usec_t end_time_ut;
+
+ time_t wall_clock_time;
+
+ bool rset_enabled;
+ } replay;
+} PARSER_USER_OBJECT;
+
+PARSER_RC pluginsd_function(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user);
+void inflight_functions_init(PARSER *parser);
+#endif //NETDATA_PLUGINSD_PARSER_H