summaryrefslogtreecommitdiffstats
path: root/src/collectors/plugins.d
diff options
context:
space:
mode:
Diffstat (limited to 'src/collectors/plugins.d')
-rw-r--r--src/collectors/plugins.d/README.md875
-rw-r--r--src/collectors/plugins.d/functions-table.md418
-rw-r--r--src/collectors/plugins.d/gperf-config.txt112
-rw-r--r--src/collectors/plugins.d/gperf-hashtable.h237
-rw-r--r--src/collectors/plugins.d/local_listeners.c292
-rw-r--r--src/collectors/plugins.d/ndsudo.c308
-rw-r--r--src/collectors/plugins.d/plugins_d.c360
-rw-r--r--src/collectors/plugins.d/plugins_d.h53
-rw-r--r--src/collectors/plugins.d/pluginsd_dyncfg.c69
-rw-r--r--src/collectors/plugins.d/pluginsd_dyncfg.h11
-rw-r--r--src/collectors/plugins.d/pluginsd_functions.c412
-rw-r--r--src/collectors/plugins.d/pluginsd_functions.h48
-rw-r--r--src/collectors/plugins.d/pluginsd_internals.c120
-rw-r--r--src/collectors/plugins.d/pluginsd_internals.h355
-rw-r--r--src/collectors/plugins.d/pluginsd_parser.c1401
-rw-r--r--src/collectors/plugins.d/pluginsd_parser.h244
-rw-r--r--src/collectors/plugins.d/pluginsd_replication.c371
-rw-r--r--src/collectors/plugins.d/pluginsd_replication.h14
18 files changed, 5700 insertions, 0 deletions
diff --git a/src/collectors/plugins.d/README.md b/src/collectors/plugins.d/README.md
new file mode 100644
index 000000000..20c80dbad
--- /dev/null
+++ b/src/collectors/plugins.d/README.md
@@ -0,0 +1,875 @@
+<!--
+title: "External plugins"
+custom_edit_url: "https://github.com/netdata/netdata/edit/master/src/collectors/plugins.d/README.md"
+sidebar_label: "External plugins"
+learn_status: "Published"
+learn_topic_type: "References"
+learn_rel_path: "Developers/External plugins"
+-->
+
+# External plugins
+
+`plugins.d` is the Netdata internal plugin that collects metrics
+from external processes, thus allowing Netdata to use **external plugins**.
+
+## Provided External Plugins
+
+| plugin | language | O/S | description |
+|:------------------------------------------------------------------------------------------------------:|:--------:|:--------------:|:----------------------------------------------------------------------------------------------------------------------------------------|
+| [apps.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/apps.plugin/README.md) | `C` | linux, freebsd | monitors the whole process tree on Linux and FreeBSD and breaks down system resource usage by **process**, **user** and **user group**. |
+| [charts.d.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/charts.d.plugin/README.md) | `BASH` | all | a **plugin orchestrator** for data collection modules written in `BASH` v4+. |
+| [cups.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/cups.plugin/README.md) | `C` | all | monitors **CUPS** |
+| [ebpf.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/ebpf.plugin/README.md) | `C` | linux | monitors different metrics on environments using kernel internal functions. |
+| [go.d.plugin](https://github.com/netdata/netdata/blob/master/src/go/collectors/go.d.plugin/README.md) | `GO` | all | collects metrics from the system, applications, or third-party APIs. |
+| [ioping.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/ioping.plugin/README.md) | `C` | all | measures disk latency. |
+| [freeipmi.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/freeipmi.plugin/README.md) | `C` | linux | collects metrics from enterprise hardware sensors, on Linux servers. |
+| [nfacct.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/nfacct.plugin/README.md) | `C` | linux | collects netfilter firewall, connection tracker and accounting metrics using `libmnl` and `libnetfilter_acct`. |
+| [xenstat.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/xenstat.plugin/README.md) | `C` | linux | collects XenServer and XCP-ng metrics using `lxenstat`. |
+| [perf.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/perf.plugin/README.md) | `C` | linux | collects CPU performance metrics using performance monitoring units (PMU). |
+| [python.d.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/python.d.plugin/README.md) | `python` | all | a **plugin orchestrator** for data collection modules written in `python` v2 or v3 (both are supported). |
+| [slabinfo.plugin](https://github.com/netdata/netdata/blob/master/src/collectors/slabinfo.plugin/README.md) | `C` | linux | collects kernel internal cache objects (SLAB) metrics. |
+
+Plugin orchestrators may also be described as **modular plugins**. They are modular since they accept custom made modules to be included. Writing modules for these plugins is easier than accessing the native Netdata API directly. You will find modules already available for each orchestrator under the directory of the particular modular plugin (e.g. under python.d.plugin for the python orchestrator).
+Each of these modular plugins has each own methods for defining modules. Please check the examples and their documentation.
+
+## Motivation
+
+This plugin allows Netdata to use **external plugins** for data collection:
+
+1. external data collection plugins may be written in any computer language.
+
+2. external data collection plugins may use O/S capabilities or `setuid` to
+ run with escalated privileges (compared to the `netdata` daemon).
+ The communication between the external plugin and Netdata is unidirectional
+ (from the plugin to Netdata), so that Netdata cannot manipulate an external
+ plugin running with escalated privileges.
+
+## Operation
+
+Each of the external plugins is expected to run forever.
+Netdata will start it when it starts and stop it when it exits.
+
+If the external plugin exits or crashes, Netdata will log an error.
+If the external plugin exits or crashes without pushing metrics to Netdata, Netdata will not start it again.
+
+- Plugins that exit with any value other than zero, will be disabled. Plugins that exit with zero, will be restarted after some time.
+- Plugins may also be disabled by Netdata if they output things that Netdata does not understand.
+
+The `stdout` of external plugins is connected to Netdata to receive metrics,
+with the API defined below.
+
+The `stderr` of external plugins is connected to Netdata's `error.log`.
+
+Plugins can create any number of charts with any number of dimensions each. Each chart can have its own characteristics independently of the others generated by the same plugin. For example, one chart may have an update frequency of 1 second, another may have 5 seconds and a third may have 10 seconds.
+
+## Configuration
+
+Netdata will supply the environment variables `NETDATA_USER_CONFIG_DIR` (for user supplied) and `NETDATA_STOCK_CONFIG_DIR` (for Netdata supplied) configuration files to identify the directory where configuration files are stored. It is up to the plugin to read the configuration it needs.
+
+The `netdata.conf` section `[plugins]` section contains a list of all the plugins found at the system where Netdata runs, with a boolean setting to enable them or not.
+
+Example:
+
+```
+[plugins]
+ # enable running new plugins = yes
+ # check for new plugins every = 60
+
+ # charts.d = yes
+ # ioping = yes
+ # python.d = yes
+```
+
+The setting `enable running new plugins` sets the default behavior for all external plugins. It can be
+overridden for distinct plugins by modifying the appropriate plugin value configuration to either `yes` or `no`.
+
+The setting `check for new plugins every` sets the interval between scans of the directory
+`/usr/libexec/netdata/plugins.d`. New plugins can be added any time, and Netdata will detect them in a timely manner.
+
+For each of the external plugins enabled, another `netdata.conf` section
+is created, in the form of `[plugin:NAME]`, where `NAME` is the name of the external plugin.
+This section allows controlling the update frequency of the plugin and provide
+additional command line arguments to it.
+
+For example, for `apps.plugin` the following section is available:
+
+```
+[plugin:apps]
+ # update every = 1
+ # command options =
+```
+
+- `update every` controls the granularity of the external plugin.
+- `command options` allows giving additional command line options to the plugin.
+
+Netdata will provide to the external plugins the environment variable `NETDATA_UPDATE_EVERY`, in seconds (the default is 1). This is the **minimum update frequency** for all charts. A plugin that is updating values more frequently than this, is just wasting resources.
+
+Netdata will call the plugin with just one command line parameter: the number of seconds the user requested this plugin to update its data (by default is also 1).
+
+Other than the above, the plugin configuration is up to the plugin.
+
+Keep in mind, that the user may use Netdata configuration to overwrite chart and dimension parameters. This is transparent to the plugin.
+
+### Autoconfiguration
+
+Plugins should attempt to autoconfigure themselves when possible.
+
+For example, if your plugin wants to monitor `squid`, you can search for it on port `3128` or `8080`. If any succeeds, you can proceed. If it fails you can output an error (on stderr) saying that you cannot find `squid` running and giving instructions about the plugin configuration. Then you can stop (exit with non-zero value), so that Netdata will not attempt to start the plugin again.
+
+## External Plugins API
+
+Any program that can print a few values to its standard output can become a Netdata external plugin.
+
+Netdata parses lines starting with:
+
+- `CHART` - create or update a chart
+- `DIMENSION` - add or update a dimension to the chart just created
+- `VARIABLE` - define a variable (to be used in health calculations)
+- `CLABEL` - add a label to a chart
+- `CLABEL_COMMIT` - commit added labels to the chart
+- `FUNCTION` - define a function that can be called later to execute it
+- `BEGIN` - initialize data collection for a chart
+- `SET` - set the value of a dimension for the initialized chart
+- `END` - complete data collection for the initialized chart
+- `FLUSH` - ignore the last collected values
+- `DISABLE` - disable this plugin
+- `FUNCTION` - define functions
+- `FUNCTION_PROGRESS` - report the progress of a function execution
+- `FUNCTION_RESULT_BEGIN` - to initiate the transmission of function results
+- `FUNCTION_RESULT_END` - to end the transmission of function result
+- `CONFIG` - to define dynamic configuration entities
+
+a single program can produce any number of charts with any number of dimensions each.
+
+Charts can be added any time (not just the beginning).
+
+Netdata may send the following commands to the plugin's `stdin`:
+
+- `FUNCTION` - to call a specific function, with all parameters inline
+- `FUNCTION_PAYLOAD` - to call a specific function, with a payload of parameters
+- `FUNCTION_PAYLOAD_END` - to end the payload of parameters
+- `FUNCTION_CANCEL` - to cancel a running function transaction - no response is required
+- `FUNCTION_PROGRESS` - to report that a user asked the progress of running function call - no response is required
+
+### Command line parameters
+
+The plugin **MUST** accept just **one** parameter: **the number of seconds it is
+expected to update the values for its charts**. The value passed by Netdata
+to the plugin is controlled via its configuration file (so there is no need
+for the plugin to handle this configuration option).
+
+The external plugin can overwrite the update frequency. For example, the server may
+request per second updates, but the plugin may ignore it and update its charts
+every 5 seconds.
+
+### Environment variables
+
+There are a few environment variables that are set by `netdata` and are
+available for the plugin to use.
+
+| variable | description |
+|:--------------------------------:|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `NETDATA_USER_CONFIG_DIR` | The directory where all Netdata-related user configuration should be stored. If the plugin requires custom user configuration, this is the place the user has saved it (normally under `/etc/netdata`). |
+| `NETDATA_STOCK_CONFIG_DIR` | The directory where all Netdata -related stock configuration should be stored. If the plugin is shipped with configuration files, this is the place they can be found (normally under `/usr/lib/netdata/conf.d`). |
+| `NETDATA_PLUGINS_DIR` | The directory where all Netdata plugins are stored. |
+| `NETDATA_USER_PLUGINS_DIRS` | The list of directories where custom plugins are stored. |
+| `NETDATA_WEB_DIR` | The directory where the web files of Netdata are saved. |
+| `NETDATA_CACHE_DIR` | The directory where the cache files of Netdata are stored. Use this directory if the plugin requires a place to store data. A new directory should be created for the plugin for this purpose, inside this directory. |
+| `NETDATA_LOG_DIR` | The directory where the log files are stored. By default the `stderr` output of the plugin will be saved in the `error.log` file of Netdata. |
+| `NETDATA_HOST_PREFIX` | This is used in environments where system directories like `/sys` and `/proc` have to be accessed at a different path. |
+| `NETDATA_DEBUG_FLAGS` | This is a number (probably in hex starting with `0x`), that enables certain Netdata debugging features. Check **\[[Tracing Options]]** for more information. |
+| `NETDATA_UPDATE_EVERY` | The minimum number of seconds between chart refreshes. This is like the **internal clock** of Netdata (it is user configurable, defaulting to `1`). There is no meaning for a plugin to update its values more frequently than this number of seconds. |
+| `NETDATA_INVOCATION_ID` | A random UUID in compact form, representing the unique invocation identifier of Netdata. When running under systemd, Netdata uses the `INVOCATION_ID` set by systemd. |
+| `NETDATA_LOG_METHOD` | One of `syslog`, `journal`, `stderr` or `none`, indicating the preferred log method of external plugins. |
+| `NETDATA_LOG_FORMAT` | One of `journal`, `logfmt` or `json`, indicating the format of the logs. Plugins can use the Netdata `systemd-cat-native` command to log always in `journal` format, and have it automatically converted to the format expected by netdata. |
+| `NETDATA_LOG_LEVEL` | One of `emergency`, `alert`, `critical`, `error`, `warning`, `notice`, `info`, `debug`. Plugins are expected to log events with the given priority and the more important ones. |
+| `NETDATA_SYSLOG_FACILITY` | Set only when the `NETDATA_LOG_METHOD` is `syslog`. Possible values are `auth`, `authpriv`, `cron`, `daemon`, `ftp`, `kern`, `lpr`, `mail`, `news`, `syslog`, `user`, `uucp` and `local0` to `local7` |
+| `NETDATA_ERRORS_THROTTLE_PERIOD` | The log throttling period in seconds. |
+| `NETDATA_ERRORS_PER_PERIOD` | The allowed number of log events per period. |
+| `NETDATA_SYSTEMD_JOURNAL_PATH` | When `NETDATA_LOG_METHOD` is set to `journal`, this is the systemd-journald socket path to use. |
+
+### The output of the plugin
+
+The plugin should output instructions for Netdata to its output (`stdout`). Since this uses pipes, please make sure you flush stdout after every iteration.
+
+#### DISABLE
+
+`DISABLE` will disable this plugin. This will prevent Netdata from restarting the plugin. You can also exit with the value `1` to have the same effect.
+
+#### HOST_DEFINE
+
+`HOST_DEFINE` defines a new (or updates an existing) virtual host.
+
+The template is:
+
+> HOST_DEFINE machine_guid hostname
+
+where:
+
+- `machine_guid`
+
+ uniquely identifies the host, this is what will be needed to add charts to the host.
+
+- `hostname`
+
+ is the hostname of the virtual host
+
+#### HOST_LABEL
+
+`HOST_LABEL` adds a key-value pair to the virtual host labels. It has to be given between `HOST_DEFINE` and `HOST_DEFINE_END`.
+
+The template is:
+
+> HOST_LABEL key value
+
+where:
+
+- `key`
+
+ uniquely identifies the key of the label
+
+- `value`
+
+ is the value associated with this key
+
+There are a few special keys that are used to define the system information of the monitored system:
+
+- `_cloud_provider_type`
+- `_cloud_instance_type`
+- `_cloud_instance_region`
+- `_os_name`
+- `_os_version`
+- `_kernel_version`
+- `_system_cores`
+- `_system_cpu_freq`
+- `_system_ram_total`
+- `_system_disk_space`
+- `_architecture`
+- `_virtualization`
+- `_container`
+- `_container_detection`
+- `_virt_detection`
+- `_is_k8s_node`
+- `_install_type`
+- `_prebuilt_arch`
+- `_prebuilt_dist`
+
+#### HOST_DEFINE_END
+
+`HOST_DEFINE_END` commits the host information, creating a new host entity, or updating an existing one with the same `machine_guid`.
+
+#### HOST
+
+`HOST` switches data collection between hosts.
+
+The template is:
+
+> HOST machine_guid
+
+where:
+
+- `machine_guid`
+
+ is the UUID of the host to switch to. After this command, every other command following it is assumed to be associated with this host.
+ Setting machine_guid to `localhost` switches data collection to the local host.
+
+#### CHART
+
+`CHART` defines a new chart.
+
+the template is:
+
+> CHART type.id name title units \[family \[context \[charttype \[priority \[update_every \[options \[plugin [module]]]]]]]]
+
+ where:
+
+- `type.id`
+
+ uniquely identifies the chart,
+ this is what will be needed to add values to the chart
+
+ the `type` part controls the menu the charts will appear in
+
+- `name`
+
+ is the name that will be presented to the user instead of `id` in `type.id`. This means that only the `id` part of
+ `type.id` is changed. When a name has been given, the chart is indexed (and can be referred) as both `type.id` and
+ `type.name`. You can set name to `''`, or `null`, or `(null)` to disable it. If a chart with the same name already
+ exists, a serial number is automatically attached to the name to avoid naming collisions.
+
+- `title`
+
+ the text above the chart
+
+- `units`
+
+ the label of the vertical axis of the chart,
+ all dimensions added to a chart should have the same units
+ of measurement
+
+- `family`
+
+ is used to group charts together
+ (for example all eth0 charts should say: eth0),
+ if empty or missing, the `id` part of `type.id` will be used
+
+ this controls the sub-menu on the dashboard
+
+- `context`
+
+ the context is giving the template of the chart. For example, if multiple charts present the same information for a different family, they should have the same `context`
+
+ this is used for looking up rendering information for the chart (colors, sizes, informational texts) and also apply alerts to it
+
+- `charttype`
+
+ one of `line`, `area` or `stacked`,
+ if empty or missing, the `line` will be used
+
+- `priority`
+
+ is the relative priority of the charts as rendered on the web page,
+ lower numbers make the charts appear before the ones with higher numbers,
+ if empty or missing, `1000` will be used
+
+- `update_every`
+
+ overwrite the update frequency set by the server,
+ if empty or missing, the user configured value will be used
+
+- `options`
+
+ a space separated list of options, enclosed in quotes. 4 options are currently supported: `obsolete` to mark a chart as obsolete (Netdata will hide it and delete it after some time), `detail` to mark a chart as insignificant (this may be used by dashboards to make the charts smaller, or somehow visualize properly a less important chart), `store_first` to make Netdata store the first collected value, assuming there was an invisible previous value set to zero (this is used by statsd charts - if the first data collected value of incremental dimensions is not zero based, unrealistic spikes will appear with this option set) and `hidden` to perform all operations on a chart, but do not offer it on dashboards (the chart will be send to external databases). `CHART` options have been added in Netdata v1.7 and the `hidden` option was added in 1.10.
+
+- `plugin` and `module`
+
+ both are just names that are used to let the user identify the plugin and the module that generated the chart. If `plugin` is unset or empty, Netdata will automatically set the filename of the plugin that generated the chart. `module` has not default.
+
+#### DIMENSION
+
+`DIMENSION` defines a new dimension for the chart
+
+the template is:
+
+> DIMENSION id \[name \[algorithm \[multiplier \[divisor [options]]]]]
+
+ where:
+
+- `id`
+
+ the `id` of this dimension (it is a text value, not numeric),
+ this will be needed later to add values to the dimension
+
+ We suggest to avoid using `.` in dimension ids. External databases expect metrics to be `.` separated and people will get confused if a dimension id contains a dot.
+
+- `name`
+
+ the name of the dimension as it will appear at the legend of the chart,
+ if empty or missing the `id` will be used
+
+- `algorithm`
+
+ one of:
+
+ - `absolute`
+
+ the value is to drawn as-is (interpolated to second boundary),
+ if `algorithm` is empty, invalid or missing, `absolute` is used
+
+ - `incremental`
+
+ the value increases over time,
+ the difference from the last value is presented in the chart,
+ the server interpolates the value and calculates a per second figure
+
+ - `percentage-of-absolute-row`
+
+ the % of this value compared to the total of all dimensions
+
+ - `percentage-of-incremental-row`
+
+ the % of this value compared to the incremental total of
+ all dimensions
+
+- `multiplier`
+
+ an integer value to multiply the collected value,
+ if empty or missing, `1` is used
+
+- `divisor`
+
+ an integer value to divide the collected value,
+ if empty or missing, `1` is used
+
+- `options`
+
+ a space separated list of options, enclosed in quotes. Options supported: `obsolete` to mark a dimension as obsolete (Netdata will delete it after some time) and `hidden` to make this dimension hidden, it will take part in the calculations but will not be presented in the chart.
+
+#### VARIABLE
+
+> VARIABLE [SCOPE] name = value
+
+`VARIABLE` defines a variable that can be used in alerts. This is to used for setting constants (like the max connections a server may accept).
+
+Variables support 2 scopes:
+
+- `GLOBAL` or `HOST` to define the variable at the host level.
+- `LOCAL` or `CHART` to define the variable at the chart level. Use chart-local variables when the same variable may exist for different charts (i.e. Netdata monitors 2 mysql servers, and you need to set the `max_connections` each server accepts). Using chart-local variables is the ideal to build alert templates.
+
+The position of the `VARIABLE` line, sets its default scope (in case you do not specify a scope). So, defining a `VARIABLE` before any `CHART`, or between `END` and `BEGIN` (outside any chart), sets `GLOBAL` scope, while defining a `VARIABLE` just after a `CHART` or a `DIMENSION`, or within the `BEGIN` - `END` block of a chart, sets `LOCAL` scope.
+
+These variables can be set and updated at any point.
+
+Variable names should use alphanumeric characters, the `.` and the `_`.
+
+The `value` is floating point (Netdata used `long double`).
+
+Variables are transferred to upstream Netdata servers (streaming and database replication).
+
+#### CLABEL
+
+> CLABEL name value source
+
+`CLABEL` defines a label used to organize and identify a chart.
+
+Name and value accept characters according to the following table:
+
+| Character | Symbol | Label Name | Label Value |
+|---------------------|:------:|:----------:|:-----------:|
+| UTF-8 character | UTF-8 | _ | keep |
+| Lower case letter | [a-z] | keep | keep |
+| Upper case letter | [A-Z] | keep | [a-z] |
+| Digit | [0-9] | keep | keep |
+| Underscore | _ | keep | keep |
+| Minus | - | keep | keep |
+| Plus | + | _ | keep |
+| Colon | : | _ | keep |
+| Semicolon | ; | _ | : |
+| Equal | = | _ | : |
+| Period | . | keep | keep |
+| Comma | , | . | . |
+| Slash | / | keep | keep |
+| Backslash | \ | / | / |
+| At | @ | _ | keep |
+| Space | ' ' | _ | keep |
+| Opening parenthesis | ( | _ | keep |
+| Closing parenthesis | ) | _ | keep |
+| Anything else | | _ | _ |
+
+The `source` is an integer field that can have the following values:
+- `1`: The value was set automatically.
+- `2`: The value was set manually.
+- `4`: This is a K8 label.
+- `8`: This is a label defined using `netdata` agent cloud link.
+
+#### CLABEL_COMMIT
+
+`CLABEL_COMMIT` indicates that all labels were defined and the chart can be updated.
+
+#### FUNCTION
+
+The plugin can register functions to Netdata, like this:
+
+> FUNCTION [GLOBAL] "name and parameters of the function" timeout "help string for users" "tags" "access"
+
+- Tags currently recognized are either `top` or `logs` (or both, space separated).
+- Access is one of `any`, `member`, or `admin`:
+ - `any` to offer the function to all users of Netdata, even if they are not authenticated.
+ - `member` to offer the function to all authenticated members of Netdata.
+ - `admin` to offer the function only to authenticated administrators.
+
+Users can use a function to ask for more information from the collector. Netdata maintains a registry of functions in 2 levels:
+
+- per node
+- per chart
+
+Both node and chart functions are exactly the same, but chart functions allow Netdata to relate functions with charts and therefore present a context-sensitive menu of functions related to the chart the user is using.
+
+Users can get a list of all the registered functions using the `/api/v1/functions` endpoint of Netdata and call functions using the `/api/v1/function` API call of Netdata.
+
+Once a function is called, the plugin will receive at its standard input a command that looks like this:
+
+```
+FUNCTION transaction_id timeout "name and parameters of the function as one quoted parameter" "user permissions value" "source of request"
+```
+
+When the function to be called is to receive a payload of parameters, the call looks like this:
+
+```
+FUNCTION_PAYLOAD transaction_id timeout "name and parameters of the function as one quoted parameter" "user permissions value" "source of request" "content/type"
+body of the payload, formatted according to content/type
+FUNCTION PAYLOAD END
+```
+
+In this case, Netdata will send:
+
+- A line starting with `FUNCTION_PAYLOAD` together with the required metadata for the function, like the transaction id, the function name and its parameters, the timeout and the content type. This line ends with a newline.
+- Then, the payload itself (which may or may not have newlines in it). The payload should be parsed according to the content type parameter.
+- Finally, a line starting with `FUNCTION_PAYLOAD_END`, so it is expected like `\nFUNCTION_PAYLOAD_END\n`.
+
+Note 1: The plugins.d protocol allows parameters without single or double quotes if they don't contain spaces. However, the plugin should be able to parse parameters even if they are enclosed in single or double quotes. If the first character of a parameter is a single quote, its last character should also be a single quote too, and similarly for double quotes.
+
+Note 2: Netdata always sends the function and its parameters enclosed in double quotes. If the function command and its parameters contain quotes, they are converted to single quotes.
+
+The plugin is expected to parse and validate `name and parameters of the function as one quotes parameter`. Netdata allows the user interface to manipulate this string by appending more parameters.
+
+If the plugin rejects the request, it should respond with this:
+
+```
+FUNCTION_RESULT_BEGIN transaction_id 400 application/json
+{
+ "status": 400,
+ "error_message": "description of the rejection reasons"
+}
+FUNCTION_RESULT_END
+```
+
+If the plugin prepares a response, it should send (via its standard output, together with the collected data, but not interleaved with them):
+
+```
+FUNCTION_RESULT_BEGIN transaction_id http_response_code content_type expiration
+```
+
+Where:
+
+ - `transaction_id` is the transaction id that Netdata sent for this function execution
+ - `http_response_code` is the http error code Netdata should respond with, 200 is the "ok" response
+ - `content_type` is the content type of the response
+ - `expiration` is the absolute timestamp (number, unix epoch) this response expires
+
+Immediately after this, all text is assumed to be the response content.
+The content is text and line oriented. The maximum line length accepted is 15kb. Longer lines will be truncated.
+The type of the context itself depends on the plugin and the UI.
+
+To terminate the message, Netdata seeks a line with just this:
+
+```
+FUNCTION_RESULT_END
+```
+
+This defines the end of the message. `FUNCTION_RESULT_END` should appear in a line alone, without any other text, so it is wise to add `\n` before and after it.
+
+After this line, Netdata resumes processing collected metrics from the plugin.
+
+The maximum uncompressed payload size Netdata will accept is 100MB.
+
+##### Functions cancellation
+
+Netdata is able to detect when a user made an API request, but abandoned it before it was completed. If this happens to an API called for a function served by the plugin, Netdata will generate a `FUNCTION_CANCEL` request to let the plugin know that it can stop processing the query.
+
+After receiving such a command, the plugin **must still send a response for the original function request**, to wake up any waiting threads before they timeout. The http response code is not important, since the response will be discarded, however for auditing reasons we suggest to send back a 499 http response code. This is not a standard response code according to the HTTP protocol, but web servers like `nginx` are using it to indicate that a request was abandoned by a user.
+
+##### Functions progress
+
+When a request takes too long to be processed, Netdata allows the plugin to report progress to Netdata, which in turn will report progress to the caller.
+
+The plugin can send `FUNCTION_PROGRESS` like this:
+
+```
+FUNCTION_PROGRESS transaction_id done all
+```
+
+Where:
+
+- `transaction_id` is the transaction id of the function request
+- `done` is an integer value indicating the amount of work done
+- `all` is an integer value indicating the total amount of work to be done
+
+Netdata supports two kinds of progress:
+- progress as a percentage, which is calculated as `done * 100 / all`
+- progress without knowing the total amount of work to be done, which is enabled when the plugin reports `all` as zero.
+
+##### Functions timeout
+
+All functions calls specify a timeout, at which all the intermediate routing nodes (parents, web server threads) will time out and abort the call.
+
+However, all intermediate routing nodes are configured to extend the timeout when the caller asks for progress. This works like this:
+
+When a progress request is received, if the expected timeout of the request is less than or equal to 10 seconds, the expected timeout is extended by 10 seconds.
+
+Usually, the user interface asks for a progress every second. So, during the last 10 seconds of the timeout, every progress request made shifts the timeout 10 seconds to the future.
+
+To accomplish this, when Netdata receives a progress request by a user, it generates progress requests to the plugin, updating all the intermediate nodes to extend their timeout if necessary.
+
+The plugin will receive progress requests like this:
+
+```
+FUNCTION_PROGRESS transaction_id
+```
+
+There is no need to respond to this command. It is only there to let the plugin know that a user is still waiting for the query to finish.
+
+#### CONFIG
+
+`CONFIG` commands sent from the plugin to Netdata define dynamic configuration entities. These configurable entities are exposed to the user interface, allowing users to change configuration at runtime.
+
+Dynamically configurations made this way are saved to disk by Netdata and are replayed automatically when Netdata or the plugin restarts.
+
+`CONFIG` commands look like this:
+
+```
+CONFIG id action ...
+```
+
+Where:
+
+- `id` is a unique identifier for the configurable entity. This should by design be unique across Netdata. It should be something like `plugin:module:jobs`, e.g. `go.d:postgresql:jobs:masterdb`. This is assumed to be colon-separated with the last part (`masterdb` in our example), being the one displayed to users when there ano conflicts under the same configuration path.
+- `action` can be:
+ - `create`, to declare the dynamic configuration entity
+ - `delete`, to delete the dynamic configuration entity - this does not delete user configuration, we if an entity with the same id is created in the future, the saved configuration will be given to it.
+ - `status`, to update the dynamic configuration entity status
+
+> IMPORTANT:<br/>
+> The plugin should blindly create, delete and update the status of its dynamic configuration entities, without any special logic applied to it. Netdata needs to be updated of what is actually happening at the plugin. Keep in mind that creating dynamic configuration entities triggers responses from Netdata, depending on its type and status. Re-creating a job, triggers the same responses every time, so make sure you create jobs only when you add jobs.
+
+When the `action` is `create`, the following additional parameters are expected:
+
+```
+CONFIG id action status type "path" source_type "source" "supported commands" "view permissions" "edit permissions"
+```
+
+Where:
+
+- `action` should be `create`
+- `status` can be:
+ - `accepted`, the plugin accepted the configuration, but it is not running yet.
+ - `running`, the plugin accepted and runs the configuration.
+ - `failed`, the plugin tries to run the configuration but it fails.
+ - `incomplete`, the plugin needs additional settings to run this configuration. This is usually used for the cases the plugin discovered a job, but important information is missing for it to work.
+ - `disabled`, the configuration has been disabled by a user.
+ - `orphan`, the configuration is not claimed by any plugin. This is used internally by Netdata to mark the configuration nodes available, for which there is no plugin related to them. Do not use in plugins directly.
+- `type` can be `single`, `template` or `job`:
+ - `single` is used when the configurable entity is fixed and users should never be able to add or delete it.
+ - `template` is used to define a template based on which users can add multiple configurations, like adding data collection jobs. So, the plugin defines the template of the jobs and users are presented with a `[+]` button to add such configuration jobs. The plugin can define multiple templates by giving different `id`s to them.
+ - `job` is used to define a job of a template. The plugin should always add all its jobs, independently of the way they have been discovered. It is important to note the relation between `template` and `job` when it comes it the `id`: The `id` of the template should be the prefix of the `job`'s `id`. For example, if the template is `go.d:postgresql:jobs`, then all its jobs be like `go.d:postgresql:jobs:jobname`.
+- `path` is the absolute path of the configurable entity inside the tree of Netdata configurations. Usually, this is should be `/collectors`.
+- `source` can be `internal`, `stock`, `user`, `discovered` or `dyncfg`:
+ - `internal` is used for configurations that are based on internal code settings
+ - `stock` is used for default configurations
+ - `discovered` is used for dynamic configurations the plugin discovers by its own
+ - `user` is used for user configurations, usually via a configuration file
+ - `dyncfg` is used for configuration received via this dynamic configuration mechanism
+- `source` should provide more details about the exact source of the configuration, like `line@file`, or `user@ip`, etc.
+- `supported_commands` is a space separated list of the following keywords, enclosed in single or double quotes. These commands are used by the user interface to determine the actions the users can take:
+ - `schema`, to expose the JSON schema for the user interface. This is mandatory for all configurable entities. When `schema` requests are received, Netdata will first attempt to load the schema from `/etc/netdata/schema.d/` and `/var/lib/netdata/conf.d/schema.d`. For jobs, it will serve the schema of their template. If no schema is found for the required `id`, the `schema` request will be forwarded to the plugin, which is expected to send back the relevant schema.
+ - `get`, to expose the current configuration values, according the schema defined. `templates` cannot support `get`, since they don't maintain any data.
+ - `update`, to receive configuration updates for this entity. `templates` cannot support `update`, since they don't maintain any data.
+ - `test`, like `update` but only test the configuration and report success or failure.
+ - `add`, to receive job creation commands for templates. Only `templates` should support this command.
+ - `remove`, to remove a configuration. Only `jobs` should support this command.
+ - `enable` and `disable`, to receive user requests to enable and disable this entity. Adding only one of `enable` or `disable` to the supported commands, Netdata will add both of them. The plugin should expose these commands on `templates` only when it wants to receive `enable` and `disable` commands for all the `jobs` of this `template`.
+ - `restart`, to restart a job.
+- `view permissions` and `edit permissions` are bitmaps of the Netdata permission system to control access to the configuration. If set to zero, Netdata will require a signed in user with view and edit permissions to the Netdata's configuration system.
+
+The plugin receives commands as if it had exposed a `FUNCTION` named `config`. Netdata formats all these calls like this:
+
+```
+config id command
+```
+
+Where `id` is the unique id of the configurable entity and `command` is one of the supported commands the plugin sent to Netdata.
+
+The plugin will receive (for commands: `schema`, `get`, `remove`, `enable`, `disable` and `restart`):
+
+```
+FUNCTION transaction_id timeout "config id command" "user permissions value" "source string"
+```
+
+or (for commands: `update`, `add` and `test`):
+
+```
+FUNCTION_PAYLOAD transaction_id timeout "config id command" "user permissions value" "source string" "content/type"
+body of the payload formatted according to content/type
+FUNCTION_PAYLOAD_END
+```
+
+Once received, the plugin should process it and respond accordingly.
+
+Immediately after the plugin adds a configuration entity, if the commands `enable` and `disable` are supported by it, Netdata will send either `enable` or `disable` for it, based on the last user action, which has been persisted to disk.
+
+Plugin responses follow the same format `FUNCTIONS` do:
+
+```
+FUNCTION_RESULT_BEGIN transaction_id http_response_code content/type expiration
+body of the response formatted according to content/type
+FUNCTION_RESULT_END
+```
+
+Successful responses (HTTP response code 200) to `schema` and `get` should send back the relevant JSON object.
+All other responses should have the following response body:
+
+```json
+{
+ "status" : 404,
+ "message" : "some text"
+}
+```
+
+The user interface presents the message to users, even when the response is successful (HTTP code 200).
+
+When responding to additions and updates, Netdata uses the following success response codes to derive additional information:
+
+- `200`, responding with 200, means the configuration has been accepted and it is running.
+- `202`, responding with 202, means the configuration has been accepted but it is not yet running. A subsequent `status` action will update it.
+- `298`, responding with 298, means the configuration has been accepted but it is disabled for some reason (probably because it matches nothing or the contents are not useful - use the `message` to provide additional information).
+- `299`, responding with 299, means the configuration has been accepted but a restart is required to apply it.
+
+## Data collection
+
+data collection is defined as a series of `BEGIN` -> `SET` -> `END` lines
+
+> BEGIN type.id [microseconds]
+
+- `type.id`
+
+ is the unique identification of the chart (as given in `CHART`)
+
+- `microseconds`
+
+ is the number of microseconds since the last update of the chart. It is optional.
+
+ Under heavy system load, the system may have some latency transferring
+ data from the plugins to Netdata via the pipe. This number improves
+ accuracy significantly, since the plugin is able to calculate the
+ duration between its iterations better than Netdata.
+
+ The first time the plugin is started, no microseconds should be given
+ to Netdata.
+
+> SET id = value
+
+- `id`
+
+ is the unique identification of the dimension (of the chart just began)
+
+- `value`
+
+ is the collected value, only integer values are collected. If you want to push fractional values, multiply this value by 100 or 1000 and set the `DIMENSION` divider to 1000.
+
+> END
+
+ END does not take any parameters, it commits the collected values for all dimensions to the chart. If a dimensions was not `SET`, its value will be empty for this commit.
+
+More `SET` lines may appear to update all the dimensions of the chart.
+All of them in one `BEGIN` -> `END` block.
+
+All `SET` lines within a single `BEGIN` -> `END` block have to refer to the
+same chart.
+
+If more charts need to be updated, each chart should have its own
+`BEGIN` -> `SET` -> `END` block.
+
+If, for any reason, a plugin has issued a `BEGIN` but wants to cancel it,
+it can issue a `FLUSH`. The `FLUSH` command will instruct Netdata to ignore
+all the values collected since the last `BEGIN` command.
+
+If a plugin does not behave properly (outputs invalid lines, or does not
+follow these guidelines), will be disabled by Netdata.
+
+### collected values
+
+Netdata will collect any **signed** value in the 64bit range:
+`-9.223.372.036.854.775.808` to `+9.223.372.036.854.775.807`
+
+If a value is not collected, leave it empty, like this:
+
+`SET id =`
+
+or do not output the line at all.
+
+## Modular Plugins
+
+1. **python**, use `python.d.plugin`, there are many examples in the [python.d
+ directory](https://github.com/netdata/netdata/blob/master/src/collectors/python.d.plugin/README.md)
+
+ python is ideal for Netdata plugins. It is a simple, yet powerful way to collect data, it has a very small memory footprint, although it is not the most CPU efficient way to do it.
+
+2. **BASH**, use `charts.d.plugin`, there are many examples in the [charts.d
+ directory](https://github.com/netdata/netdata/blob/master/src/collectors/charts.d.plugin/README.md)
+
+ BASH is the simplest scripting language for collecting values. It is the less efficient though in terms of CPU resources. You can use it to collect data quickly, but extensive use of it might use a lot of system resources.
+
+3. **C**
+
+ Of course, C is the most efficient way of collecting data. This is why Netdata itself is written in C.
+
+## Writing Plugins Properly
+
+There are a few rules for writing plugins properly:
+
+1. Respect system resources
+
+ Pay special attention to efficiency:
+
+ - Initialize everything once, at the beginning. Initialization is not an expensive operation. Your plugin will most probably be started once and run forever. So, do whatever heavy operation is needed at the beginning, just once.
+ - Do the absolutely minimum while iterating to collect values repeatedly.
+ - If you need to connect to another server to collect values, avoid re-connects if possible. Connect just once, with keep-alive (for HTTP) enabled and collect values using the same connection.
+ - Avoid any CPU or memory heavy operation while collecting data. If you control memory allocation, avoid any memory allocation while iterating to collect values.
+ - Avoid running external commands when possible. If you are writing shell scripts avoid especially pipes (each pipe is another fork, a very expensive operation).
+
+2. The best way to iterate at a constant pace is this pseudo code:
+
+```js
+ var update_every = argv[1] * 1000; /* seconds * 1000 = milliseconds */
+
+ readConfiguration();
+
+ if(!verifyWeCanCollectValues()) {
+ print("DISABLE");
+ exit(1);
+ }
+
+ createCharts(); /* print CHART and DIMENSION statements */
+
+ var loops = 0;
+ var last_run = 0;
+ var next_run = 0;
+ var dt_since_last_run = 0;
+ var now = 0;
+
+ while(true) {
+ /* find the current time in milliseconds */
+ now = currentTimeStampInMilliseconds();
+
+ /*
+ * find the time of the next loop
+ * this makes sure we are always aligned
+ * with the Netdata daemon
+ */
+ next_run = now - (now % update_every) + update_every;
+
+ /*
+ * wait until it is time
+ * it is important to do it in a loop
+ * since many wait functions can be interrupted
+ */
+ while( now < next_run ) {
+ sleepMilliseconds(next_run - now);
+ now = currentTimeStampInMilliseconds();
+ }
+
+ /* calculate the time passed since the last run */
+ if ( loops > 0 )
+ dt_since_last_run = (now - last_run) * 1000; /* in microseconds */
+
+ /* prepare for the next loop */
+ last_run = now;
+ loops++;
+
+ /* do your magic here to collect values */
+ collectValues();
+
+ /* send the collected data to Netdata */
+ printValues(dt_since_last_run); /* print BEGIN, SET, END statements */
+ }
+```
+
+ Using the above procedure, your plugin will be synchronized to start data collection on steps of `update_every`. There will be no need to keep track of latencies in data collection.
+
+ Netdata interpolates values to second boundaries, so even if your plugin is not perfectly aligned it does not matter. Netdata will find out. When your plugin works in increments of `update_every`, there will be no gaps in the charts due to the possible cumulative micro-delays in data collection. Gaps will only appear if the data collection is really delayed.
+
+3. If you are not sure of memory leaks, exit every one hour. Netdata will re-start your process.
+
+4. If possible, try to autodetect if your plugin should be enabled, without any configuration.
+
+
diff --git a/src/collectors/plugins.d/functions-table.md b/src/collectors/plugins.d/functions-table.md
new file mode 100644
index 000000000..f3a8bcf36
--- /dev/null
+++ b/src/collectors/plugins.d/functions-table.md
@@ -0,0 +1,418 @@
+
+> This document is a work in progress.
+
+Plugin functions can support any kind of responses. However, the UI of Netdata has defined some structures as responses it can parse, understand and visualize.
+
+One of these responses is the `table`. This is used in almost all functions implemented today.
+
+# Functions Tables
+
+Tables are defined when `"type": "table"` is set. The following is the standard header that should be available on all `table` responses:
+
+```json
+{
+ "type": "table",
+ "status": 200,
+ "update_every": 1,
+ "help": "help text",
+ "hostname": "the hostname of the server sending this response, to appear at the title of the UI.",
+ "expires": "UNIX epoch timestamp that the response expires",
+ "has_history": "boolean: true when the datetime picker plays a role in the result set",
+ // rest of the response
+}
+```
+
+## Preflight `info` request
+
+The UI, before making the first call to a function, it does a preflight request to understand what the function supports. The plugin receives this request as a FUNCTION call specifying the `info` parameter (possible among others).
+
+The response from the plugin is expected to have the following:
+
+```json
+{
+ // standard table header - as above
+ "accepted_params": [ "a", "b", "c", ...],
+ "required_params": [
+ {
+ "id": "the keyword to use when sending / receiving this parameter",
+ "name": "the name to present to users for this parameter",
+ "help": "a help string to help users understand this parameter",
+ "type": "the type of the parameter, either: 'select' or 'multiselect'",
+ "options": [
+ {
+ "id": "the keyword to use when sending / receiving this option",
+ "name": "the name to present to users for this option",
+ "pill": "a short text to show next to this option as a pill",
+ "info": "a longer text to show on a tooltip when the user is hovering this option"
+ },
+ // more options for this required parameter
+ ]
+ },
+ // more required parameters
+ ]
+}
+```
+
+If there are no required parameters, `required_params` can be omitted.
+If there are no accepted parameters, `accepted_params` can be omitted. `accepted_param` can be sent during normal responses to update the UI with a new set of parameters available, between calls.
+
+For `logs`, the UI requires this set of `accepted_params`.
+
+Ref [Pagination](#pagination), [Deltas](#incremental-responses)
+```json
+[
+ "info", // boolean: requests the preflight `info` request
+ "after", // interval start timestamp
+ "before", // interval end timestamp
+ "direction", // sort direction [backward,forward]
+ "last", // number of records to retrieve
+ "anchor", // timestamp to divide records in pages
+ "facets",
+ "histogram", // selects facet to be used on the histogram
+ "if_modified_since", // used in PLAY mode, to indicate that the UI wants data newer than the specified timestamp
+ "data_only", // boolean: requests data (logs) only
+ "delta", // boolean: requests incremental responses
+ "tail",
+ "sampling",
+ "slice"
+]
+```
+
+If there are `required_params`, the UI by default selects the first option. [](VERIFY_WITH_UI)
+
+## Table data
+
+To define table data, the UI expects this:
+
+```json
+{
+ // header
+ "columns": {
+ "id": {
+ "index": "number: the sort order for the columns, lower numbers are first",
+ "name": "string: the name of the column as it should be presented to users",
+ "unique_key": "boolean: true when the column uniquely identifies the row",
+ "visible": "boolean: true when the column should be visible by default",
+ "type": "enum: see column types",
+ "units": "string: the units of the value, if any - this item can be omitted if the column does not have units [](VERIFY_WITH_UI)",
+ "visualization": "enum: see visualization types",
+ "value_options": {
+ "units": "string: the units of the value [](VERIFY_WITH_UI)",
+ "transform": "enum: see transformation types",
+ "decimal_points": "number: the number of fractional digits for the number",
+ "default_value": "whatever the value is: when the value is null, show this instead"
+ },
+ "max": "number: when the column is numeric, this is the max value the data have - this is used when range filtering is set and value bars",
+ "pointer_to": "id of another field: this is used when detail-string is set, to point to the column this column is detail of",
+ "sort": "enum: sorting order",
+ "sortable": "boolean: whether the column is sortable by users",
+ "sticky": "boolean: whether the column should always be visible in the UI",
+ "summary": "string: ???",
+ "filter": "enum: the filtering type for this column",
+ "full_width": "boolean: the value is expected to get most of the available column space. When multiple columns are full_width, the available space is given to all of them.",
+ "wrap": "boolean: true when the entire value should be shown, even when it occupies a big space.",
+ "default_expanded_filter": "boolean: true when the filter of this column should be expanded by default.",
+ "dummy": "boolean: when set to true, the column is not to be presented to users."
+ },
+ // more IDs
+ },
+ "data": [ // array of rows
+ [ // array of columns
+ // values for each column linked to their "index" in the columns
+ ],
+ // next row
+ ],
+ "default_sort_column": "id: the id of the column that should be sorted by default"
+}
+```
+
+**IMPORTANT**
+
+On Data values, `timestamp` column value must be in unix micro.
+
+
+### Sorting order
+
+- `ascending`
+- `descending`
+
+### Transformation types
+
+- `none`, just show the value, without any processing
+- `number`, just show a number with its units, respecting `decimal_points`
+- `duration`, makes the UI show a human readable duration, of the seconds given
+- `datetime`, makes the UI show a human readable datetime of the timestamp in UNIX epoch
+- `datetime_usec`, makes the UI show a human readable datetime of the timestamp in USEC UNIX epoch
+
+### Visualization types
+
+- `value`
+- `bar`
+- `pill`
+- `richValue`, this is not used yet, it is supposed to be a structure that will provide a value and options for it
+- `rowOptions`, defines options for the entire row - this column is hidden from the UI
+
+### rowOptions
+
+TBD
+
+### Column types
+
+- `none`
+- `integer`
+- `boolean`
+- `string`
+- `detail-string`
+- `bar-with-integer`
+- `duration`
+- `timestamp`
+- `array`
+
+### Filter types
+
+- `none`, this facet is not selectable by users
+- `multiselect`, the user can select any number of the available options
+- `facet`, similar to `multiselect`, but it also indicates that the column has been indexed and has values with counters. Columns set to `facet` must appear in the `facets` list.
+- `range`, the user can select a range of values (numeric)
+
+The plugin may send non visible columns with filter type `facet`. This means that the plugin can enable indexing on these columns, but it has not done it. Then the UI may send `facets:{ID1},{ID2},{ID3},...` to enable indexing of the columns specified.
+
+What is the default?
+
+#### Facets
+
+Facets are a special case of `multiselect` fields. They are used to provide additional information about each possible value, including their relative sort order and the number of times each value appears in the result set. Facets are filters handled by the plugin. So, the plugin will receive user selected filter like: `{KEY}:{VALUE1},{VALUE2},...`, where `{KEY}` is the id of the column and `{VALUEX}` is the id the facet option the user selected.
+
+```json
+{
+ // header,
+ "columns": ...,
+ "data": ...,
+ "facets": [
+ {
+ "id": "string: the unique id of the facet",
+ "name": "string: the human readable name of the facet",
+ "order": "integer: the sorting order of this facet - lower numbers move items above others"
+ "options": [
+ {
+ "id": "string: the unique id of the facet value",
+ "name": "string: the human readable version of the facet value",
+ "count": "integer: the number of times this value appears in the result set",
+ "order": "integer: the sorting order of this facet value - lower numbers move items above others"
+ },
+ // next option
+ ],
+ },
+ // next facet
+ ]
+}
+```
+
+## Charts
+
+```json
+{
+ // header,
+ "charts": {
+
+ },
+ "default_charts": [
+
+ ]
+}
+```
+
+
+## Histogram
+
+```json
+{
+ "available_histograms": [
+ {
+ "id": "string: the unique id of the histogram",
+ "name": "string: the human readable name of the histogram",
+ "order": "integer: the sorting order of available histograms - lower numbers move items above others"
+ }
+ ],
+ "histogram": {
+ "id": "string: the unique id of the histogram",
+ "name": "string: the human readable name of the histogram",
+ "chart": {
+ "summary": {
+ "nodes": [
+ {
+ "mg": "string",
+ "nm": "string: node name",
+ "ni": "integer: node index"
+ }
+ ],
+ "contexts": [
+ {
+ "id": "string: context id"
+ }
+ ],
+ "instances": [
+ {
+ "id": "string: instance id",
+ "ni": "integer: instance index"
+ }
+ ],
+ "dimensions": [
+ {
+ "id": "string: dimension id",
+ "pri": "integer",
+ "sts": {
+ "min": "float: dimension min value",
+ "max": "float: dimension max value",
+ "avg": "float: dimension avarage value",
+ "arp": "float",
+ "con": "float"
+ }
+ }
+ ]
+ },
+ "result": {
+ "labels": [
+ // histogram labels
+ ],
+ "point": {
+ "value": "integer",
+ "arp": "integer",
+ "pa": "integer"
+ },
+ "data": [
+ [
+ "timestamp" // unix milli
+ // one array per label
+ [
+ // values
+ ],
+ ]
+ ]
+ },
+ "view": {
+ "title": "string: histogram tittle",
+ "update_every": "integer",
+ "after": "timestamp: histogram window start",
+ "before": "timestamp: histogram window end",
+ "units": "string: histogram units",
+ "chart_type": "string: histogram chart type",
+ "min": "integer: histogram min value",
+ "max": "integer: histogram max value",
+ "dimensions": {
+ "grouped_by": [
+ // "string: histogram grouped by",
+ ],
+ "ids": [
+ // "string: histogram label id",
+ ],
+ "names": [
+ // "string: histogram human readable label name",
+ ],
+ "colors": [],
+ "units": [
+ // "string: histogram label unit",
+ ],
+ "sts": {
+ "min": [
+ // "float: label min value",
+ ],
+ "max": [
+ // "float: label max value",
+ ],
+ "avg": [
+ // "float: label avarage value",
+ ],
+ "arp": [
+ // "float",
+ ],
+ "con": [
+ // "float",
+ ]
+ }
+ }
+ },
+ "totals": {
+ "nodes": {
+ "sl": "integer",
+ "qr": "integer"
+ },
+ "contexts": {
+ "sl": "integer",
+ "qr": "integer"
+ },
+ "instances": {
+ "sl": "integer",
+ "qr": "integer"
+ },
+ "dimensions": {
+ "sl": "integer",
+ "qr": "integer"
+ }
+ },
+ "db": {
+ "update_every": "integer"
+ }
+ }
+ }
+}
+```
+
+**IMPORTANT**
+
+On Result Data, `timestamps` must be in unix milli.
+
+## Grouping
+
+```json
+{
+ // header,
+ "group_by": {
+
+ }
+}
+```
+
+## Datetime picker
+
+When `has_history: true`, the plugin must accept `after:TIMESTAMP_IN_SECONDS` and `before:TIMESTAMP_IN_SECONDS` parameters.
+The plugin can also turn pagination on, so that only a small set of the data are sent to the UI at a time.
+
+
+## Pagination
+
+The UI supports paginating results when `has_history: true`. So, when the result depends on the datetime picker and it is too big to be sent to the UI in one response, the plugin can enable datetime pagination like this:
+
+```json
+{
+ // header,
+ "columns": ...,
+ "data": ...,
+ "has_history": true,
+ "pagination": {
+ "enabled": "boolean: true to enable it",
+ "column": "string: the column id that is used for pagination",
+ "key": "string: the accepted_param that is used as the pagination anchor",
+ "units": "enum: a transformation of the datetime picker to make it compatible with the anchor: timestamp, timestamp_usec"
+ }
+}
+```
+
+Once pagination is enabled, the plugin must support the following parameters:
+
+- `{ANCHOR}:{VALUE}`, `{ANCHOR}` is the `pagination.key`, `{VALUE}` is the point the user wants to see entries at, formatted according to `pagination.units`.
+- `direction:backward` or `direction:forward` to specify if the data to be returned if before are after the anchor.
+- `last:NUMER`, the number of entries the plugin should return in the table data.
+- `query:STRING`, the full text search string the user wants to search for.
+- `if_modified_since:TIMESTAMP_USEC` and `tail:true`, used in PLAY mode, to indicate that the UI wants data newer than the specified timestamp. If there are no new data, the plugin must respond with 304 (Not Modified).
+
+### Incremental Responses
+
+- `delta:true` or `delta:false`, when the plugin supports incremental queries, it can accept the parameter `delta`. When set to true, the response of the plugin will be "added" to the previous response already available. This is used in combination with `if_modified_since` to optimize the amount of work the plugin has to do to respond.
+
+
+### Other
+
+- `slice:BOOLEAN` [](VERIFY_WITH_UI)
+- `sampling:NUMBER`
+
diff --git a/src/collectors/plugins.d/gperf-config.txt b/src/collectors/plugins.d/gperf-config.txt
new file mode 100644
index 000000000..721b771b7
--- /dev/null
+++ b/src/collectors/plugins.d/gperf-config.txt
@@ -0,0 +1,112 @@
+%{
+
+#define PLUGINSD_KEYWORD_ID_FLUSH 97
+#define PLUGINSD_KEYWORD_ID_DISABLE 98
+#define PLUGINSD_KEYWORD_ID_EXIT 99
+#define PLUGINSD_KEYWORD_ID_HOST 71
+#define PLUGINSD_KEYWORD_ID_HOST_DEFINE 72
+#define PLUGINSD_KEYWORD_ID_HOST_DEFINE_END 73
+#define PLUGINSD_KEYWORD_ID_HOST_LABEL 74
+
+#define PLUGINSD_KEYWORD_ID_BEGIN 12
+#define PLUGINSD_KEYWORD_ID_CHART 32
+#define PLUGINSD_KEYWORD_ID_CLABEL 34
+#define PLUGINSD_KEYWORD_ID_CLABEL_COMMIT 35
+#define PLUGINSD_KEYWORD_ID_DIMENSION 31
+#define PLUGINSD_KEYWORD_ID_END 13
+#define PLUGINSD_KEYWORD_ID_FUNCTION 41
+#define PLUGINSD_KEYWORD_ID_FUNCTION_RESULT_BEGIN 42
+#define PLUGINSD_KEYWORD_ID_FUNCTION_PROGRESS 43
+#define PLUGINSD_KEYWORD_ID_LABEL 51
+#define PLUGINSD_KEYWORD_ID_OVERWRITE 52
+#define PLUGINSD_KEYWORD_ID_SET 11
+#define PLUGINSD_KEYWORD_ID_VARIABLE 53
+#define PLUGINSD_KEYWORD_ID_CONFIG 100
+
+#define PLUGINSD_KEYWORD_ID_CLAIMED_ID 61
+#define PLUGINSD_KEYWORD_ID_BEGIN2 2
+#define PLUGINSD_KEYWORD_ID_SET2 1
+#define PLUGINSD_KEYWORD_ID_END2 3
+
+#define PLUGINSD_KEYWORD_ID_CHART_DEFINITION_END 33
+#define PLUGINSD_KEYWORD_ID_RBEGIN 22
+#define PLUGINSD_KEYWORD_ID_RDSTATE 23
+#define PLUGINSD_KEYWORD_ID_REND 25
+#define PLUGINSD_KEYWORD_ID_RSET 21
+#define PLUGINSD_KEYWORD_ID_RSSTATE 24
+
+#define PLUGINSD_KEYWORD_ID_DYNCFG_ENABLE 901
+#define PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_MODULE 902
+#define PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_JOB 903
+#define PLUGINSD_KEYWORD_ID_DYNCFG_RESET 904
+#define PLUGINSD_KEYWORD_ID_REPORT_JOB_STATUS 905
+#define PLUGINSD_KEYWORD_ID_DELETE_JOB 906
+
+%}
+
+%struct-type
+%omit-struct-type
+%define hash-function-name gperf_keyword_hash_function
+%define lookup-function-name gperf_lookup_keyword
+%define word-array-name gperf_keywords
+%define constants-prefix GPERF_PARSER_
+%define slot-name keyword
+%define initializer-suffix ,0,PARSER_INIT_PLUGINSD,0
+%global-table
+%readonly-tables
+%null-strings
+PARSER_KEYWORD;
+
+%%
+#
+# Plugins Only Keywords
+#
+FLUSH, PLUGINSD_KEYWORD_ID_FLUSH, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1
+DISABLE, PLUGINSD_KEYWORD_ID_DISABLE, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2
+EXIT, PLUGINSD_KEYWORD_ID_EXIT, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3
+HOST, PLUGINSD_KEYWORD_ID_HOST, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4
+HOST_DEFINE, PLUGINSD_KEYWORD_ID_HOST_DEFINE, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5
+HOST_DEFINE_END, PLUGINSD_KEYWORD_ID_HOST_DEFINE_END, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6
+HOST_LABEL, PLUGINSD_KEYWORD_ID_HOST_LABEL, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7
+#
+# Common keywords
+#
+BEGIN, PLUGINSD_KEYWORD_ID_BEGIN, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8
+CHART, PLUGINSD_KEYWORD_ID_CHART, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9
+CLABEL, PLUGINSD_KEYWORD_ID_CLABEL, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10
+CLABEL_COMMIT, PLUGINSD_KEYWORD_ID_CLABEL_COMMIT, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11
+DIMENSION, PLUGINSD_KEYWORD_ID_DIMENSION, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12
+END, PLUGINSD_KEYWORD_ID_END, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13
+FUNCTION, PLUGINSD_KEYWORD_ID_FUNCTION, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14
+FUNCTION_RESULT_BEGIN, PLUGINSD_KEYWORD_ID_FUNCTION_RESULT_BEGIN, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15
+FUNCTION_PROGRESS, PLUGINSD_KEYWORD_ID_FUNCTION_PROGRESS, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16
+LABEL, PLUGINSD_KEYWORD_ID_LABEL, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17
+OVERWRITE, PLUGINSD_KEYWORD_ID_OVERWRITE, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 18
+SET, PLUGINSD_KEYWORD_ID_SET, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19
+VARIABLE, PLUGINSD_KEYWORD_ID_VARIABLE, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 20
+CONFIG, PLUGINSD_KEYWORD_ID_CONFIG, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 21
+#
+# Streaming only keywords
+#
+CLAIMED_ID, PLUGINSD_KEYWORD_ID_CLAIMED_ID, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 22
+BEGIN2, PLUGINSD_KEYWORD_ID_BEGIN2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23
+SET2, PLUGINSD_KEYWORD_ID_SET2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24
+END2, PLUGINSD_KEYWORD_ID_END2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25
+#
+# Streaming Replication keywords
+#
+CHART_DEFINITION_END, PLUGINSD_KEYWORD_ID_CHART_DEFINITION_END, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26
+RBEGIN, PLUGINSD_KEYWORD_ID_RBEGIN, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27
+RDSTATE, PLUGINSD_KEYWORD_ID_RDSTATE, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28
+REND, PLUGINSD_KEYWORD_ID_REND, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29
+RSET, PLUGINSD_KEYWORD_ID_RSET, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30
+RSSTATE, PLUGINSD_KEYWORD_ID_RSSTATE, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31
+#
+# obsolete - do nothing commands
+#
+DYNCFG_ENABLE, PLUGINSD_KEYWORD_ID_DYNCFG_ENABLE, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32
+DYNCFG_REGISTER_MODULE, PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_MODULE, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33
+DYNCFG_REGISTER_JOB, PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_JOB, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34
+DYNCFG_RESET, PLUGINSD_KEYWORD_ID_DYNCFG_RESET, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35
+REPORT_JOB_STATUS, PLUGINSD_KEYWORD_ID_REPORT_JOB_STATUS, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 36
+DELETE_JOB, PLUGINSD_KEYWORD_ID_DELETE_JOB, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 37
diff --git a/src/collectors/plugins.d/gperf-hashtable.h b/src/collectors/plugins.d/gperf-hashtable.h
new file mode 100644
index 000000000..315e2f7c7
--- /dev/null
+++ b/src/collectors/plugins.d/gperf-hashtable.h
@@ -0,0 +1,237 @@
+/* ANSI-C code produced by gperf version 3.1 */
+/* Command-line: gperf --multiple-iterations=1000 --output-file=gperf-hashtable.h gperf-config.txt */
+/* Computed positions: -k'1-2' */
+
+#if !((' ' == 32) && ('!' == 33) && ('"' == 34) && ('#' == 35) \
+ && ('%' == 37) && ('&' == 38) && ('\'' == 39) && ('(' == 40) \
+ && (')' == 41) && ('*' == 42) && ('+' == 43) && (',' == 44) \
+ && ('-' == 45) && ('.' == 46) && ('/' == 47) && ('0' == 48) \
+ && ('1' == 49) && ('2' == 50) && ('3' == 51) && ('4' == 52) \
+ && ('5' == 53) && ('6' == 54) && ('7' == 55) && ('8' == 56) \
+ && ('9' == 57) && (':' == 58) && (';' == 59) && ('<' == 60) \
+ && ('=' == 61) && ('>' == 62) && ('?' == 63) && ('A' == 65) \
+ && ('B' == 66) && ('C' == 67) && ('D' == 68) && ('E' == 69) \
+ && ('F' == 70) && ('G' == 71) && ('H' == 72) && ('I' == 73) \
+ && ('J' == 74) && ('K' == 75) && ('L' == 76) && ('M' == 77) \
+ && ('N' == 78) && ('O' == 79) && ('P' == 80) && ('Q' == 81) \
+ && ('R' == 82) && ('S' == 83) && ('T' == 84) && ('U' == 85) \
+ && ('V' == 86) && ('W' == 87) && ('X' == 88) && ('Y' == 89) \
+ && ('Z' == 90) && ('[' == 91) && ('\\' == 92) && (']' == 93) \
+ && ('^' == 94) && ('_' == 95) && ('a' == 97) && ('b' == 98) \
+ && ('c' == 99) && ('d' == 100) && ('e' == 101) && ('f' == 102) \
+ && ('g' == 103) && ('h' == 104) && ('i' == 105) && ('j' == 106) \
+ && ('k' == 107) && ('l' == 108) && ('m' == 109) && ('n' == 110) \
+ && ('o' == 111) && ('p' == 112) && ('q' == 113) && ('r' == 114) \
+ && ('s' == 115) && ('t' == 116) && ('u' == 117) && ('v' == 118) \
+ && ('w' == 119) && ('x' == 120) && ('y' == 121) && ('z' == 122) \
+ && ('{' == 123) && ('|' == 124) && ('}' == 125) && ('~' == 126))
+/* The character set is not based on ISO-646. */
+#error "gperf generated tables don't work with this execution character set. Please report a bug to <bug-gperf@gnu.org>."
+#endif
+
+#line 1 "gperf-config.txt"
+
+
+#define PLUGINSD_KEYWORD_ID_FLUSH 97
+#define PLUGINSD_KEYWORD_ID_DISABLE 98
+#define PLUGINSD_KEYWORD_ID_EXIT 99
+#define PLUGINSD_KEYWORD_ID_HOST 71
+#define PLUGINSD_KEYWORD_ID_HOST_DEFINE 72
+#define PLUGINSD_KEYWORD_ID_HOST_DEFINE_END 73
+#define PLUGINSD_KEYWORD_ID_HOST_LABEL 74
+
+#define PLUGINSD_KEYWORD_ID_BEGIN 12
+#define PLUGINSD_KEYWORD_ID_CHART 32
+#define PLUGINSD_KEYWORD_ID_CLABEL 34
+#define PLUGINSD_KEYWORD_ID_CLABEL_COMMIT 35
+#define PLUGINSD_KEYWORD_ID_DIMENSION 31
+#define PLUGINSD_KEYWORD_ID_END 13
+#define PLUGINSD_KEYWORD_ID_FUNCTION 41
+#define PLUGINSD_KEYWORD_ID_FUNCTION_RESULT_BEGIN 42
+#define PLUGINSD_KEYWORD_ID_FUNCTION_PROGRESS 43
+#define PLUGINSD_KEYWORD_ID_LABEL 51
+#define PLUGINSD_KEYWORD_ID_OVERWRITE 52
+#define PLUGINSD_KEYWORD_ID_SET 11
+#define PLUGINSD_KEYWORD_ID_VARIABLE 53
+#define PLUGINSD_KEYWORD_ID_CONFIG 100
+
+#define PLUGINSD_KEYWORD_ID_CLAIMED_ID 61
+#define PLUGINSD_KEYWORD_ID_BEGIN2 2
+#define PLUGINSD_KEYWORD_ID_SET2 1
+#define PLUGINSD_KEYWORD_ID_END2 3
+
+#define PLUGINSD_KEYWORD_ID_CHART_DEFINITION_END 33
+#define PLUGINSD_KEYWORD_ID_RBEGIN 22
+#define PLUGINSD_KEYWORD_ID_RDSTATE 23
+#define PLUGINSD_KEYWORD_ID_REND 25
+#define PLUGINSD_KEYWORD_ID_RSET 21
+#define PLUGINSD_KEYWORD_ID_RSSTATE 24
+
+#define PLUGINSD_KEYWORD_ID_DYNCFG_ENABLE 901
+#define PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_MODULE 902
+#define PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_JOB 903
+#define PLUGINSD_KEYWORD_ID_DYNCFG_RESET 904
+#define PLUGINSD_KEYWORD_ID_REPORT_JOB_STATUS 905
+#define PLUGINSD_KEYWORD_ID_DELETE_JOB 906
+
+
+#define GPERF_PARSER_TOTAL_KEYWORDS 37
+#define GPERF_PARSER_MIN_WORD_LENGTH 3
+#define GPERF_PARSER_MAX_WORD_LENGTH 22
+#define GPERF_PARSER_MIN_HASH_VALUE 7
+#define GPERF_PARSER_MAX_HASH_VALUE 52
+/* maximum key range = 46, duplicates = 0 */
+
+#ifdef __GNUC__
+__inline
+#else
+#ifdef __cplusplus
+inline
+#endif
+#endif
+static unsigned int
+gperf_keyword_hash_function (register const char *str, register size_t len)
+{
+ static const unsigned char asso_values[] =
+ {
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 6, 24, 3, 9, 6,
+ 0, 53, 3, 27, 53, 53, 33, 53, 42, 0,
+ 53, 53, 0, 30, 53, 12, 3, 53, 9, 0,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53, 53, 53, 53, 53,
+ 53, 53, 53, 53, 53, 53
+ };
+ return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]];
+}
+
+static const PARSER_KEYWORD gperf_keywords[] =
+ {
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 67 "gperf-config.txt"
+ {"HOST", PLUGINSD_KEYWORD_ID_HOST, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 87 "gperf-config.txt"
+ {"CONFIG", PLUGINSD_KEYWORD_ID_CONFIG, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 21},
+#line 101 "gperf-config.txt"
+ {"REND", PLUGINSD_KEYWORD_ID_REND, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29},
+#line 75 "gperf-config.txt"
+ {"CHART", PLUGINSD_KEYWORD_ID_CHART, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9},
+#line 84 "gperf-config.txt"
+ {"OVERWRITE", PLUGINSD_KEYWORD_ID_OVERWRITE, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 18},
+#line 70 "gperf-config.txt"
+ {"HOST_LABEL", PLUGINSD_KEYWORD_ID_HOST_LABEL, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7},
+#line 68 "gperf-config.txt"
+ {"HOST_DEFINE", PLUGINSD_KEYWORD_ID_HOST_DEFINE, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 100 "gperf-config.txt"
+ {"RDSTATE", PLUGINSD_KEYWORD_ID_RDSTATE, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28},
+#line 86 "gperf-config.txt"
+ {"VARIABLE", PLUGINSD_KEYWORD_ID_VARIABLE, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 20},
+#line 69 "gperf-config.txt"
+ {"HOST_DEFINE_END", PLUGINSD_KEYWORD_ID_HOST_DEFINE_END, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6},
+#line 66 "gperf-config.txt"
+ {"EXIT", PLUGINSD_KEYWORD_ID_EXIT, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3},
+#line 80 "gperf-config.txt"
+ {"FUNCTION", PLUGINSD_KEYWORD_ID_FUNCTION, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14},
+#line 110 "gperf-config.txt"
+ {"DYNCFG_RESET", PLUGINSD_KEYWORD_ID_DYNCFG_RESET, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35},
+#line 107 "gperf-config.txt"
+ {"DYNCFG_ENABLE", PLUGINSD_KEYWORD_ID_DYNCFG_ENABLE, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32},
+#line 111 "gperf-config.txt"
+ {"REPORT_JOB_STATUS", PLUGINSD_KEYWORD_ID_REPORT_JOB_STATUS, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 36},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 112 "gperf-config.txt"
+ {"DELETE_JOB", PLUGINSD_KEYWORD_ID_DELETE_JOB, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 37},
+#line 98 "gperf-config.txt"
+ {"CHART_DEFINITION_END", PLUGINSD_KEYWORD_ID_CHART_DEFINITION_END, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 109 "gperf-config.txt"
+ {"DYNCFG_REGISTER_JOB", PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_JOB, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34},
+#line 82 "gperf-config.txt"
+ {"FUNCTION_PROGRESS", PLUGINSD_KEYWORD_ID_FUNCTION_PROGRESS, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16},
+#line 99 "gperf-config.txt"
+ {"RBEGIN", PLUGINSD_KEYWORD_ID_RBEGIN, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27},
+#line 108 "gperf-config.txt"
+ {"DYNCFG_REGISTER_MODULE", PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_MODULE, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 81 "gperf-config.txt"
+ {"FUNCTION_RESULT_BEGIN", PLUGINSD_KEYWORD_ID_FUNCTION_RESULT_BEGIN, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15},
+#line 102 "gperf-config.txt"
+ {"RSET", PLUGINSD_KEYWORD_ID_RSET, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30},
+#line 74 "gperf-config.txt"
+ {"BEGIN", PLUGINSD_KEYWORD_ID_BEGIN, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8},
+#line 92 "gperf-config.txt"
+ {"BEGIN2", PLUGINSD_KEYWORD_ID_BEGIN2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23},
+#line 103 "gperf-config.txt"
+ {"RSSTATE", PLUGINSD_KEYWORD_ID_RSSTATE, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31},
+#line 64 "gperf-config.txt"
+ {"FLUSH", PLUGINSD_KEYWORD_ID_FLUSH, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1},
+#line 85 "gperf-config.txt"
+ {"SET", PLUGINSD_KEYWORD_ID_SET, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19},
+#line 93 "gperf-config.txt"
+ {"SET2", PLUGINSD_KEYWORD_ID_SET2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 76 "gperf-config.txt"
+ {"CLABEL", PLUGINSD_KEYWORD_ID_CLABEL, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10},
+#line 65 "gperf-config.txt"
+ {"DISABLE", PLUGINSD_KEYWORD_ID_DISABLE, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2},
+#line 83 "gperf-config.txt"
+ {"LABEL", PLUGINSD_KEYWORD_ID_LABEL, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17},
+#line 78 "gperf-config.txt"
+ {"DIMENSION", PLUGINSD_KEYWORD_ID_DIMENSION, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12},
+#line 91 "gperf-config.txt"
+ {"CLAIMED_ID", PLUGINSD_KEYWORD_ID_CLAIMED_ID, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 22},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 77 "gperf-config.txt"
+ {"CLABEL_COMMIT", PLUGINSD_KEYWORD_ID_CLABEL_COMMIT, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11},
+ {(char*)0,0,PARSER_INIT_PLUGINSD,0},
+#line 79 "gperf-config.txt"
+ {"END", PLUGINSD_KEYWORD_ID_END, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13},
+#line 94 "gperf-config.txt"
+ {"END2", PLUGINSD_KEYWORD_ID_END2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}
+ };
+
+const PARSER_KEYWORD *
+gperf_lookup_keyword (register const char *str, register size_t len)
+{
+ if (len <= GPERF_PARSER_MAX_WORD_LENGTH && len >= GPERF_PARSER_MIN_WORD_LENGTH)
+ {
+ register unsigned int key = gperf_keyword_hash_function (str, len);
+
+ if (key <= GPERF_PARSER_MAX_HASH_VALUE)
+ {
+ register const char *s = gperf_keywords[key].keyword;
+
+ if (s && *str == *s && !strcmp (str + 1, s + 1))
+ return &gperf_keywords[key];
+ }
+ }
+ return 0;
+}
diff --git a/src/collectors/plugins.d/local_listeners.c b/src/collectors/plugins.d/local_listeners.c
new file mode 100644
index 000000000..d815f67fe
--- /dev/null
+++ b/src/collectors/plugins.d/local_listeners.c
@@ -0,0 +1,292 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "libnetdata/libnetdata.h"
+#include "libnetdata/maps/local-sockets.h"
+#include "libnetdata/required_dummies.h"
+
+// --------------------------------------------------------------------------------------------------------------------
+
+static const char *protocol_name(LOCAL_SOCKET *n) {
+ if(n->local.family == AF_INET) {
+ if(n->local.protocol == IPPROTO_TCP)
+ return "TCP";
+ else if(n->local.protocol == IPPROTO_UDP)
+ return "UDP";
+ else
+ return "UNKNOWN_IPV4";
+ }
+ else if(n->local.family == AF_INET6) {
+ if (n->local.protocol == IPPROTO_TCP)
+ return "TCP6";
+ else if(n->local.protocol == IPPROTO_UDP)
+ return "UDP6";
+ else
+ return "UNKNOWN_IPV6";
+ }
+ else
+ return "UNKNOWN";
+}
+
+static void print_local_listeners(LS_STATE *ls __maybe_unused, LOCAL_SOCKET *n, void *data __maybe_unused) {
+ char local_address[INET6_ADDRSTRLEN];
+ char remote_address[INET6_ADDRSTRLEN];
+
+ if(n->local.family == AF_INET) {
+ ipv4_address_to_txt(n->local.ip.ipv4, local_address);
+ ipv4_address_to_txt(n->remote.ip.ipv4, remote_address);
+ }
+ else if(n->local.family == AF_INET6) {
+ ipv6_address_to_txt(&n->local.ip.ipv6, local_address);
+ ipv6_address_to_txt(&n->remote.ip.ipv6, remote_address);
+ }
+
+ printf("%s|%s|%u|%s\n", protocol_name(n), local_address, n->local.port, string2str(n->cmdline));
+}
+
+static void print_local_listeners_debug(LS_STATE *ls __maybe_unused, LOCAL_SOCKET *n, void *data __maybe_unused) {
+ char local_address[INET6_ADDRSTRLEN];
+ char remote_address[INET6_ADDRSTRLEN];
+
+ if(n->local.family == AF_INET) {
+ ipv4_address_to_txt(n->local.ip.ipv4, local_address);
+ ipv4_address_to_txt(n->remote.ip.ipv4, remote_address);
+ }
+ else if(n->local.family == AF_INET6) {
+ ipv6_address_to_txt(&n->local.ip.ipv6, local_address);
+ ipv6_address_to_txt(&n->remote.ip.ipv6, remote_address);
+ }
+
+ printf("%s, direction=%s%s%s%s%s pid=%d, state=0x%0x, ns=%"PRIu64", local=%s[:%u], remote=%s[:%u], uid=%u, comm=%s\n",
+ protocol_name(n),
+ (n->direction & SOCKET_DIRECTION_LISTEN) ? "LISTEN," : "",
+ (n->direction & SOCKET_DIRECTION_INBOUND) ? "INBOUND," : "",
+ (n->direction & SOCKET_DIRECTION_OUTBOUND) ? "OUTBOUND," : "",
+ (n->direction & (SOCKET_DIRECTION_LOCAL_INBOUND|SOCKET_DIRECTION_LOCAL_OUTBOUND)) ? "LOCAL," : "",
+ (n->direction == 0) ? "NONE," : "",
+ n->pid,
+ n->state,
+ n->net_ns_inode,
+ local_address, n->local.port,
+ remote_address, n->remote.port,
+ n->uid,
+ n->comm);
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+int main(int argc, char **argv) {
+ static struct rusage started, ended;
+ getrusage(RUSAGE_SELF, &started);
+ bool debug = false;
+
+ LS_STATE ls = {
+ .config = {
+ .listening = true,
+ .inbound = false,
+ .outbound = false,
+ .local = false,
+ .tcp4 = true,
+ .tcp6 = true,
+ .udp4 = true,
+ .udp6 = true,
+ .pid = false,
+ .cmdline = true,
+ .comm = false,
+ .namespaces = true,
+
+ .max_errors = 10,
+
+ .cb = print_local_listeners,
+ .data = NULL,
+ },
+ .stats = { 0 },
+ .sockets_hashtable = { 0 },
+ .local_ips_hashtable = { 0 },
+ .listening_ports_hashtable = { 0 },
+ };
+
+ netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
+ if(!netdata_configured_host_prefix) netdata_configured_host_prefix = "";
+
+ for (int i = 1; i < argc; i++) {
+ char *s = argv[i];
+ bool positive = true;
+
+ if(strcmp(s, "-h") == 0 || strcmp(s, "--help") == 0) {
+ fprintf(stderr,
+ "\n"
+ " Netdata local-listeners\n"
+ " (C) 2024 Netdata Inc.\n"
+ "\n"
+ " This program prints a list of all the processes that have a listening socket.\n"
+ " It is used by Netdata to auto-detect the services running.\n"
+ "\n"
+ " Options:\n"
+ "\n"
+ " The options:\n"
+ "\n"
+ " udp, udp4, udp6, tcp, tcp4, tcp6, ipv4, ipv6\n"
+ "\n"
+ " select the sources to read currently available sockets.\n"
+ "\n"
+ " while:\n"
+ "\n"
+ " listening, local, inbound, outbound, namespaces\n"
+ "\n"
+ " filter the output based on the direction of the sockets.\n"
+ "\n"
+ " Prepending any option with 'no-', 'not-' or 'non-' will disable them.\n"
+ "\n"
+ " Current options:\n"
+ "\n"
+ " %s %s %s %s %s %s %s %s %s\n"
+ "\n"
+ " Option 'debug' enables all sources and all directions and provides\n"
+ " a full dump of current sockets.\n"
+ "\n"
+ " DIRECTION DETECTION\n"
+ " The program detects the direction of the sockets using these rules:\n"
+ "\n"
+ " - listening are all the TCP sockets that are in listen state\n"
+ " and all sockets that their remote IP is zero.\n"
+ "\n"
+ " - local are all the non-listening sockets that either their source IP\n"
+ " or their remote IP are loopback addresses. Loopback addresses are\n"
+ " those in 127.0.0.0/8 and ::1. When IPv4 addresses are mapped\n"
+ " into IPv6, the program extracts the IPv4 addresses to check them.\n"
+ "\n"
+ " Also, local are considered all the sockets that their remote\n"
+ " IP is one of the IPs that appear as local on another socket.\n"
+ "\n"
+ " - inbound are all the non-listening and non-local sockets that their local\n"
+ " port is a port of another socket that is marked as listening.\n"
+ "\n"
+ " - outbound are all the other sockets.\n"
+ "\n"
+ " Keep in mind that this kind of socket direction detection is not 100%% accurate,\n"
+ " and there may be cases (e.g. reusable sockets) that this code may incorrectly\n"
+ " mark sockets as inbound or outbound.\n"
+ "\n"
+ " WARNING:\n"
+ " This program reads the entire /proc/net/{tcp,udp,tcp6,upd6} files, builds\n"
+ " multiple hash maps in memory and traverses the entire /proc filesystem to\n"
+ " associate sockets with processes. We have made the most to make it as\n"
+ " lightweight and fast as possible, but still this program has a lot of work\n"
+ " to do and it may have some impact on very busy servers with millions of.\n"
+ " established connections."
+ "\n"
+ " Therefore, we suggest to avoid running it repeatedly for data collection.\n"
+ "\n"
+ " Netdata executes it only when it starts to auto-detect data collection sources\n"
+ " and initialize the network dependencies explorer."
+ "\n"
+ , ls.config.udp4 ? "udp4" :"no-udp4"
+ , ls.config.udp6 ? "udp6" :"no-udp6"
+ , ls.config.tcp4 ? "tcp4" :"no-tcp4"
+ , ls.config.tcp6 ? "tcp6" :"no-tcp6"
+ , ls.config.listening ? "listening" : "no-listening"
+ , ls.config.local ? "local" : "no-local"
+ , ls.config.inbound ? "inbound" : "no-inbound"
+ , ls.config.outbound ? "outbound" : "no-outbound"
+ , ls.config.namespaces ? "namespaces" : "no-namespaces"
+ );
+ exit(1);
+ }
+
+ if(strncmp(s, "no-", 3) == 0) {
+ positive = false;
+ s += 3;
+ }
+ else if(strncmp(s, "not-", 4) == 0 || strncmp(s, "non-", 4) == 0) {
+ positive = false;
+ s += 4;
+ }
+
+ if(strcmp(s, "debug") == 0 || strcmp(s, "--debug") == 0) {
+ fprintf(stderr, "%s debugging\n", positive ? "enabling" : "disabling");
+ ls.config.listening = true;
+ ls.config.local = true;
+ ls.config.inbound = true;
+ ls.config.outbound = true;
+ ls.config.pid = true;
+ ls.config.comm = true;
+ ls.config.cmdline = true;
+ ls.config.namespaces = true;
+ ls.config.uid = true;
+ ls.config.max_errors = SIZE_MAX;
+ ls.config.cb = print_local_listeners_debug;
+
+ debug = true;
+ }
+ else if (strcmp("tcp", s) == 0) {
+ ls.config.tcp4 = ls.config.tcp6 = positive;
+ // fprintf(stderr, "%s tcp4 and tcp6\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("tcp4", s) == 0) {
+ ls.config.tcp4 = positive;
+ // fprintf(stderr, "%s tcp4\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("tcp6", s) == 0) {
+ ls.config.tcp6 = positive;
+ // fprintf(stderr, "%s tcp6\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("udp", s) == 0) {
+ ls.config.udp4 = ls.config.udp6 = positive;
+ // fprintf(stderr, "%s udp4 and udp6\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("udp4", s) == 0) {
+ ls.config.udp4 = positive;
+ // fprintf(stderr, "%s udp4\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("udp6", s) == 0) {
+ ls.config.udp6 = positive;
+ // fprintf(stderr, "%s udp6\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("ipv4", s) == 0) {
+ ls.config.tcp4 = ls.config.udp4 = positive;
+ // fprintf(stderr, "%s udp4 and tcp4\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("ipv6", s) == 0) {
+ ls.config.tcp6 = ls.config.udp6 = positive;
+ // fprintf(stderr, "%s udp6 and tcp6\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("listening", s) == 0) {
+ ls.config.listening = positive;
+ // fprintf(stderr, "%s listening\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("local", s) == 0) {
+ ls.config.local = positive;
+ // fprintf(stderr, "%s local\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("inbound", s) == 0) {
+ ls.config.inbound = positive;
+ // fprintf(stderr, "%s inbound\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("outbound", s) == 0) {
+ ls.config.outbound = positive;
+ // fprintf(stderr, "%s outbound\n", positive ? "enabling" : "disabling");
+ }
+ else if (strcmp("namespaces", s) == 0 || strcmp("ns", s) == 0) {
+ ls.config.namespaces = positive;
+ // fprintf(stderr, "%s namespaces\n", positive ? "enabling" : "disabling");
+ }
+ else {
+ fprintf(stderr, "Unknown parameter %s\n", s);
+ exit(1);
+ }
+ }
+
+ local_sockets_process(&ls);
+
+ getrusage(RUSAGE_SELF, &ended);
+
+ if(debug) {
+ unsigned long long user = ended.ru_utime.tv_sec * 1000000ULL + ended.ru_utime.tv_usec - started.ru_utime.tv_sec * 1000000ULL + started.ru_utime.tv_usec;
+ unsigned long long system = ended.ru_stime.tv_sec * 1000000ULL + ended.ru_stime.tv_usec - started.ru_stime.tv_sec * 1000000ULL + started.ru_stime.tv_usec;
+ unsigned long long total = user + system;
+
+ fprintf(stderr, "CPU Usage %llu user, %llu system, %llu total\n", user, system, total);
+ }
+
+ return 0;
+}
diff --git a/src/collectors/plugins.d/ndsudo.c b/src/collectors/plugins.d/ndsudo.c
new file mode 100644
index 000000000..13e2ccaaa
--- /dev/null
+++ b/src/collectors/plugins.d/ndsudo.c
@@ -0,0 +1,308 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdbool.h>
+
+#define MAX_SEARCH 2
+#define MAX_PARAMETERS 128
+#define ERROR_BUFFER_SIZE 1024
+
+struct command {
+ const char *name;
+ const char *params;
+ const char *search[MAX_SEARCH];
+} allowed_commands[] = {
+ {
+ .name = "nvme-list",
+ .params = "list --output-format=json",
+ .search = {
+ [0] = "nvme",
+ [1] = NULL,
+ },
+ },
+ {
+ .name = "nvme-smart-log",
+ .params = "smart-log {{device}} --output-format=json",
+ .search = {
+ [0] = "nvme",
+ [1] = NULL,
+ },
+ },
+ {
+ .name = "megacli-disk-info",
+ .params = "-LDPDInfo -aAll -NoLog",
+ .search = {
+ [0] = "megacli",
+ [1] = "MegaCli",
+ },
+ },
+ {
+ .name = "megacli-battery-info",
+ .params = "-AdpBbuCmd -aAll -NoLog",
+ .search = {
+ [0] = "megacli",
+ [1] = "MegaCli",
+ },
+ },
+ {
+ .name = "arcconf-ld-info",
+ .params = "GETCONFIG 1 LD",
+ .search = {
+ [0] = "arcconf",
+ [1] = NULL,
+ },
+ },
+ {
+ .name = "arcconf-pd-info",
+ .params = "GETCONFIG 1 PD",
+ .search = {
+ [0] = "arcconf",
+ [1] = NULL,
+ },
+ }
+};
+
+bool command_exists_in_dir(const char *dir, const char *cmd, char *dst, size_t dst_size) {
+ snprintf(dst, dst_size, "%s/%s", dir, cmd);
+ return access(dst, X_OK) == 0;
+}
+
+bool command_exists_in_PATH(const char *cmd, char *dst, size_t dst_size) {
+ if(!dst || !dst_size)
+ return false;
+
+ char *path = getenv("PATH");
+ if(!path)
+ return false;
+
+ char *path_copy = strdup(path);
+ if (!path_copy)
+ return false;
+
+ char *dir;
+ bool found = false;
+ dir = strtok(path_copy, ":");
+ while(dir && !found) {
+ found = command_exists_in_dir(dir, cmd, dst, dst_size);
+ dir = strtok(NULL, ":");
+ }
+
+ free(path_copy);
+ return found;
+}
+
+struct command *find_command(const char *cmd) {
+ size_t size = sizeof(allowed_commands) / sizeof(allowed_commands[0]);
+ for(size_t i = 0; i < size ;i++) {
+ if(strcmp(cmd, allowed_commands[i].name) == 0)
+ return &allowed_commands[i];
+ }
+
+ return NULL;
+}
+
+bool check_string(const char *str, size_t index, char *err, size_t err_size) {
+ const char *s = str;
+ while(*s) {
+ char c = *s++;
+ if(!((c >= 'A' && c <= 'Z') ||
+ (c >= 'a' && c <= 'z') ||
+ (c >= '0' && c <= '9') ||
+ c == ' ' || c == '_' || c == '-' || c == '/' || c == '.')) {
+ snprintf(err, err_size, "command line argument No %zu includes invalid character '%c'", index, c);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool check_params(int argc, char **argv, char *err, size_t err_size) {
+ for(int i = 0 ; i < argc ;i++)
+ if(!check_string(argv[i], i, err, err_size))
+ return false;
+
+ return true;
+}
+
+char *find_variable_in_argv(const char *variable, int argc, char **argv, char *err, size_t err_size) {
+ for (int i = 1; i < argc - 1; i++) {
+ if (strcmp(argv[i], variable) == 0)
+ return strdup(argv[i + 1]);
+ }
+
+ snprintf(err, err_size, "variable '%s' is required, but was not provided in the command line parameters", variable);
+
+ return NULL;
+}
+
+bool search_and_replace_params(struct command *cmd, char **params, size_t max_params, const char *filename, int argc, char **argv, char *err, size_t err_size) {
+ if (!cmd || !params || !max_params) {
+ snprintf(err, err_size, "search_and_replace_params() internal error");
+ return false;
+ }
+
+ const char *delim = " ";
+ char *token;
+ char *temp_params = strdup(cmd->params);
+ if (!temp_params) {
+ snprintf(err, err_size, "search_and_replace_params() cannot allocate memory");
+ return false;
+ }
+
+ size_t param_count = 0;
+ params[param_count++] = strdup(filename);
+
+ token = strtok(temp_params, delim);
+ while (token && param_count < max_params - 1) {
+ size_t len = strlen(token);
+
+ char *value = NULL;
+
+ if (strncmp(token, "{{", 2) == 0 && strncmp(token + len - 2, "}}", 2) == 0) {
+ token[0] = '-';
+ token[1] = '-';
+ token[len - 2] = '\0';
+
+ value = find_variable_in_argv(token, argc, argv, err, err_size);
+ }
+ else
+ value = strdup(token);
+
+ if(!value)
+ goto cleanup;
+
+ params[param_count++] = value;
+ token = strtok(NULL, delim);
+ }
+
+ params[param_count] = NULL; // Null-terminate the params array
+ free(temp_params);
+ return true;
+
+cleanup:
+ if(!err[0])
+ snprintf(err, err_size, "memory allocation failure");
+
+ free(temp_params);
+ for (size_t i = 0; i < param_count; ++i) {
+ free(params[i]);
+ params[i] = NULL;
+ }
+ return false;
+}
+
+void show_help() {
+ fprintf(stdout, "\n");
+ fprintf(stdout, "ndsudo\n");
+ fprintf(stdout, "\n");
+ fprintf(stdout, "(C) Netdata Inc.\n");
+ fprintf(stdout, "\n");
+ fprintf(stdout, "A helper to allow Netdata run privileged commands.\n");
+ fprintf(stdout, "\n");
+ fprintf(stdout, " --test\n");
+ fprintf(stdout, " print the generated command that will be run, without running it.\n");
+ fprintf(stdout, "\n");
+ fprintf(stdout, " --help\n");
+ fprintf(stdout, " print this message.\n");
+ fprintf(stdout, "\n");
+
+ fprintf(stdout, "The following commands are supported:\n\n");
+
+ size_t size = sizeof(allowed_commands) / sizeof(allowed_commands[0]);
+ for(size_t i = 0; i < size ;i++) {
+ fprintf(stdout, "- Command : %s\n", allowed_commands[i].name);
+ fprintf(stdout, " Executables: ");
+ for(size_t j = 0; j < MAX_SEARCH && allowed_commands[i].search[j] ;j++) {
+ fprintf(stdout, "%s ", allowed_commands[i].search[j]);
+ }
+ fprintf(stdout, "\n");
+ fprintf(stdout, " Parameters : %s\n\n", allowed_commands[i].params);
+ }
+
+ fprintf(stdout, "The program searches for executables in the system path.\n");
+ fprintf(stdout, "\n");
+ fprintf(stdout, "Variables given as {{variable}} are expected on the command line as:\n");
+ fprintf(stdout, " --variable VALUE\n");
+ fprintf(stdout, "\n");
+ fprintf(stdout, "VALUE can include space, A-Z, a-z, 0-9, _, -, /, and .\n");
+ fprintf(stdout, "\n");
+}
+
+int main(int argc, char *argv[]) {
+ char error_buffer[ERROR_BUFFER_SIZE] = "";
+
+ if (argc < 2) {
+ fprintf(stderr, "at least 2 parameters are needed, but %d were given.\n", argc);
+ return 1;
+ }
+
+ if(!check_params(argc, argv, error_buffer, sizeof(error_buffer))) {
+ fprintf(stderr, "invalid characters in parameters: %s\n", error_buffer);
+ return 2;
+ }
+
+ bool test = false;
+ const char *cmd = argv[1];
+ if(strcmp(cmd, "--help") == 0 || strcmp(cmd, "-h") == 0) {
+ show_help();
+ exit(0);
+ }
+ else if(strcmp(cmd, "--test") == 0) {
+ cmd = argv[2];
+ test = true;
+ }
+
+ struct command *command = find_command(cmd);
+ if(!command) {
+ fprintf(stderr, "command not recognized: %s\n", cmd);
+ return 3;
+ }
+
+ char new_path[] = "/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin";
+ setenv("PATH", new_path, 1);
+
+ bool found = false;
+ char filename[FILENAME_MAX];
+
+ for(size_t i = 0; i < MAX_SEARCH && !found ;i++) {
+ if(command->search[i]) {
+ found = command_exists_in_PATH(command->search[i], filename, sizeof(filename));
+ if(!found) {
+ size_t len = strlen(error_buffer);
+ snprintf(&error_buffer[len], sizeof(error_buffer) - len, "%s ", command->search[i]);
+ }
+ }
+ }
+
+ if(!found) {
+ fprintf(stderr, "%s: not available in PATH.\n", error_buffer);
+ return 4;
+ }
+ else
+ error_buffer[0] = '\0';
+
+ char *params[MAX_PARAMETERS];
+ if(!search_and_replace_params(command, params, MAX_PARAMETERS, filename, argc, argv, error_buffer, sizeof(error_buffer))) {
+ fprintf(stderr, "command line parameters are not satisfied: %s\n", error_buffer);
+ return 5;
+ }
+
+ if(test) {
+ fprintf(stderr, "Command to run: \n");
+
+ for(size_t i = 0; i < MAX_PARAMETERS && params[i] ;i++)
+ fprintf(stderr, "'%s' ", params[i]);
+
+ fprintf(stderr, "\n");
+
+ exit(0);
+ }
+ else {
+ char *clean_env[] = {NULL};
+ execve(filename, params, clean_env);
+ perror("execve"); // execve only returns on error
+ return 6;
+ }
+}
diff --git a/src/collectors/plugins.d/plugins_d.c b/src/collectors/plugins.d/plugins_d.c
new file mode 100644
index 000000000..0bcb3df63
--- /dev/null
+++ b/src/collectors/plugins.d/plugins_d.c
@@ -0,0 +1,360 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "plugins_d.h"
+#include "pluginsd_parser.h"
+
+char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { [0] = PLUGINS_DIR, };
+struct plugind *pluginsd_root = NULL;
+
+inline size_t pluginsd_initialize_plugin_directories()
+{
+ char plugins_dirs[(FILENAME_MAX * 2) + 1];
+ static char *plugins_dir_list = NULL;
+
+ // Get the configuration entry
+ if (likely(!plugins_dir_list)) {
+ snprintfz(plugins_dirs, FILENAME_MAX * 2, "\"%s\" \"%s/custom-plugins.d\"", PLUGINS_DIR, CONFIG_DIR);
+ plugins_dir_list = strdupz(config_get(CONFIG_SECTION_DIRECTORIES, "plugins", plugins_dirs));
+ }
+
+ // Parse it and store it to plugin directories
+ return quoted_strings_splitter_config(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES);
+}
+
+static inline void plugin_set_disabled(struct plugind *cd) {
+ spinlock_lock(&cd->unsafe.spinlock);
+ cd->unsafe.enabled = false;
+ spinlock_unlock(&cd->unsafe.spinlock);
+}
+
+bool plugin_is_enabled(struct plugind *cd) {
+ spinlock_lock(&cd->unsafe.spinlock);
+ bool ret = cd->unsafe.enabled;
+ spinlock_unlock(&cd->unsafe.spinlock);
+ return ret;
+}
+
+static inline void plugin_set_running(struct plugind *cd) {
+ spinlock_lock(&cd->unsafe.spinlock);
+ cd->unsafe.running = true;
+ spinlock_unlock(&cd->unsafe.spinlock);
+}
+
+static inline bool plugin_is_running(struct plugind *cd) {
+ spinlock_lock(&cd->unsafe.spinlock);
+ bool ret = cd->unsafe.running;
+ spinlock_unlock(&cd->unsafe.spinlock);
+ return ret;
+}
+
+static void pluginsd_worker_thread_cleanup(void *arg) {
+ struct plugind *cd = (struct plugind *)arg;
+
+ worker_unregister();
+
+ spinlock_lock(&cd->unsafe.spinlock);
+
+ cd->unsafe.running = false;
+ cd->unsafe.thread = 0;
+
+ pid_t pid = cd->unsafe.pid;
+ cd->unsafe.pid = 0;
+
+ spinlock_unlock(&cd->unsafe.spinlock);
+
+ if (pid) {
+ siginfo_t info;
+ netdata_log_info("PLUGINSD: 'host:%s', killing data collection child process with pid %d",
+ rrdhost_hostname(cd->host), pid);
+
+ if (killpid(pid) != -1) {
+ netdata_log_info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...",
+ rrdhost_hostname(cd->host), pid);
+
+ netdata_waitid(P_PID, (id_t)pid, &info, WEXITED);
+ }
+ }
+}
+
+#define SERIAL_FAILURES_THRESHOLD 10
+static void pluginsd_worker_thread_handle_success(struct plugind *cd) {
+ if (likely(cd->successful_collections)) {
+ sleep((unsigned int)cd->update_every);
+ return;
+ }
+
+ if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) {
+ netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
+
+ sleep((unsigned int)(cd->update_every * 10));
+ return;
+ }
+
+ if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
+ netdata_log_error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, "
+ "although it reports success (exits with 0)."
+ "We have tried to collect something %zu times - unsuccessfully. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, cd->serial_failures);
+ plugin_set_disabled(cd);
+ return;
+ }
+}
+
+static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) {
+ if (worker_ret_code == -1) {
+ netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
+ plugin_set_disabled(cd);
+ return;
+ }
+
+ if (!cd->successful_collections) {
+ netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code);
+ plugin_set_disabled(cd);
+ return;
+ }
+
+ if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) {
+ netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
+ sleep((unsigned int)(cd->update_every * 10));
+ return;
+ }
+
+ if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
+ netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
+ "We tried to restart it %zu times, but it failed to generate data. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code,
+ cd->successful_collections, cd->serial_failures);
+ plugin_set_disabled(cd);
+ return;
+ }
+}
+
+#undef SERIAL_FAILURES_THRESHOLD
+
+static void *pluginsd_worker_thread(void *arg) {
+ worker_register("PLUGINSD");
+
+ netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg)
+ {
+ struct plugind *cd = (struct plugind *) arg;
+ plugin_set_running(cd);
+
+ size_t count = 0;
+
+ while(service_running(SERVICE_COLLECTORS)) {
+ FILE *fp_child_input = NULL;
+ FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
+
+ if(unlikely(!fp_child_input || !fp_child_output)) {
+ netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").",
+ rrdhost_hostname(cd->host), cd->cmd);
+ break;
+ }
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
+ rrdhost_hostname(cd->host),
+ cd->fullfilename, cd->unsafe.pid);
+
+ const char *plugin = strrchr(cd->fullfilename, '/');
+ if(plugin)
+ plugin++;
+ else
+ plugin = cd->fullfilename;
+
+ char module[100];
+ snprintfz(module, sizeof(module), "plugins.d[%s]", plugin);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_MODULE, module),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rrdhost_hostname(cd->host)),
+ ND_LOG_FIELD_TXT(NDF_SRC_TRANSPORT, "pluginsd"),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
+
+ killpid(cd->unsafe.pid);
+
+ int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
+
+ if(likely(worker_ret_code == 0))
+ pluginsd_worker_thread_handle_success(cd);
+ else
+ pluginsd_worker_thread_handle_error(cd, worker_ret_code);
+
+ cd->unsafe.pid = 0;
+
+ if(unlikely(!plugin_is_enabled(cd)))
+ break;
+ }
+ }
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
+static void pluginsd_main_cleanup(void *data) {
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+ netdata_log_info("PLUGINSD: cleaning up...");
+
+ struct plugind *cd;
+ for (cd = pluginsd_root; cd; cd = cd->next) {
+ spinlock_lock(&cd->unsafe.spinlock);
+ if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) {
+ netdata_log_info("PLUGINSD: 'host:%s', stopping plugin thread: %s",
+ rrdhost_hostname(cd->host), cd->id);
+
+ netdata_thread_cancel(cd->unsafe.thread);
+ }
+ spinlock_unlock(&cd->unsafe.spinlock);
+ }
+
+ netdata_log_info("PLUGINSD: cleanup completed.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+
+ worker_unregister();
+}
+
+void *pluginsd_main(void *ptr)
+{
+ netdata_thread_cleanup_push(pluginsd_main_cleanup, ptr);
+
+ int automatic_run = config_get_boolean(CONFIG_SECTION_PLUGINS, "enable running new plugins", 1);
+ int scan_frequency = (int)config_get_number(CONFIG_SECTION_PLUGINS, "check for new plugins every", 60);
+ if (scan_frequency < 1)
+ scan_frequency = 1;
+
+ // disable some plugins by default
+ config_get_boolean(CONFIG_SECTION_PLUGINS, "slabinfo", CONFIG_BOOLEAN_NO);
+ config_get_boolean(CONFIG_SECTION_PLUGINS, "logs-management",
+#if defined(LOGS_MANAGEMENT_DEV_MODE)
+ CONFIG_BOOLEAN_YES
+#else
+ CONFIG_BOOLEAN_NO
+#endif
+ );
+ // it crashes (both threads) on Alpine after we made it multi-threaded
+ // works with "--device /dev/ipmi0", but this is not default
+ // see https://github.com/netdata/netdata/pull/15564 for details
+ if (getenv("NETDATA_LISTENER_PORT"))
+ config_get_boolean(CONFIG_SECTION_PLUGINS, "freeipmi", CONFIG_BOOLEAN_NO);
+
+ // store the errno for each plugins directory
+ // so that we don't log broken directories on each loop
+ int directory_errors[PLUGINSD_MAX_DIRECTORIES] = { 0 };
+
+ while (service_running(SERVICE_COLLECTORS)) {
+ int idx;
+ const char *directory_name;
+
+ for (idx = 0; idx < PLUGINSD_MAX_DIRECTORIES && (directory_name = plugin_directories[idx]); idx++) {
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
+ break;
+
+ errno = 0;
+ DIR *dir = opendir(directory_name);
+ if (unlikely(!dir)) {
+ if (directory_errors[idx] != errno) {
+ directory_errors[idx] = errno;
+ netdata_log_error("cannot open plugins directory '%s'", directory_name);
+ }
+ continue;
+ }
+
+ struct dirent *file = NULL;
+ while (likely((file = readdir(dir)))) {
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
+ break;
+
+ netdata_log_debug(D_PLUGINSD, "examining file '%s'", file->d_name);
+
+ if (unlikely(strcmp(file->d_name, ".") == 0 || strcmp(file->d_name, "..") == 0))
+ continue;
+
+ int len = (int)strlen(file->d_name);
+ if (unlikely(len <= (int)PLUGINSD_FILE_SUFFIX_LEN))
+ continue;
+ if (unlikely(strcmp(PLUGINSD_FILE_SUFFIX, &file->d_name[len - (int)PLUGINSD_FILE_SUFFIX_LEN]) != 0)) {
+ netdata_log_debug(D_PLUGINSD, "file '%s' does not end in '%s'", file->d_name, PLUGINSD_FILE_SUFFIX);
+ continue;
+ }
+
+ char pluginname[CONFIG_MAX_NAME + 1];
+ snprintfz(pluginname, CONFIG_MAX_NAME, "%.*s", (int)(len - PLUGINSD_FILE_SUFFIX_LEN), file->d_name);
+ int enabled = config_get_boolean(CONFIG_SECTION_PLUGINS, pluginname, automatic_run);
+
+ if (unlikely(!enabled)) {
+ netdata_log_debug(D_PLUGINSD, "plugin '%s' is not enabled", file->d_name);
+ continue;
+ }
+
+ // check if it runs already
+ struct plugind *cd;
+ for (cd = pluginsd_root; cd; cd = cd->next)
+ if (unlikely(strcmp(cd->filename, file->d_name) == 0))
+ break;
+
+ if (likely(cd && plugin_is_running(cd))) {
+ netdata_log_debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename);
+ continue;
+ }
+
+ // it is not running
+ // allocate a new one, or use the obsolete one
+ if (unlikely(!cd)) {
+ cd = callocz(sizeof(struct plugind), 1);
+
+ snprintfz(cd->id, CONFIG_MAX_NAME, "plugin:%s", pluginname);
+
+ strncpyz(cd->filename, file->d_name, FILENAME_MAX);
+ snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename);
+
+ cd->host = localhost;
+ cd->unsafe.enabled = enabled;
+ cd->unsafe.running = false;
+
+ cd->update_every = (int)config_get_number(cd->id, "update every", localhost->rrd_update_every);
+ cd->started_t = now_realtime_sec();
+
+ char *def = "";
+ snprintfz(
+ cd->cmd, PLUGINSD_CMD_MAX, "exec %s %d %s", cd->fullfilename, cd->update_every,
+ config_get(cd->id, "command options", def));
+
+ // link it
+ DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(pluginsd_root, cd, prev, next);
+
+ if (plugin_is_enabled(cd)) {
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PD[%s]", pluginname);
+
+ // spawn a new thread for it
+ netdata_thread_create(&cd->unsafe.thread,
+ tag,
+ NETDATA_THREAD_OPTION_DEFAULT,
+ pluginsd_worker_thread,
+ cd);
+ }
+ }
+ }
+
+ closedir(dir);
+ }
+
+ sleep((unsigned int)scan_frequency);
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
diff --git a/src/collectors/plugins.d/plugins_d.h b/src/collectors/plugins.d/plugins_d.h
new file mode 100644
index 000000000..f831efa3a
--- /dev/null
+++ b/src/collectors/plugins.d/plugins_d.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINS_D_H
+#define NETDATA_PLUGINS_D_H 1
+
+#include "daemon/common.h"
+
+#define PLUGINSD_FILE_SUFFIX ".plugin"
+#define PLUGINSD_FILE_SUFFIX_LEN strlen(PLUGINSD_FILE_SUFFIX)
+#define PLUGINSD_CMD_MAX (FILENAME_MAX*2)
+#define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0
+
+#define PLUGINSD_MAX_DIRECTORIES 20
+extern char *plugin_directories[PLUGINSD_MAX_DIRECTORIES];
+
+struct plugind {
+ char id[CONFIG_MAX_NAME+1]; // config node id
+
+ char filename[FILENAME_MAX+1]; // just the filename
+ char fullfilename[FILENAME_MAX+1]; // with path
+ char cmd[PLUGINSD_CMD_MAX+1]; // the command that it executes
+
+ size_t successful_collections; // the number of times we have seen
+ // values collected from this plugin
+
+ size_t serial_failures; // the number of times the plugin started
+ // without collecting values
+
+ RRDHOST *host; // the host the plugin collects data for
+ int update_every; // the plugin default data collection frequency
+
+ struct {
+ SPINLOCK spinlock;
+ bool running; // do not touch this structure after setting this to 1
+ bool enabled; // if this is enabled or not
+ netdata_thread_t thread;
+ pid_t pid;
+ } unsafe;
+
+ time_t started_t;
+
+ struct plugind *prev;
+ struct plugind *next;
+};
+
+extern struct plugind *pluginsd_root;
+
+size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations);
+void pluginsd_process_thread_cleanup(void *ptr);
+
+size_t pluginsd_initialize_plugin_directories();
+
+#endif /* NETDATA_PLUGINS_D_H */
diff --git a/src/collectors/plugins.d/pluginsd_dyncfg.c b/src/collectors/plugins.d/pluginsd_dyncfg.c
new file mode 100644
index 000000000..c4dd42a73
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_dyncfg.c
@@ -0,0 +1,69 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "pluginsd_dyncfg.h"
+
+
+// ----------------------------------------------------------------------------
+
+PARSER_RC pluginsd_config(char **words, size_t num_words, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CONFIG);
+ if(!host) return PARSER_RC_ERROR;
+
+ size_t i = 1;
+ char *id = get_word(words, num_words, i++);
+ char *action = get_word(words, num_words, i++);
+
+ if(strcmp(action, PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE) == 0) {
+ char *status_str = get_word(words, num_words, i++);
+ char *type_str = get_word(words, num_words, i++);
+ char *path = get_word(words, num_words, i++);
+ char *source_type_str = get_word(words, num_words, i++);
+ char *source = get_word(words, num_words, i++);
+ char *supported_cmds_str = get_word(words, num_words, i++);
+ char *view_permissions_str = get_word(words, num_words, i++);
+ char *edit_permissions_str = get_word(words, num_words, i++);
+
+ DYNCFG_STATUS status = dyncfg_status2id(status_str);
+ DYNCFG_TYPE type = dyncfg_type2id(type_str);
+ DYNCFG_SOURCE_TYPE source_type = dyncfg_source_type2id(source_type_str);
+ DYNCFG_CMDS cmds = dyncfg_cmds2id(supported_cmds_str);
+ HTTP_ACCESS view_access = http_access_from_hex(view_permissions_str);
+ HTTP_ACCESS edit_access = http_access_from_hex(edit_permissions_str);
+
+ if(!dyncfg_add_low_level(
+ host,
+ id,
+ path,
+ status,
+ type,
+ source_type,
+ source,
+ cmds,
+ 0,
+ 0,
+ false,
+ view_access,
+ edit_access,
+ pluginsd_function_execute_cb,
+ parser))
+ return PARSER_RC_ERROR;
+ }
+ else if(strcmp(action, PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE) == 0) {
+ dyncfg_del_low_level(host, id);
+ }
+ else if(strcmp(action, PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS) == 0) {
+ char *status_str = get_word(words, num_words, i++);
+ dyncfg_status_low_level(host, id, dyncfg_status2id(status_str));
+ }
+ else
+ nd_log(NDLS_COLLECTORS, NDLP_WARNING, "DYNCFG: unknown action '%s' received from plugin", action);
+
+ parser->user.data_collections_count++;
+ return PARSER_RC_OK;
+}
+
+// ----------------------------------------------------------------------------
+
+PARSER_RC pluginsd_dyncfg_noop(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ return PARSER_RC_OK;
+}
diff --git a/src/collectors/plugins.d/pluginsd_dyncfg.h b/src/collectors/plugins.d/pluginsd_dyncfg.h
new file mode 100644
index 000000000..fd35a3c36
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_dyncfg.h
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINSD_DYNCFG_H
+#define NETDATA_PLUGINSD_DYNCFG_H
+
+#include "pluginsd_internals.h"
+
+PARSER_RC pluginsd_config(char **words, size_t num_words, PARSER *parser);
+PARSER_RC pluginsd_dyncfg_noop(char **words, size_t num_words, PARSER *parser);
+
+#endif //NETDATA_PLUGINSD_DYNCFG_H
diff --git a/src/collectors/plugins.d/pluginsd_functions.c b/src/collectors/plugins.d/pluginsd_functions.c
new file mode 100644
index 000000000..4ea6d4812
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_functions.c
@@ -0,0 +1,412 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "pluginsd_functions.h"
+
+#define LOG_FUNCTIONS false
+
+// ----------------------------------------------------------------------------
+// execution of functions
+
+static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
+ struct inflight_function *pf = func;
+
+ PARSER *parser = parser_ptr;
+
+ // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
+ pf->code = HTTP_RESP_SERVICE_UNAVAILABLE;
+
+ const char *transaction = dictionary_acquired_item_name(item);
+
+ int rc = uuid_parse_flexi(transaction, pf->transaction);
+ if(rc != 0)
+ netdata_log_error("FUNCTION: '%s': cannot parse transaction UUID", string2str(pf->function));
+
+ CLEAN_BUFFER *buffer = buffer_create(1024, NULL);
+ if(pf->payload && buffer_strlen(pf->payload)) {
+ buffer_sprintf(
+ buffer,
+ PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN " %s %d \"%s\" \""HTTP_ACCESS_FORMAT"\" \"%s\" \"%s\"\n",
+ transaction,
+ pf->timeout_s,
+ string2str(pf->function),
+ (HTTP_ACCESS_FORMAT_CAST)pf->access,
+ pf->source ? pf->source : "",
+ content_type_id2string(pf->payload->content_type)
+ );
+
+ buffer_fast_strcat(buffer, buffer_tostring(pf->payload), buffer_strlen(pf->payload));
+ buffer_strcat(buffer, "\nFUNCTION_PAYLOAD_END\n");
+ }
+ else {
+ buffer_sprintf(
+ buffer,
+ PLUGINSD_CALL_FUNCTION " %s %d \"%s\" \""HTTP_ACCESS_FORMAT"\" \"%s\"\n",
+ transaction,
+ pf->timeout_s,
+ string2str(pf->function),
+ (HTTP_ACCESS_FORMAT_CAST)pf->access,
+ pf->source ? pf->source : ""
+ );
+ }
+
+ // send the command to the plugin
+ // IMPORTANT: make sure all commands are sent in 1 call, because in streaming they may interfere with others
+ ssize_t ret = send_to_plugin(buffer_tostring(buffer), parser);
+ pf->sent_monotonic_ut = now_monotonic_usec();
+
+ if(ret < 0) {
+ pf->sent_successfully = false;
+
+ pf->code = HTTP_RESP_SERVICE_UNAVAILABLE;
+ netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret);
+ rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", pf->code);
+ }
+ else {
+ pf->sent_successfully = true;
+
+ internal_error(LOG_FUNCTIONS,
+ "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
+ string2str(pf->function), dictionary_acquired_item_name(item), ret,
+ pf->sent_monotonic_ut - pf->started_monotonic_ut);
+ }
+}
+
+static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
+ struct inflight_function *pf = new_func;
+
+ netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
+ pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
+ pf->result.cb(pf->result_body_wb, pf->code, pf->result.data);
+ string_freez(pf->function);
+
+ return false;
+}
+
+static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) {
+ struct inflight_function *pf = func;
+ struct parser *parser = (struct parser *)parser_ptr; (void)parser;
+
+ internal_error(LOG_FUNCTIONS,
+ "FUNCTION '%s' result of transaction '%s' received from collector "
+ "(%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)",
+ string2str(pf->function), dictionary_acquired_item_name(item),
+ buffer_strlen(pf->result_body_wb),
+ pf->sent_monotonic_ut - pf->started_monotonic_ut, now_realtime_usec() - pf->sent_monotonic_ut);
+
+ if(pf->code == HTTP_RESP_SERVICE_UNAVAILABLE && !buffer_strlen(pf->result_body_wb))
+ rrd_call_function_error(pf->result_body_wb, "The plugin exited while servicing this call.", pf->code);
+
+ pf->result.cb(pf->result_body_wb, pf->code, pf->result.data);
+
+ string_freez(pf->function);
+ buffer_free((void *)pf->payload);
+ freez((void *)pf->source);
+}
+
+void pluginsd_inflight_functions_init(PARSER *parser) {
+ parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
+ dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
+ dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
+ dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
+}
+
+void pluginsd_inflight_functions_cleanup(PARSER *parser) {
+ dictionary_destroy(parser->inflight.functions);
+}
+
+// ----------------------------------------------------------------------------
+
+void pluginsd_inflight_functions_garbage_collect(PARSER *parser, usec_t now_ut) {
+ parser->inflight.smaller_monotonic_timeout_ut = 0;
+ struct inflight_function *pf;
+ dfe_start_write(parser->inflight.functions, pf) {
+ if (*pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < now_ut) {
+ internal_error(true,
+ "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.",
+ string2str(pf->function), pf_dfe.name, now_ut - pf->started_monotonic_ut);
+
+ if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK)
+ pf->code = rrd_call_function_error(pf->result_body_wb,
+ "Timeout waiting for collector response.",
+ HTTP_RESP_GATEWAY_TIMEOUT);
+
+ dictionary_del(parser->inflight.functions, pf_dfe.name);
+ }
+
+ else if(!parser->inflight.smaller_monotonic_timeout_ut || *pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < parser->inflight.smaller_monotonic_timeout_ut)
+ parser->inflight.smaller_monotonic_timeout_ut = *pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT;
+ }
+ dfe_done(pf);
+}
+
+// ----------------------------------------------------------------------------
+
+static void pluginsd_function_cancel(void *data) {
+ struct inflight_function *look_for = data, *t;
+
+ bool sent = false;
+ dfe_start_read(look_for->parser->inflight.functions, t) {
+ if(look_for == t) {
+ const char *transaction = t_dfe.name;
+
+ internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
+
+ char buffer[2048];
+ snprintfz(buffer, sizeof(buffer), PLUGINSD_CALL_FUNCTION_CANCEL " %s\n", transaction);
+
+ // send the command to the plugin
+ ssize_t ret = send_to_plugin(buffer, t->parser);
+ if(ret < 0)
+ sent = true;
+
+ break;
+ }
+ }
+ dfe_done(t);
+
+ if(sent <= 0)
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
+}
+
+static void pluginsd_function_progress_to_plugin(void *data) {
+ struct inflight_function *look_for = data, *t;
+
+ bool sent = false;
+ dfe_start_read(look_for->parser->inflight.functions, t) {
+ if(look_for == t) {
+ const char *transaction = t_dfe.name;
+
+ internal_error(true, "PLUGINSD: sending function progress to plugin for transaction '%s'", transaction);
+
+ char buffer[2048];
+ snprintfz(buffer, sizeof(buffer), PLUGINSD_CALL_FUNCTION_PROGRESS " %s\n", transaction);
+
+ // send the command to the plugin
+ ssize_t ret = send_to_plugin(buffer, t->parser);
+ if(ret < 0)
+ sent = true;
+
+ break;
+ }
+ }
+ dfe_done(t);
+
+ if(sent <= 0)
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: FUNCTION_PROGRESS request didn't match any pending function requests in pluginsd.d.");
+}
+
+// this is the function called from
+// rrd_call_function_and_wait() and rrd_call_function_async()
+int pluginsd_function_execute_cb(struct rrd_function_execute *rfe, void *data) {
+
+ // IMPORTANT: this function MUST call the result_cb even on failures
+
+ PARSER *parser = data;
+
+ usec_t now_ut = now_monotonic_usec();
+
+ int timeout_s = (int)((*rfe->stop_monotonic_ut - now_ut + USEC_PER_SEC / 2) / USEC_PER_SEC);
+
+ struct inflight_function tmp = {
+ .started_monotonic_ut = now_ut,
+ .stop_monotonic_ut = rfe->stop_monotonic_ut,
+ .result_body_wb = rfe->result.wb,
+ .timeout_s = timeout_s,
+ .function = string_strdupz(rfe->function),
+ .payload = buffer_dup(rfe->payload),
+ .access = rfe->user_access,
+ .source = rfe->source ? strdupz(rfe->source) : NULL,
+ .parser = parser,
+
+ .result = {
+ .cb = rfe->result.cb,
+ .data = rfe->result.data,
+ },
+ .progress = {
+ .cb = rfe->progress.cb,
+ .data = rfe->progress.data,
+ },
+ };
+ uuid_copy(tmp.transaction, *rfe->transaction);
+
+ char transaction_str[UUID_COMPACT_STR_LEN];
+ uuid_unparse_lower_compact(tmp.transaction, transaction_str);
+
+ dictionary_write_lock(parser->inflight.functions);
+
+ // if there is any error, our dictionary callbacks will call the caller callback to notify
+ // the caller about the error - no need for error handling here.
+ struct inflight_function *t = dictionary_set(parser->inflight.functions, transaction_str, &tmp, sizeof(struct inflight_function));
+ if(!t->sent_successfully) {
+ int code = t->code;
+ dictionary_write_unlock(parser->inflight.functions);
+ dictionary_del(parser->inflight.functions, transaction_str);
+ pluginsd_inflight_functions_garbage_collect(parser, now_ut);
+ return code;
+ }
+ else {
+ if (rfe->register_canceller.cb)
+ rfe->register_canceller.cb(rfe->register_canceller.data, pluginsd_function_cancel, t);
+
+ if (rfe->register_progresser.cb &&
+ (parser->repertoire == PARSER_INIT_PLUGINSD || (parser->repertoire == PARSER_INIT_STREAMING &&
+ stream_has_capability(&parser->user, STREAM_CAP_PROGRESS))))
+ rfe->register_progresser.cb(rfe->register_progresser.data, pluginsd_function_progress_to_plugin, t);
+
+ if (!parser->inflight.smaller_monotonic_timeout_ut ||
+ *tmp.stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < parser->inflight.smaller_monotonic_timeout_ut)
+ parser->inflight.smaller_monotonic_timeout_ut = *tmp.stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT;
+
+ // garbage collect stale inflight functions
+ if (parser->inflight.smaller_monotonic_timeout_ut < now_ut)
+ pluginsd_inflight_functions_garbage_collect(parser, now_ut);
+
+ dictionary_write_unlock(parser->inflight.functions);
+
+ return HTTP_RESP_OK;
+ }
+}
+
+PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
+ // a plugin or a child is registering a function
+
+ bool global = false;
+ size_t i = 1;
+ if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
+ i++;
+ global = true;
+ }
+
+ char *name = get_word(words, num_words, i++);
+ char *timeout_str = get_word(words, num_words, i++);
+ char *help = get_word(words, num_words, i++);
+ char *tags = get_word(words, num_words, i++);
+ char *access_str = get_word(words, num_words, i++);
+ char *priority_str = get_word(words, num_words, i++);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_FUNCTION);
+ if(!host) return PARSER_RC_ERROR;
+
+ RRDSET *st = (global)? NULL: pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
+ if(!st) global = true;
+
+ if (unlikely(!timeout_str || !name || !help || (!global && !st))) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
+ rrdhost_hostname(host),
+ st?rrdset_id(st):"(unset)",
+ global?"yes":"no",
+ name?name:"(unset)",
+ timeout_str ? timeout_str : "(unset)",
+ help?help:"(unset)"
+ );
+ return PARSER_RC_ERROR;
+ }
+
+ int timeout_s = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+ if (timeout_str && *timeout_str) {
+ timeout_s = str2i(timeout_str);
+ if (unlikely(timeout_s <= 0))
+ timeout_s = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+ }
+
+ int priority = RRDFUNCTIONS_PRIORITY_DEFAULT;
+ if(priority_str && *priority_str) {
+ priority = str2i(priority_str);
+ if(priority <= 0)
+ priority = RRDFUNCTIONS_PRIORITY_DEFAULT;
+ }
+
+ rrd_function_add(host, st, name, timeout_s, priority, help, tags,
+ http_access_from_hex_mapping_old_roles(access_str), false,
+ pluginsd_function_execute_cb, parser);
+
+ parser->user.data_collections_count++;
+
+ return PARSER_RC_OK;
+}
+
+static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
+ STRING *key = action_data;
+ if(key)
+ dictionary_del(parser->inflight.functions, string2str(key));
+ string_freez(key);
+
+ parser->user.data_collections_count++;
+}
+
+static inline struct inflight_function *inflight_function_find(PARSER *parser, const char *transaction) {
+ struct inflight_function *pf = NULL;
+
+ if(transaction && *transaction)
+ pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, transaction);
+
+ if(!pf)
+ netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", transaction ? transaction : "(unset)");
+
+ return pf;
+}
+
+PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
+ char *transaction = get_word(words, num_words, 1);
+ char *status = get_word(words, num_words, 2);
+ char *format = get_word(words, num_words, 3);
+ char *expires = get_word(words, num_words, 4);
+
+ if (unlikely(!transaction || !*transaction || !status || !*status || !format || !*format || !expires || !*expires)) {
+ netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
+ , transaction ? transaction : "(unset)"
+ , status ? status : "(unset)"
+ , format ? format : "(unset)"
+ , expires ? expires : "(unset)"
+ );
+ }
+
+ int code = (status && *status) ? str2i(status) : 0;
+ if (code <= 0)
+ code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
+
+ time_t expiration = (expires && *expires) ? str2l(expires) : 0;
+
+ struct inflight_function *pf = inflight_function_find(parser, transaction);
+ if(pf) {
+ if(format && *format)
+ pf->result_body_wb->content_type = content_type_string2id(format);
+
+ pf->code = code;
+
+ pf->result_body_wb->expires = expiration;
+ if(expiration <= now_realtime_sec())
+ buffer_no_cacheable(pf->result_body_wb);
+ else
+ buffer_cacheable(pf->result_body_wb);
+ }
+
+ parser->defer.response = (pf) ? pf->result_body_wb : NULL;
+ parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
+ parser->defer.action = pluginsd_function_result_end;
+ parser->defer.action_data = string_strdupz(transaction); // it is ok is key is NULL
+ parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_function_progress(char **words, size_t num_words, PARSER *parser) {
+ size_t i = 1;
+
+ char *transaction = get_word(words, num_words, i++);
+ char *done_str = get_word(words, num_words, i++);
+ char *all_str = get_word(words, num_words, i++);
+
+ struct inflight_function *pf = inflight_function_find(parser, transaction);
+ if(pf) {
+ size_t done = done_str && *done_str ? str2u(done_str) : 0;
+ size_t all = all_str && *all_str ? str2u(all_str) : 0;
+
+ if(pf->progress.cb)
+ pf->progress.cb(pf->progress.data, done, all);
+ }
+
+ return PARSER_RC_OK;
+}
diff --git a/src/collectors/plugins.d/pluginsd_functions.h b/src/collectors/plugins.d/pluginsd_functions.h
new file mode 100644
index 000000000..cd49512e4
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_functions.h
@@ -0,0 +1,48 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINSD_FUNCTIONS_H
+#define NETDATA_PLUGINSD_FUNCTIONS_H
+
+#include "pluginsd_internals.h"
+
+struct inflight_function {
+ uuid_t transaction;
+
+ int code;
+ int timeout_s;
+ STRING *function;
+ BUFFER *payload;
+ HTTP_ACCESS access;
+ const char *source;
+
+ BUFFER *result_body_wb;
+
+ usec_t *stop_monotonic_ut; // pointer to caller data
+ usec_t started_monotonic_ut;
+ usec_t sent_monotonic_ut;
+ PARSER *parser;
+
+ bool sent_successfully;
+
+ struct {
+ rrd_function_result_callback_t cb;
+ void *data;
+ } result;
+
+ struct {
+ rrd_function_progress_cb_t cb;
+ void *data;
+ } progress;
+};
+
+PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser);
+PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser);
+PARSER_RC pluginsd_function_progress(char **words, size_t num_words, PARSER *parser);
+
+void pluginsd_inflight_functions_init(PARSER *parser);
+void pluginsd_inflight_functions_cleanup(PARSER *parser);
+void pluginsd_inflight_functions_garbage_collect(PARSER *parser, usec_t now_ut);
+
+int pluginsd_function_execute_cb(struct rrd_function_execute *rfe, void *data);
+
+#endif //NETDATA_PLUGINSD_FUNCTIONS_H
diff --git a/src/collectors/plugins.d/pluginsd_internals.c b/src/collectors/plugins.d/pluginsd_internals.c
new file mode 100644
index 000000000..d03daf745
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_internals.c
@@ -0,0 +1,120 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "pluginsd_internals.h"
+
+ssize_t send_to_plugin(const char *txt, void *data) {
+ PARSER *parser = data;
+
+ if(!txt || !*txt)
+ return 0;
+
+#ifdef ENABLE_H2O
+ if(parser->h2o_ctx)
+ return h2o_stream_write(parser->h2o_ctx, txt, strlen(txt));
+#endif
+
+ errno = 0;
+ spinlock_lock(&parser->writer.spinlock);
+ ssize_t bytes = -1;
+
+#ifdef ENABLE_HTTPS
+ NETDATA_SSL *ssl = parser->ssl_output;
+ if(ssl) {
+
+ if(SSL_connection(ssl))
+ bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt));
+
+ else
+ netdata_log_error("PLUGINSD: cannot send command (SSL)");
+
+ spinlock_unlock(&parser->writer.spinlock);
+ return bytes;
+ }
+#endif
+
+ if(parser->fp_output) {
+
+ bytes = fprintf(parser->fp_output, "%s", txt);
+ if(bytes <= 0) {
+ netdata_log_error("PLUGINSD: cannot send command (FILE)");
+ bytes = -2;
+ }
+ else
+ fflush(parser->fp_output);
+
+ spinlock_unlock(&parser->writer.spinlock);
+ return bytes;
+ }
+
+ if(parser->fd != -1) {
+ bytes = 0;
+ ssize_t total = (ssize_t)strlen(txt);
+ ssize_t sent;
+
+ do {
+ sent = write(parser->fd, &txt[bytes], total - bytes);
+ if(sent <= 0) {
+ netdata_log_error("PLUGINSD: cannot send command (fd)");
+ spinlock_unlock(&parser->writer.spinlock);
+ return -3;
+ }
+ bytes += sent;
+ }
+ while(bytes < total);
+
+ spinlock_unlock(&parser->writer.spinlock);
+ return (int)bytes;
+ }
+
+ spinlock_unlock(&parser->writer.spinlock);
+ netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
+ return -4;
+}
+
+PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
+ parser->user.enabled = 0;
+
+ if(keyword && msg) {
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_INFO,
+ "PLUGINSD: keyword %s: %s", keyword, msg);
+ }
+
+ return PARSER_RC_ERROR;
+}
+
+void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) {
+ parser_init_repertoire(parser, repertoire);
+
+ if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING))
+ pluginsd_inflight_functions_init(parser);
+}
+
+void parser_destroy(PARSER *parser) {
+ if (unlikely(!parser))
+ return;
+
+ pluginsd_inflight_functions_cleanup(parser);
+
+ freez(parser);
+}
+
+
+PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd,
+ PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) {
+ PARSER *parser;
+
+ parser = callocz(1, sizeof(*parser));
+ if(user)
+ parser->user = *user;
+ parser->fd = fd;
+ parser->fp_input = fp_input;
+ parser->fp_output = fp_output;
+#ifdef ENABLE_HTTPS
+ parser->ssl_output = ssl;
+#endif
+ parser->flags = flags;
+
+ spinlock_init(&parser->writer.spinlock);
+ return parser;
+}
diff --git a/src/collectors/plugins.d/pluginsd_internals.h b/src/collectors/plugins.d/pluginsd_internals.h
new file mode 100644
index 000000000..31db02544
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_internals.h
@@ -0,0 +1,355 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINSD_INTERNALS_H
+#define NETDATA_PLUGINSD_INTERNALS_H
+
+#include "pluginsd_parser.h"
+#include "pluginsd_functions.h"
+#include "pluginsd_dyncfg.h"
+#include "pluginsd_replication.h"
+
+#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING)
+#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD)
+
+PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg);
+
+ssize_t send_to_plugin(const char *txt, void *data);
+
+static inline RRDHOST *pluginsd_require_scope_host(PARSER *parser, const char *cmd) {
+ RRDHOST *host = parser->user.host;
+
+ if(unlikely(!host))
+ netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
+
+ return host;
+}
+
+static inline RRDSET *pluginsd_require_scope_chart(PARSER *parser, const char *cmd, const char *parent_cmd) {
+ RRDSET *st = parser->user.st;
+
+ if(unlikely(!st))
+ netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
+
+ return st;
+}
+
+static inline RRDSET *pluginsd_get_scope_chart(PARSER *parser) {
+ return parser->user.st;
+}
+
+static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
+ if(parser->user.st && !parser->user.v2.locked_data_collection) {
+ spinlock_lock(&parser->user.st->data_collection_lock);
+ parser->user.v2.locked_data_collection = true;
+ }
+}
+
+static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
+ if(parser->user.st && parser->user.v2.locked_data_collection) {
+ spinlock_unlock(&parser->user.st->data_collection_lock);
+ parser->user.v2.locked_data_collection = false;
+ return true;
+ }
+
+ return false;
+}
+
+static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) {
+ if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
+ if(stale)
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
+ rrdhost_hostname(parser->user.st->rrdhost),
+ rrdset_id(parser->user.st),
+ keyword);
+ }
+
+ if(unlikely(parser->user.v2.ml_locked)) {
+ ml_chart_update_end(parser->user.st);
+ parser->user.v2.ml_locked = false;
+
+ if(stale)
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
+ rrdhost_hostname(parser->user.st->rrdhost),
+ rrdset_id(parser->user.st),
+ keyword);
+ }
+}
+
+static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) {
+ pluginsd_unlock_previous_scope_chart(parser, keyword, true);
+
+ if(parser->user.cleanup_slots && parser->user.st)
+ rrdset_pluginsd_receive_unslot(parser->user.st);
+
+ parser->user.st = NULL;
+ parser->user.cleanup_slots = false;
+}
+
+static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) {
+ RRDSET *old_st = parser->user.st;
+ pid_t old_collector_tid = (old_st) ? old_st->pluginsd.collector_tid : 0;
+ pid_t my_collector_tid = gettid();
+
+ if(unlikely(old_collector_tid)) {
+ if(old_collector_tid != my_collector_tid) {
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
+ keyword ? keyword : "UNKNOWN",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ my_collector_tid, old_collector_tid);
+
+ return false;
+ }
+
+ old_st->pluginsd.collector_tid = 0;
+ }
+
+ st->pluginsd.collector_tid = my_collector_tid;
+
+ pluginsd_clear_scope_chart(parser, keyword);
+
+ st->pluginsd.pos = 0;
+ parser->user.st = st;
+ parser->user.cleanup_slots = false;
+
+ return true;
+}
+
+static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) {
+ size_t wanted_size = st->pluginsd.size;
+
+ if(slot >= 1) {
+ st->pluginsd.dims_with_slots = true;
+ wanted_size = slot;
+ }
+ else {
+ st->pluginsd.dims_with_slots = false;
+ wanted_size = dictionary_entries(st->rrddim_root_index);
+ }
+
+ if(wanted_size > st->pluginsd.size) {
+ st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim));
+
+ // initialize the empty slots
+ for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) {
+ st->pluginsd.prd_array[i].rda = NULL;
+ st->pluginsd.prd_array[i].rd = NULL;
+ st->pluginsd.prd_array[i].id = NULL;
+ }
+
+ st->pluginsd.size = wanted_size;
+ }
+
+ if(st->pluginsd.dims_with_slots) {
+ struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1];
+
+ if(prd->rd != rd) {
+ prd->rda = rrddim_find_and_acquire(st, string2str(rd->id));
+ prd->rd = rrddim_acquired_to_rrddim(prd->rda);
+ prd->id = string2str(prd->rd->id);
+ }
+
+ if(obsolete)
+ parser->user.cleanup_slots = true;
+ }
+}
+
+static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) {
+ if (unlikely(!dimension || !*dimension)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
+ return NULL;
+ }
+
+ if (unlikely(!st->pluginsd.size)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
+ return NULL;
+ }
+
+ struct pluginsd_rrddim *prd;
+ RRDDIM *rd;
+
+ if(likely(st->pluginsd.dims_with_slots)) {
+ // caching with slots
+
+ if(unlikely(slot < 1 || slot > st->pluginsd.size)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.",
+ rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size);
+ return NULL;
+ }
+
+ prd = &st->pluginsd.prd_array[slot - 1];
+
+ rd = prd->rd;
+ if(likely(rd)) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(strcmp(prd->id, dimension) != 0) {
+ ssize_t t;
+ for(t = 0; t < st->pluginsd.size ;t++) {
+ if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0)
+ break;
+ }
+ if(t >= st->pluginsd.size)
+ t = -1;
+
+ internal_fatal(true,
+ "PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', "
+ "the right slot is %zd",
+ dimension, slot, prd->id, t);
+ }
+#endif
+ return rd;
+ }
+ }
+ else {
+ // caching without slots
+
+ if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
+ st->pluginsd.pos = 0;
+
+ prd = &st->pluginsd.prd_array[st->pluginsd.pos++];
+
+ rd = prd->rd;
+ if(likely(rd)) {
+ const char *id = prd->id;
+
+ if(strcmp(id, dimension) == 0) {
+ // we found it cached
+ return rd;
+ }
+ else {
+ // the cached one is not good for us
+ rrddim_acquired_release(prd->rda);
+ prd->rda = NULL;
+ prd->rd = NULL;
+ prd->id = NULL;
+ }
+ }
+ }
+
+ // we need to find the dimension and set it to prd
+
+ RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
+ if (unlikely(!rda)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
+ rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
+
+ return NULL;
+ }
+
+ prd->rda = rda;
+ prd->rd = rd = rrddim_acquired_to_rrddim(rda);
+ prd->id = string2str(rd->id);
+
+ return rd;
+}
+
+static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
+ if (unlikely(!chart || !*chart)) {
+ netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
+ rrdhost_hostname(host), cmd);
+ return NULL;
+ }
+
+ RRDSET *st = rrdset_find(host, chart);
+ if (unlikely(!st))
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
+ rrdhost_hostname(host), chart, cmd);
+
+ return st;
+}
+
+static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) {
+ ssize_t slot = -1;
+ char *id = get_word(words, num_words, 1);
+ if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] &&
+ id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') {
+ slot = (ssize_t) str2ull_encoded(&id[5]);
+ if(slot < 0) slot = 0; // to make the caller increment its idx of the words
+ }
+
+ return slot;
+}
+
+static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) {
+ // clean possible old cached data
+ rrdset_pluginsd_receive_unslot(st);
+
+ if(unlikely(slot < 1 || slot >= INT32_MAX))
+ return;
+
+ RRDHOST *host = st->rrdhost;
+
+ if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) {
+ spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+ size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size;
+ size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2;
+
+ if(new_slots < (size_t)slot)
+ new_slots = slot;
+
+ host->rrdpush.receive.pluginsd_chart_slots.array =
+ reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *));
+
+ for(size_t i = old_slots; i < new_slots ;i++)
+ host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL;
+
+ host->rrdpush.receive.pluginsd_chart_slots.size = new_slots;
+ spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+ }
+
+ host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st;
+ st->pluginsd.last_slot = (int32_t)slot - 1;
+ parser->user.cleanup_slots = obsolete;
+}
+
+static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) {
+ if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size))
+ return pluginsd_find_chart(host, id, keyword);
+
+ RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1];
+
+ if(!st) {
+ st = pluginsd_find_chart(host, id, keyword);
+ if(st)
+ pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE));
+ }
+ else {
+ internal_fatal(string_strcmp(st->id, id) != 0,
+ "PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'",
+ slot - 1, id, string2str(st->id));
+ }
+
+ return st;
+}
+
+static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
+ SN_FLAGS flags = SN_FLAG_NONE;
+
+ char c;
+ while ((c = *flags_str++)) {
+ switch (c) {
+ case 'A':
+ flags |= SN_FLAG_NOT_ANOMALOUS;
+ break;
+
+ case 'R':
+ flags |= SN_FLAG_RESET;
+ break;
+
+ case 'E':
+ flags = SN_EMPTY_SLOT;
+ return flags;
+
+ default:
+ internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
+ break;
+ }
+ }
+
+ return flags;
+}
+
+#endif //NETDATA_PLUGINSD_INTERNALS_H
diff --git a/src/collectors/plugins.d/pluginsd_parser.c b/src/collectors/plugins.d/pluginsd_parser.c
new file mode 100644
index 000000000..d13fe11d2
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_parser.c
@@ -0,0 +1,1401 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "pluginsd_internals.h"
+
+static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *value = get_word(words, num_words, idx++);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ st->pluginsd.set = true;
+
+ if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+ netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
+ rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
+
+ if (value && *value)
+ rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *microseconds_txt = get_word(words, num_words, idx++);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ usec_t microseconds = 0;
+ if (microseconds_txt && *microseconds_txt) {
+ long long t = str2ll(microseconds_txt, NULL);
+ if(t >= 0)
+ microseconds = t;
+ }
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ if(st->replay.log_next_data_collection) {
+ st->replay.log_next_data_collection = false;
+
+ internal_error(true,
+ "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
+ rrdhost_hostname(host), rrdset_id(st),
+ st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
+ st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
+ microseconds
+ );
+ }
+#endif
+
+ if (likely(st->counter_done)) {
+ if (likely(microseconds)) {
+ if (parser->user.trust_durations)
+ rrdset_next_usec_unfiltered(st, microseconds);
+ else
+ rrdset_next_usec(st, microseconds);
+ }
+ else
+ rrdset_next(st);
+ }
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
+ char *tv_sec = get_word(words, num_words, 1);
+ char *tv_usec = get_word(words, num_words, 2);
+ char *pending_rrdset_next = get_word(words, num_words, 3);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+ netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
+
+ pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
+ parser->user.data_collections_count++;
+
+ struct timeval tv = {
+ .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0,
+ .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0
+ };
+
+ if(!tv.tv_sec)
+ now_realtime_timeval(&tv);
+
+ rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false);
+
+ return PARSER_RC_OK;
+}
+
+static void pluginsd_host_define_cleanup(PARSER *parser) {
+ string_freez(parser->user.host_define.hostname);
+ rrdlabels_destroy(parser->user.host_define.rrdlabels);
+
+ parser->user.host_define.hostname = NULL;
+ parser->user.host_define.rrdlabels = NULL;
+ parser->user.host_define.parsing_host = false;
+}
+
+static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
+ if(uuid_parse(guid, *uuid))
+ return false;
+
+ uuid_unparse_lower(*uuid, output);
+
+ return true;
+}
+
+static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
+ char *guid = get_word(words, num_words, 1);
+ char *hostname = get_word(words, num_words, 2);
+
+ if(unlikely(!guid || !*guid || !hostname || !*hostname))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
+
+ if(unlikely(parser->user.host_define.parsing_host))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
+ "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
+
+ if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
+
+ parser->user.host_define.hostname = string_strdupz(hostname);
+ parser->user.host_define.rrdlabels = rrdlabels_create();
+ parser->user.host_define.parsing_host = true;
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) {
+ char *name = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
+
+ if(!name || !*name || !value)
+ return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
+
+ if(!parser->user.host_define.parsing_host || !labels)
+ return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+
+ rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG);
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
+ return pluginsd_host_dictionary(words, num_words, parser,
+ parser->user.host_define.rrdlabels,
+ PLUGINSD_KEYWORD_HOST_LABEL);
+}
+
+static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ if(!parser->user.host_define.parsing_host)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+
+ RRDHOST *host = rrdhost_find_or_create(
+ string2str(parser->user.host_define.hostname),
+ string2str(parser->user.host_define.hostname),
+ parser->user.host_define.machine_guid_str,
+ "Netdata Virtual Host 1.0",
+ netdata_configured_timezone,
+ netdata_configured_abbrev_timezone,
+ netdata_configured_utc_offset,
+ program_name,
+ program_version,
+ default_rrd_update_every,
+ default_rrd_history_entries,
+ default_rrd_memory_mode,
+ health_plugin_enabled(),
+ default_rrdpush_enabled,
+ default_rrdpush_destination,
+ default_rrdpush_api_key,
+ default_rrdpush_send_charts_matching,
+ default_rrdpush_enable_replication,
+ default_rrdpush_seconds_to_replicate,
+ default_rrdpush_replication_step,
+ rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
+ false);
+
+ rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
+ dyncfg_host_init(host);
+
+ if(host->rrdlabels) {
+ rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
+ }
+ else {
+ host->rrdlabels = parser->user.host_define.rrdlabels;
+ parser->user.host_define.rrdlabels = NULL;
+ }
+
+ pluginsd_host_define_cleanup(parser);
+
+ parser->user.host = host;
+ pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END);
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ rrdcontext_host_child_connected(host);
+ schedule_node_info_update(host);
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
+ char *guid = get_word(words, num_words, 1);
+
+ if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
+ parser->user.host = localhost;
+ return PARSER_RC_OK;
+ }
+
+ uuid_t uuid;
+ char uuid_str[UUID_STR_LEN];
+ if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
+
+ RRDHOST *host = rrdhost_find_by_guid(uuid_str);
+ if(unlikely(!host))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
+
+ parser->user.host = host;
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *type = get_word(words, num_words, idx++);
+ char *name = get_word(words, num_words, idx++);
+ char *title = get_word(words, num_words, idx++);
+ char *units = get_word(words, num_words, idx++);
+ char *family = get_word(words, num_words, idx++);
+ char *context = get_word(words, num_words, idx++);
+ char *chart = get_word(words, num_words, idx++);
+ char *priority_s = get_word(words, num_words, idx++);
+ char *update_every_s = get_word(words, num_words, idx++);
+ char *options = get_word(words, num_words, idx++);
+ char *plugin = get_word(words, num_words, idx++);
+ char *module = get_word(words, num_words, idx++);
+
+ // parse the id from type
+ char *id = NULL;
+ if (likely(type && (id = strchr(type, '.')))) {
+ *id = '\0';
+ id++;
+ }
+
+ // make sure we have the required variables
+ if (unlikely((!type || !*type || !id || !*id)))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
+
+ // parse the name, and make sure it does not include 'type.'
+ if (unlikely(name && *name)) {
+ // when data are streamed from child nodes
+ // name will be type.name
+ // so, we have to remove 'type.' from name too
+ size_t len = strlen(type);
+ if (strncmp(type, name, len) == 0 && name[len] == '.')
+ name = &name[len + 1];
+
+ // if the name is the same with the id,
+ // or is just 'NULL', clear it.
+ if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
+ name = NULL;
+ }
+
+ int priority = 1000;
+ if (likely(priority_s && *priority_s))
+ priority = str2i(priority_s);
+
+ int update_every = parser->user.cd->update_every;
+ if (likely(update_every_s && *update_every_s))
+ update_every = str2i(update_every_s);
+ if (unlikely(!update_every))
+ update_every = parser->user.cd->update_every;
+
+ RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
+ if (unlikely(chart))
+ chart_type = rrdset_type_id(chart);
+
+ if (unlikely(name && !*name))
+ name = NULL;
+ if (unlikely(family && !*family))
+ family = NULL;
+ if (unlikely(context && !*context))
+ context = NULL;
+ if (unlikely(!title))
+ title = "";
+ if (unlikely(!units))
+ units = "unknown";
+
+ netdata_log_debug(
+ D_PLUGINSD,
+ "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
+ type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
+ priority, update_every);
+
+ RRDSET *st = NULL;
+
+ st = rrdset_create(
+ host, type, id, name, family, context, title, units,
+ (plugin && *plugin) ? plugin : parser->user.cd->filename,
+ module, priority, update_every,
+ chart_type);
+
+ bool obsolete = false;
+ if (likely(st)) {
+ if (options && *options) {
+ if (strstr(options, "obsolete")) {
+ rrdset_is_obsolete___safe_from_collector_thread(st);
+ obsolete = true;
+ }
+ else
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
+
+ if (strstr(options, "detail"))
+ rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
+
+ if (strstr(options, "hidden"))
+ rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
+
+ if (strstr(options, "store_first"))
+ rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
+ }
+ else {
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
+ rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
+ rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
+ }
+
+ if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete);
+ }
+ else
+ pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
+ const char *first_entry_txt = get_word(words, num_words, 1);
+ const char *last_entry_txt = get_word(words, num_words, 2);
+ const char *wall_clock_time_txt = get_word(words, num_words, 3);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
+ time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
+ time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
+
+ bool ok = true;
+ if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
+
+ ok = replicate_chart_request(send_to_plugin, parser, host, st,
+ first_entry_child, last_entry_child, child_wall_clock_time,
+ 0, 0);
+ }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ else {
+ internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ }
+#endif
+
+ return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
+}
+
+static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *name = get_word(words, num_words, idx++);
+ char *algorithm = get_word(words, num_words, idx++);
+ char *multiplier_s = get_word(words, num_words, idx++);
+ char *divisor_s = get_word(words, num_words, idx++);
+ char *options = get_word(words, num_words, idx++);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if (unlikely(!id))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
+
+ long multiplier = 1;
+ if (multiplier_s && *multiplier_s) {
+ multiplier = str2ll_encoded(multiplier_s);
+ if (unlikely(!multiplier))
+ multiplier = 1;
+ }
+
+ long divisor = 1;
+ if (likely(divisor_s && *divisor_s)) {
+ divisor = str2ll_encoded(divisor_s);
+ if (unlikely(!divisor))
+ divisor = 1;
+ }
+
+ if (unlikely(!algorithm || !*algorithm))
+ algorithm = "absolute";
+
+ if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+ netdata_log_debug(
+ D_PLUGINSD,
+ "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
+ rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
+ options ? options : "");
+
+ RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
+ int unhide_dimension = 1;
+
+ rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ bool obsolete = false;
+ if (options && *options) {
+ if (strstr(options, "obsolete") != NULL) {
+ obsolete = true;
+ rrddim_is_obsolete___safe_from_collector_thread(st, rd);
+ }
+ else
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
+
+ unhide_dimension = !strstr(options, "hidden");
+
+ if (strstr(options, "noreset") != NULL)
+ rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ if (strstr(options, "nooverflow") != NULL)
+ rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ }
+ else
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
+
+ bool should_update_dimension = false;
+
+ if (likely(unhide_dimension)) {
+ rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
+ should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
+ }
+ else {
+ rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
+ should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
+ }
+
+ if (should_update_dimension) {
+ rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
+ rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
+ }
+
+ pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete);
+
+ return PARSER_RC_OK;
+}
+
+// ----------------------------------------------------------------------------
+
+static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
+ char *name = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
+ NETDATA_DOUBLE v;
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_VARIABLE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_get_scope_chart(parser);
+
+ int global = (st) ? 0 : 1;
+
+ if (name && *name) {
+ if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
+ global = 1;
+ name = get_word(words, num_words, 2);
+ value = get_word(words, num_words, 3);
+ } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
+ global = 0;
+ name = get_word(words, num_words, 2);
+ value = get_word(words, num_words, 3);
+ }
+ }
+
+ if (unlikely(!name || !*name))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
+
+ if (unlikely(!value || !*value))
+ value = NULL;
+
+ if (unlikely(!value)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ (global) ? "HOST" : "CHART",
+ name);
+ return PARSER_RC_OK;
+ }
+
+ if (!global && !st)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
+
+ char *endptr = NULL;
+ v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
+ if (unlikely(endptr && *endptr)) {
+ if (endptr == value)
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name);
+ else
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name,
+ endptr);
+ }
+
+ if (global) {
+ const RRDVAR_ACQUIRED *rva = rrdvar_host_variable_add_and_acquire(host, name);
+ if (rva) {
+ rrdvar_host_variable_set(host, rva, v);
+ rrdvar_host_variable_release(host, rva);
+ }
+ else
+ netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
+ rrdhost_hostname(host),
+ name);
+ } else {
+ const RRDVAR_ACQUIRED *rsa = rrdvar_chart_variable_add_and_acquire(st, name);
+ if (rsa) {
+ rrdvar_chart_variable_set(st, rsa, v);
+ rrdvar_chart_variable_release(st, rsa);
+ }
+ else
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
+ rrdhost_hostname(host), rrdset_id(st), name);
+ }
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
+ pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_FLUSH);
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
+ parser->user.enabled = 0;
+ return PARSER_RC_STOP;
+}
+
+static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
+ const char *name = get_word(words, num_words, 1);
+ const char *label_source = get_word(words, num_words, 2);
+ const char *value = get_word(words, num_words, 3);
+
+ if (!name || !label_source || !value)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
+
+ char *store = (char *)value;
+ bool allocated_store = false;
+
+ if(unlikely(num_words > 4)) {
+ allocated_store = true;
+ store = mallocz(PLUGINSD_LINE_MAX + 1);
+ size_t remaining = PLUGINSD_LINE_MAX;
+ char *move = store;
+ char *word;
+ for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
+ if(i > 3) {
+ *move++ = ' ';
+ *move = '\0';
+ remaining--;
+ }
+
+ size_t length = strlen(word);
+ if (length > remaining)
+ length = remaining;
+
+ remaining -= length;
+ memcpy(move, word, length);
+ move += length;
+ *move = '\0';
+ }
+ }
+
+ if(unlikely(!(parser->user.new_host_labels)))
+ parser->user.new_host_labels = rrdlabels_create();
+
+ if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) {
+ int is_ephemeral = appconfig_test_boolean_value((char *) value);
+ if (is_ephemeral) {
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL);
+ if (likely(host))
+ rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST);
+ }
+ }
+
+ rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
+
+ if (allocated_store)
+ freez(store);
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_OVERWRITE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
+
+ if(unlikely(!host->rrdlabels))
+ host->rrdlabels = rrdlabels_create();
+
+ rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
+ if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST))
+ rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG);
+
+ if(!rrdlabels_exist(host->rrdlabels, "_os"))
+ rrdlabels_add(host->rrdlabels, "_os", string2str(host->os), RRDLABEL_SRC_AUTO);
+
+ if(!rrdlabels_exist(host->rrdlabels, "_hostname"))
+ rrdlabels_add(host->rrdlabels, "_hostname", string2str(host->hostname), RRDLABEL_SRC_AUTO);
+
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
+
+ rrdlabels_destroy(parser->user.new_host_labels);
+ parser->user.new_host_labels = NULL;
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
+ const char *name = get_word(words, num_words, 1);
+ const char *value = get_word(words, num_words, 2);
+ const char *label_source = get_word(words, num_words, 3);
+
+ if (!name || !value || !label_source) {
+ netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+ }
+
+ if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
+ RRDSET *st = pluginsd_get_scope_chart(parser);
+ parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
+ rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
+ }
+
+ rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
+
+ if(!parser->user.chart_rrdlabels_linked_temporarily) {
+ netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+ }
+
+ rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
+
+ rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
+ rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
+ rrdset_metadata_updated(st);
+
+ parser->user.chart_rrdlabels_linked_temporarily = NULL;
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
+ timing_init();
+
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *update_every_str = get_word(words, num_words, idx++);
+ char *end_time_str = get_word(words, num_words, idx++);
+ char *wall_clock_time_str = get_word(words, num_words, idx++);
+
+ if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ timing_step(TIMING_STEP_BEGIN2_PREPARE);
+
+ RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2);
+
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
+
+ timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
+
+ // ------------------------------------------------------------------------
+ // parse the parameters
+
+ time_t update_every = (time_t) str2ull_encoded(update_every_str);
+ time_t end_time = (time_t) str2ull_encoded(end_time_str);
+
+ time_t wall_clock_time;
+ if(likely(*wall_clock_time_str == '#'))
+ wall_clock_time = end_time;
+ else
+ wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
+
+ if (unlikely(update_every != st->update_every))
+ rrdset_set_update_every_s(st, update_every);
+
+ timing_step(TIMING_STEP_BEGIN2_PARSE);
+
+ // ------------------------------------------------------------------------
+ // prepare our state
+
+ pluginsd_lock_rrdset_data_collection(parser);
+
+ parser->user.v2.update_every = update_every;
+ parser->user.v2.end_time = end_time;
+ parser->user.v2.wall_clock_time = wall_clock_time;
+ parser->user.v2.ml_locked = ml_chart_update_begin(st);
+
+ timing_step(TIMING_STEP_BEGIN2_ML);
+
+ // ------------------------------------------------------------------------
+ // propagate it forward in v2
+
+ if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
+ parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
+
+ if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
+ // check receiver capabilities
+ bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
+
+ // check sender capabilities
+ bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+
+ BUFFER *wb = parser->user.v2.stream_buffer.wb;
+
+ buffer_need_bytes(wb, 1024);
+
+ if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "' ", 2);
+
+ if(can_copy)
+ buffer_strcat(wb, update_every_str);
+ else
+ buffer_print_uint64_encoded(wb, integer_encoding, update_every);
+
+ buffer_fast_strcat(wb, " ", 1);
+
+ if(can_copy)
+ buffer_strcat(wb, end_time_str);
+ else
+ buffer_print_uint64_encoded(wb, integer_encoding, end_time);
+
+ buffer_fast_strcat(wb, " ", 1);
+
+ if(can_copy)
+ buffer_strcat(wb, wall_clock_time_str);
+ else
+ buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
+
+ buffer_fast_strcat(wb, "\n", 1);
+
+ parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
+ parser->user.v2.stream_buffer.begin_v2_added = true;
+ }
+
+ timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
+
+ // ------------------------------------------------------------------------
+ // store it
+
+ st->last_collected_time.tv_sec = end_time;
+ st->last_collected_time.tv_usec = 0;
+ st->last_updated.tv_sec = end_time;
+ st->last_updated.tv_usec = 0;
+ st->counter++;
+ st->counter_done++;
+
+ // these are only needed for db mode RAM, ALLOC
+ st->db.current_entry++;
+ if(st->db.current_entry >= st->db.entries)
+ st->db.current_entry -= st->db.entries;
+
+ timing_step(TIMING_STEP_BEGIN2_STORE);
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
+ timing_init();
+
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *collected_str = get_word(words, num_words, idx++);
+ char *value_str = get_word(words, num_words, idx++);
+ char *flags_str = get_word(words, num_words, idx++);
+
+ if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ timing_step(TIMING_STEP_SET2_PREPARE);
+
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2);
+ if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ st->pluginsd.set = true;
+
+ if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
+
+ timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
+
+ // ------------------------------------------------------------------------
+ // parse the parameters
+
+ collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
+
+ NETDATA_DOUBLE value;
+ if(*value_str == '#')
+ value = (NETDATA_DOUBLE)collected_value;
+ else
+ value = str2ndd_encoded(value_str, NULL);
+
+ SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
+
+ timing_step(TIMING_STEP_SET2_PARSE);
+
+ // ------------------------------------------------------------------------
+ // check value and ML
+
+ if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
+ value = NAN;
+ flags = SN_EMPTY_SLOT;
+
+ if(parser->user.v2.ml_locked)
+ ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
+ }
+ else if(parser->user.v2.ml_locked) {
+ if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
+ // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
+ flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
+ }
+ else
+ flags |= SN_FLAG_NOT_ANOMALOUS;
+ }
+
+ timing_step(TIMING_STEP_SET2_ML);
+
+ // ------------------------------------------------------------------------
+ // propagate it forward in v2
+
+ if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
+ // check if receiver and sender have the same number parsing capabilities
+ bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
+
+ // check the sender capabilities
+ bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+
+ BUFFER *wb = parser->user.v2.stream_buffer.wb;
+ buffer_need_bytes(wb, 1024);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ if(can_copy)
+ buffer_strcat(wb, collected_str);
+ else
+ buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
+ buffer_fast_strcat(wb, " ", 1);
+ if(can_copy)
+ buffer_strcat(wb, value_str);
+ else
+ buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, flags, true);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
+
+ timing_step(TIMING_STEP_SET2_PROPAGATE);
+
+ // ------------------------------------------------------------------------
+ // store it
+
+ rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
+ rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.last_collected_value = collected_value;
+ rd->collector.last_stored_value = value;
+ rd->collector.last_calculated_value = value;
+ rd->collector.counter++;
+ rrddim_set_updated(rd);
+
+ timing_step(TIMING_STEP_SET2_STORE);
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ timing_init();
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ parser->user.data_collections_count++;
+
+ timing_step(TIMING_STEP_END2_PREPARE);
+
+ // ------------------------------------------------------------------------
+ // propagate the whole chart update in v1
+
+ if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
+ rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
+
+ timing_step(TIMING_STEP_END2_PUSH_V1);
+
+ // ------------------------------------------------------------------------
+ // unblock data collection
+
+ pluginsd_unlock_previous_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
+ rrdcontext_collected_rrdset(st);
+ store_metric_collection_completed();
+
+ timing_step(TIMING_STEP_END2_RRDSET);
+
+ // ------------------------------------------------------------------------
+ // propagate it forward
+
+ rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
+
+ timing_step(TIMING_STEP_END2_PROPAGATE);
+
+ // ------------------------------------------------------------------------
+ // cleanup RRDSET / RRDDIM
+
+ if(likely(st->pluginsd.dims_with_slots)) {
+ for(size_t i = 0; i < st->pluginsd.size ;i++) {
+ RRDDIM *rd = st->pluginsd.prd_array[i].rd;
+
+ if(!rd)
+ continue;
+
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
+ rrddim_clear_updated(rd);
+ }
+ }
+ else {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st){
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
+ rrddim_clear_updated(rd);
+ }
+ rrddim_foreach_done(rd);
+ }
+
+ // ------------------------------------------------------------------------
+ // reset state
+
+ parser->user.v2 = (struct parser_user_object_v2){ 0 };
+
+ timing_step(TIMING_STEP_END2_STORE);
+ timing_report();
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ netdata_log_info("PLUGINSD: plugin called EXIT.");
+ return PARSER_RC_STOP;
+}
+
+static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
+{
+ const char *host_uuid_str = get_word(words, num_words, 1);
+ const char *claim_id_str = get_word(words, num_words, 2);
+
+ if (!host_uuid_str || !claim_id_str) {
+ netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
+ host_uuid_str ? host_uuid_str : "[unset]",
+ claim_id_str ? claim_id_str : "[unset]");
+ return PARSER_RC_ERROR;
+ }
+
+ uuid_t uuid;
+ RRDHOST *host = parser->user.host;
+
+ // We don't need the parsed UUID
+ // just do it to check the format
+ if(uuid_parse(host_uuid_str, uuid)) {
+ netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
+ return PARSER_RC_ERROR;
+ }
+ if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
+ netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
+ return PARSER_RC_ERROR;
+ }
+
+ if(strcmp(host_uuid_str, host->machine_guid) != 0) {
+ netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
+ return PARSER_RC_OK; //the message is OK problem must be somewhere else
+ }
+
+ rrdhost_aclk_state_lock(host);
+
+ if (host->aclk_state.claimed_id)
+ freez(host->aclk_state.claimed_id);
+
+ host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
+
+ rrdhost_aclk_state_unlock(host);
+
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
+
+ rrdpush_send_claimed_id(host);
+
+ return PARSER_RC_OK;
+}
+
+// ----------------------------------------------------------------------------
+
+void pluginsd_cleanup_v2(PARSER *parser) {
+ // this is called when the thread is stopped while processing
+ pluginsd_clear_scope_chart(parser, "THREAD CLEANUP");
+}
+
+void pluginsd_process_thread_cleanup(void *ptr) {
+ PARSER *parser = (PARSER *)ptr;
+
+ pluginsd_cleanup_v2(parser);
+ pluginsd_host_define_cleanup(parser);
+
+ rrd_collector_finished();
+
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ if(parser->user.stream_log_fp) {
+ fclose(parser->user.stream_log_fp);
+ parser->user.stream_log_fp = NULL;
+ }
+#endif
+
+ parser_destroy(parser);
+}
+
+bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.host)
+ return false;
+
+ buffer_strcat(wb, rrdhost_hostname(parser->user.host));
+ return true;
+}
+
+bool parser_reconstruct_instance(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.st)
+ return false;
+
+ buffer_strcat(wb, rrdset_name(parser->user.st));
+ return true;
+}
+
+bool parser_reconstruct_context(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.st)
+ return false;
+
+ buffer_strcat(wb, string2str(parser->user.st->context));
+ return true;
+}
+
+inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
+{
+ int enabled = cd->unsafe.enabled;
+
+ if (!fp_plugin_input || !fp_plugin_output || !enabled) {
+ cd->unsafe.enabled = 0;
+ return 0;
+ }
+
+ if (unlikely(fileno(fp_plugin_input) == -1)) {
+ netdata_log_error("input file descriptor given is not a valid stream");
+ cd->serial_failures++;
+ return 0;
+ }
+
+ if (unlikely(fileno(fp_plugin_output) == -1)) {
+ netdata_log_error("output file descriptor given is not a valid stream");
+ cd->serial_failures++;
+ return 0;
+ }
+
+ clearerr(fp_plugin_input);
+ clearerr(fp_plugin_output);
+
+ PARSER *parser;
+ {
+ PARSER_USER_OBJECT user = {
+ .enabled = cd->unsafe.enabled,
+ .host = host,
+ .cd = cd,
+ .trust_durations = trust_durations
+ };
+
+ // fp_plugin_output = our input; fp_plugin_input = our output
+ parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
+ }
+
+ pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
+
+ rrd_collector_started();
+
+ size_t count = 0;
+
+ // this keeps the parser with its current value
+ // so, parser needs to be allocated before pushing it
+ netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser)
+ {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ buffered_reader_init(&parser->reader);
+ CLEAN_BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
+ while(likely(service_running(SERVICE_COLLECTORS))) {
+
+ if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
+ buffered_reader_ret_t ret = buffered_reader_read_timeout(
+ &parser->reader,
+ fileno((FILE *) parser->fp_input),
+ 2 * 60 * MSEC_PER_SEC, true
+ );
+
+ if(unlikely(ret != BUFFERED_READER_READ_OK))
+ break;
+
+ continue;
+ }
+
+ if(unlikely(parser_action(parser, buffer->buffer)))
+ break;
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+
+ cd->unsafe.enabled = parser->user.enabled;
+ count = parser->user.data_collections_count;
+
+ if(likely(count)) {
+ cd->successful_collections += count;
+ cd->serial_failures = 0;
+ }
+ else
+ cd->serial_failures++;
+ }
+ netdata_thread_cleanup_pop(1); // free parser with the pop function
+
+ return count;
+}
+
+#include "gperf-hashtable.h"
+
+PARSER_RC parser_execute(PARSER *parser, const PARSER_KEYWORD *keyword, char **words, size_t num_words) {
+ switch(keyword->id) {
+ case PLUGINSD_KEYWORD_ID_SET2:
+ return pluginsd_set_v2(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_BEGIN2:
+ return pluginsd_begin_v2(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_END2:
+ return pluginsd_end_v2(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_SET:
+ return pluginsd_set(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_BEGIN:
+ return pluginsd_begin(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_END:
+ return pluginsd_end(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_RSET:
+ return pluginsd_replay_set(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_RBEGIN:
+ return pluginsd_replay_begin(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_RDSTATE:
+ return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_RSSTATE:
+ return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_REND:
+ return pluginsd_replay_end(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_DIMENSION:
+ return pluginsd_dimension(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_CHART:
+ return pluginsd_chart(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_CHART_DEFINITION_END:
+ return pluginsd_chart_definition_end(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_CLABEL:
+ return pluginsd_clabel(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_CLABEL_COMMIT:
+ return pluginsd_clabel_commit(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_FUNCTION:
+ return pluginsd_function(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_FUNCTION_RESULT_BEGIN:
+ return pluginsd_function_result_begin(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_FUNCTION_PROGRESS:
+ return pluginsd_function_progress(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_LABEL:
+ return pluginsd_label(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_OVERWRITE:
+ return pluginsd_overwrite(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_VARIABLE:
+ return pluginsd_variable(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_CLAIMED_ID:
+ return streaming_claimed_id(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_HOST:
+ return pluginsd_host(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_HOST_DEFINE:
+ return pluginsd_host_define(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_HOST_DEFINE_END:
+ return pluginsd_host_define_end(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_HOST_LABEL:
+ return pluginsd_host_labels(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_FLUSH:
+ return pluginsd_flush(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_DISABLE:
+ return pluginsd_disable(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_EXIT:
+ return pluginsd_exit(words, num_words, parser);
+ case PLUGINSD_KEYWORD_ID_CONFIG:
+ return pluginsd_config(words, num_words, parser);
+
+ case PLUGINSD_KEYWORD_ID_DYNCFG_ENABLE:
+ case PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_MODULE:
+ case PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_JOB:
+ case PLUGINSD_KEYWORD_ID_DYNCFG_RESET:
+ case PLUGINSD_KEYWORD_ID_REPORT_JOB_STATUS:
+ case PLUGINSD_KEYWORD_ID_DELETE_JOB:
+ return pluginsd_dyncfg_noop(words, num_words, parser);
+
+ default:
+ netdata_log_error("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
+ return PARSER_RC_ERROR;;
+ }
+}
+
+void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
+ parser->repertoire = repertoire;
+
+ for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
+ if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
+ worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
+ }
+}
+
+int pluginsd_parser_unittest(void) {
+ PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
+ pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
+
+ char *lines[] = {
+ "BEGIN2 abcdefghijklmnopqr 123",
+ "SET2 abcdefg 0x12345678 0 0",
+ "SET2 hijklmnoqr 0x12345678 0 0",
+ "SET2 stuvwxyz 0x12345678 0 0",
+ "END2",
+ NULL,
+ };
+
+ char *words[PLUGINSD_MAX_WORDS];
+ size_t iterations = 1000000;
+ size_t count = 0;
+ char input[PLUGINSD_LINE_MAX + 1];
+
+ usec_t started = now_realtime_usec();
+ while(--iterations) {
+ for(size_t line = 0; lines[line] ;line++) {
+ strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
+ size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(words, num_words, 0);
+ const PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
+ if(unlikely(!keyword))
+ fatal("Cannot parse the line '%s'", lines[line]);
+ count++;
+ }
+ }
+ usec_t ended = now_realtime_usec();
+
+ netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
+ (double)(ended - started) / (double)USEC_PER_SEC,
+ (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
+
+ parser_destroy(p);
+ return 0;
+}
diff --git a/src/collectors/plugins.d/pluginsd_parser.h b/src/collectors/plugins.d/pluginsd_parser.h
new file mode 100644
index 000000000..d317a77be
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_parser.h
@@ -0,0 +1,244 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINSD_PARSER_H
+#define NETDATA_PLUGINSD_PARSER_H
+
+#include "daemon/common.h"
+
+#define WORKER_PARSER_FIRST_JOB 3
+
+// this has to be in-sync with the same at receiver.c
+#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
+
+// this controls the max response size of a function
+#define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024)
+
+#define PLUGINSD_MIN_RRDSET_POINTERS_CACHE 1024
+
+#define HOST_LABEL_IS_EPHEMERAL "_is_ephemeral"
+// PARSER return codes
+typedef enum __attribute__ ((__packed__)) parser_rc {
+ PARSER_RC_OK, // Callback was successful, go on
+ PARSER_RC_STOP, // Callback says STOP
+ PARSER_RC_ERROR // Callback failed (abort rest of callbacks)
+} PARSER_RC;
+
+typedef enum __attribute__ ((__packed__)) parser_input_type {
+ PARSER_INPUT_SPLIT = (1 << 1),
+ PARSER_DEFER_UNTIL_KEYWORD = (1 << 2),
+} PARSER_INPUT_TYPE;
+
+typedef enum __attribute__ ((__packed__)) {
+ PARSER_INIT_PLUGINSD = (1 << 1),
+ PARSER_INIT_STREAMING = (1 << 2),
+ PARSER_REP_METADATA = (1 << 3),
+} PARSER_REPERTOIRE;
+
+struct parser;
+typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, struct parser *parser);
+
+typedef struct parser_keyword {
+ char *keyword;
+ size_t id;
+ PARSER_REPERTOIRE repertoire;
+ size_t worker_job_id;
+} PARSER_KEYWORD;
+
+typedef struct parser_user_object {
+ bool cleanup_slots;
+ RRDSET *st;
+ RRDHOST *host;
+ void *opaque;
+ struct plugind *cd;
+ int trust_durations;
+ RRDLABELS *new_host_labels;
+ RRDLABELS *chart_rrdlabels_linked_temporarily;
+ size_t data_collections_count;
+ int enabled;
+
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ FILE *stream_log_fp;
+ PARSER_REPERTOIRE stream_log_repertoire;
+#endif
+
+ STREAM_CAPABILITIES capabilities; // receiver capabilities
+
+ struct {
+ bool parsing_host;
+ uuid_t machine_guid;
+ char machine_guid_str[UUID_STR_LEN];
+ STRING *hostname;
+ RRDLABELS *rrdlabels;
+ } host_define;
+
+ struct parser_user_object_replay {
+ time_t start_time;
+ time_t end_time;
+
+ usec_t start_time_ut;
+ usec_t end_time_ut;
+
+ time_t wall_clock_time;
+
+ bool rset_enabled;
+ } replay;
+
+ struct parser_user_object_v2 {
+ bool locked_data_collection;
+ RRDSET_STREAM_BUFFER stream_buffer; // sender capabilities in this
+ time_t update_every;
+ time_t end_time;
+ time_t wall_clock_time;
+ bool ml_locked;
+ } v2;
+} PARSER_USER_OBJECT;
+
+typedef struct parser {
+ uint8_t version; // Parser version
+ PARSER_REPERTOIRE repertoire;
+ uint32_t flags;
+ int fd; // Socket
+ FILE *fp_input; // Input source e.g. stream
+ FILE *fp_output; // Stream to send commands to plugin
+
+#ifdef ENABLE_HTTPS
+ NETDATA_SSL *ssl_output;
+#endif
+#ifdef ENABLE_H2O
+ void *h2o_ctx; // if set we use h2o_stream functions to send data
+#endif
+
+ PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls
+
+ struct buffered_reader reader;
+ struct line_splitter line;
+ const PARSER_KEYWORD *keyword;
+
+ struct {
+ const char *end_keyword;
+ BUFFER *response;
+ void (*action)(struct parser *parser, void *action_data);
+ void *action_data;
+ } defer;
+
+ struct {
+ DICTIONARY *functions;
+ usec_t smaller_monotonic_timeout_ut;
+ } inflight;
+
+ struct {
+ SPINLOCK spinlock;
+ } writer;
+
+} PARSER;
+
+PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
+void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire);
+void parser_destroy(PARSER *working_parser);
+void pluginsd_cleanup_v2(PARSER *parser);
+void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire);
+PARSER_RC parser_execute(PARSER *parser, const PARSER_KEYWORD *keyword, char **words, size_t num_words);
+
+static inline int find_first_keyword(const char *src, char *dst, int dst_size, bool *isspace_map) {
+ const char *s = src, *keyword_start;
+
+ while (unlikely(isspace_map[(uint8_t)*s])) s++;
+ keyword_start = s;
+
+ while (likely(*s && !isspace_map[(uint8_t)*s]) && dst_size > 1) {
+ *dst++ = *s++;
+ dst_size--;
+ }
+ *dst = '\0';
+ return dst_size == 0 ? 0 : (int) (s - keyword_start);
+}
+
+const PARSER_KEYWORD *gperf_lookup_keyword(register const char *str, register size_t len);
+
+static inline const PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) {
+ const PARSER_KEYWORD *t = gperf_lookup_keyword(command, strlen(command));
+ if(t && (t->repertoire & parser->repertoire))
+ return t;
+
+ return NULL;
+}
+
+bool parser_reconstruct_node(BUFFER *wb, void *ptr);
+bool parser_reconstruct_instance(BUFFER *wb, void *ptr);
+bool parser_reconstruct_context(BUFFER *wb, void *ptr);
+
+static inline int parser_action(PARSER *parser, char *input) {
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ static __thread char line[PLUGINSD_LINE_MAX + 1];
+ strncpyz(line, input, sizeof(line) - 1);
+#endif
+
+ parser->line.count++;
+
+ if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
+ char command[100 + 1];
+ bool has_keyword = find_first_keyword(input, command, 100, isspace_map_pluginsd);
+
+ if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) {
+ if(parser->defer.response) {
+ buffer_strcat(parser->defer.response, input);
+ if(buffer_strlen(parser->defer.response) > PLUGINSD_MAX_DEFERRED_SIZE) {
+ // more than PLUGINSD_MAX_DEFERRED_SIZE of data,
+ // or a bad plugin that did not send the end_keyword
+ nd_log(NDLS_DAEMON, NDLP_ERR, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
+ return 1;
+ }
+ }
+ return 0;
+ }
+ else {
+ // call the action
+ parser->defer.action(parser, parser->defer.action_data);
+
+ // empty everything
+ parser->defer.action = NULL;
+ parser->defer.action_data = NULL;
+ parser->defer.end_keyword = NULL;
+ parser->defer.response = NULL;
+ parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD;
+ }
+ return 0;
+ }
+
+ parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(parser->line.words, parser->line.num_words, 0);
+
+ if(unlikely(!command)) {
+ line_splitter_reset(&parser->line);
+ return 0;
+ }
+
+ PARSER_RC rc;
+ parser->keyword = parser_find_keyword(parser, command);
+ if(likely(parser->keyword)) {
+ worker_is_busy(parser->keyword->worker_job_id);
+
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire)
+ fprintf(parser->user.stream_log_fp, "%s", line);
+#endif
+
+ rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words);
+ // rc = (*t->func)(words, num_words, parser);
+ worker_is_idle();
+ }
+ else
+ rc = PARSER_RC_ERROR;
+
+ if(rc == PARSER_RC_ERROR) {
+ CLEAN_BUFFER *wb = buffer_create(1024, NULL);
+ line_splitter_reconstruct_line(wb, &parser->line);
+ netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
+ command, parser->line.count, buffer_tostring(wb));
+ }
+
+ line_splitter_reset(&parser->line);
+ return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP);
+}
+
+#endif //NETDATA_PLUGINSD_PARSER_H
diff --git a/src/collectors/plugins.d/pluginsd_replication.c b/src/collectors/plugins.d/pluginsd_replication.c
new file mode 100644
index 000000000..8d0975210
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_replication.c
@@ -0,0 +1,371 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "pluginsd_replication.h"
+
+PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *start_time_str = get_word(words, num_words, idx++);
+ char *end_time_str = get_word(words, num_words, idx++);
+ char *child_now_str = get_word(words, num_words, idx++);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st;
+ if (likely(!id || !*id))
+ st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ else
+ st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN))
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if(start_time_str && end_time_str) {
+ time_t start_time = (time_t) str2ull_encoded(start_time_str);
+ time_t end_time = (time_t) str2ull_encoded(end_time_str);
+
+ time_t wall_clock_time = 0, tolerance;
+ bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
+ if(child_now_str) {
+ wall_clock_time = (time_t) str2ull_encoded(child_now_str);
+ tolerance = st->update_every + 1;
+ wall_clock_comes_from_child = true;
+ }
+
+ if(wall_clock_time <= 0) {
+ wall_clock_time = now_realtime_sec();
+ tolerance = st->update_every + 5;
+ wall_clock_comes_from_child = false;
+ }
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(
+ (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
+ "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
+
+ internal_error(
+ true,
+ "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
+ st->replay.after, st->replay.before);
+#endif
+
+ if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
+ if (unlikely(end_time - start_time != st->update_every))
+ rrdset_set_update_every_s(st, end_time - start_time);
+
+ st->last_collected_time.tv_sec = end_time;
+ st->last_collected_time.tv_usec = 0;
+
+ st->last_updated.tv_sec = end_time;
+ st->last_updated.tv_usec = 0;
+
+ st->counter++;
+ st->counter_done++;
+
+ // these are only needed for db mode RAM, ALLOC
+ st->db.current_entry++;
+ if(st->db.current_entry >= st->db.entries)
+ st->db.current_entry -= st->db.entries;
+
+ parser->user.replay.start_time = start_time;
+ parser->user.replay.end_time = end_time;
+ parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+ parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+ parser->user.replay.wall_clock_time = wall_clock_time;
+ parser->user.replay.rset_enabled = true;
+
+ return PARSER_RC_OK;
+ }
+
+ netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
+ " from %ld to %ld, but timestamps are invalid "
+ "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
+ wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock",
+ tolerance);
+ }
+
+ // the child sends an RBEGIN without any parameters initially
+ // setting rset_enabled to false, means the RSET should not store any metrics
+ // to store metrics, the RBEGIN needs to have timestamps
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
+ parser->user.replay.wall_clock_time = 0;
+ parser->user.replay.rset_enabled = false;
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *value_str = get_word(words, num_words, idx++);
+ char *flags_str = get_word(words, num_words, idx++);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if(!parser->user.replay.rset_enabled) {
+ nd_log_limit_static_thread_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_ERR,
+ "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
+ rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+
+ // we have to return OK here
+ return PARSER_RC_OK;
+ }
+
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ st->pluginsd.set = true;
+
+ if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
+ rrdhost_hostname(host),
+ rrdset_id(st),
+ dimension,
+ PLUGINSD_KEYWORD_REPLAY_SET,
+ parser->user.replay.start_time,
+ parser->user.replay.end_time,
+ PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+ }
+
+ if (unlikely(!value_str || !*value_str))
+ value_str = "NAN";
+
+ if(unlikely(!flags_str))
+ flags_str = "";
+
+ if (likely(value_str)) {
+ RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
+
+ if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
+ NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL);
+ SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
+
+ if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
+ value = NAN;
+ flags = SN_EMPTY_SLOT;
+ }
+
+ rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags);
+ rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.counter++;
+ }
+ else {
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. "
+ "Ignoring data.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
+ }
+ }
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) {
+ if(parser->user.replay.rset_enabled == false)
+ return PARSER_RC_OK;
+
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *last_collected_ut_str = get_word(words, num_words, idx++);
+ char *last_collected_value_str = get_word(words, num_words, idx++);
+ char *last_calculated_value_str = get_word(words, num_words, idx++);
+ char *last_stored_value_str = get_word(words, num_words, idx++);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ if(st->pluginsd.set) {
+ // reset pos to reuse the same RDAs
+ st->pluginsd.pos = 0;
+ st->pluginsd.set = false;
+ }
+
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
+ if(last_collected_ut > dim_last_collected_ut) {
+ rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
+ }
+
+ rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
+ rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
+ rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) {
+ if(parser->user.replay.rset_enabled == false)
+ return PARSER_RC_OK;
+
+ char *last_collected_ut_str = get_word(words, num_words, 1);
+ char *last_updated_ut_str = get_word(words, num_words, 2);
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE,
+ PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
+ if(last_collected_ut > chart_last_collected_ut) {
+ st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
+ }
+
+ usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
+ usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0;
+ if(last_updated_ut > chart_last_updated_ut) {
+ st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
+ st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
+ }
+
+ st->counter++;
+ st->counter_done++;
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) {
+ if (num_words < 7) { // accepts 7, but the 7th is optional
+ netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
+ return PARSER_RC_ERROR;
+ }
+
+ const char *update_every_child_txt = get_word(words, num_words, 1);
+ const char *first_entry_child_txt = get_word(words, num_words, 2);
+ const char *last_entry_child_txt = get_word(words, num_words, 3);
+ const char *start_streaming_txt = get_word(words, num_words, 4);
+ const char *first_entry_requested_txt = get_word(words, num_words, 5);
+ const char *last_entry_requested_txt = get_word(words, num_words, 6);
+ const char *child_world_time_txt = get_word(words, num_words, 7); // optional
+
+ time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt);
+ time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt);
+ time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt);
+
+ bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
+ time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt);
+ time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt);
+
+ // the optional child world time
+ time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
+ child_world_time_txt) : now_realtime_sec();
+
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true,
+ "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
+ rrdhost_hostname(host), rrdset_id(st),
+ (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
+ start_streaming?"true":"false",
+ (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
+ (unsigned long long)child_world_time
+ );
+#endif
+
+ parser->user.data_collections_count++;
+
+ if(parser->user.replay.rset_enabled && st->rrdhost->receiver) {
+ time_t now = now_realtime_sec();
+ time_t started = st->rrdhost->receiver->replication_first_time_t;
+ time_t current = parser->user.replay.end_time;
+
+ if(started && current > started) {
+ host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started);
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ host->rrdpush_receiver_replication_percent);
+ }
+ }
+
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
+ parser->user.replay.wall_clock_time = 0;
+ parser->user.replay.rset_enabled = false;
+
+ st->counter++;
+ st->counter_done++;
+ store_metric_collection_completed();
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+ if(start_streaming)
+ st->replay.log_next_data_collection = true;
+#endif
+
+ if (start_streaming) {
+ if (st->update_every != update_every_child)
+ rrdset_set_update_every_s(st, update_every_child);
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
+ rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
+ }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ else
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
+ rrdhost_hostname(host), rrdset_id(st));
+#endif
+
+ pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
+
+ host->rrdpush_receiver_replication_percent = 100.0;
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent);
+
+ return PARSER_RC_OK;
+ }
+
+ pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
+
+ rrdcontext_updated_retention_rrdset(st);
+
+ bool ok = replicate_chart_request(send_to_plugin, parser, host, st,
+ first_entry_child, last_entry_child, child_world_time,
+ first_entry_requested, last_entry_requested);
+ return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
+}
diff --git a/src/collectors/plugins.d/pluginsd_replication.h b/src/collectors/plugins.d/pluginsd_replication.h
new file mode 100644
index 000000000..1c6f617e6
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_replication.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINSD_REPLICATION_H
+#define NETDATA_PLUGINSD_REPLICATION_H
+
+#include "pluginsd_internals.h"
+
+PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser);
+PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser);
+PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser);
+PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser);
+PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser);
+
+#endif //NETDATA_PLUGINSD_REPLICATION_H