diff options
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r-- | collectors/plugins.d/Makefile.am | 12 | ||||
-rw-r--r-- | collectors/plugins.d/README.md | 680 | ||||
-rw-r--r-- | collectors/plugins.d/gperf-config.txt | 58 | ||||
-rw-r--r-- | collectors/plugins.d/gperf-hashtable.h | 177 | ||||
-rw-r--r-- | collectors/plugins.d/local_listeners.c | 400 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 362 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 67 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 3208 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 245 |
9 files changed, 0 insertions, 5209 deletions
diff --git a/collectors/plugins.d/Makefile.am b/collectors/plugins.d/Makefile.am deleted file mode 100644 index 67fed309..00000000 --- a/collectors/plugins.d/Makefile.am +++ /dev/null @@ -1,12 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in - -SUBDIRS = \ - $(NULL) - -dist_noinst_DATA = \ - gperf-config.txt \ - README.md \ - $(NULL) diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md deleted file mode 100644 index 0752d389..00000000 --- a/collectors/plugins.d/README.md +++ /dev/null @@ -1,680 +0,0 @@ -<!-- -title: "External plugins" -custom_edit_url: "https://github.com/netdata/netdata/edit/master/collectors/plugins.d/README.md" -sidebar_label: "External plugins" -learn_status: "Published" -learn_topic_type: "References" -learn_rel_path: "Developers/External plugins" ---> - -# External plugins - -`plugins.d` is the Netdata internal plugin that collects metrics -from external processes, thus allowing Netdata to use **external plugins**. - -## Provided External Plugins - -| plugin | language | O/S | description | -|:------------------------------------------------------------------------------------------------------:|:--------:|:--------------:|:----------------------------------------------------------------------------------------------------------------------------------------| -| [apps.plugin](https://github.com/netdata/netdata/blob/master/collectors/apps.plugin/README.md) | `C` | linux, freebsd | monitors the whole process tree on Linux and FreeBSD and breaks down system resource usage by **process**, **user** and **user group**. | -| [charts.d.plugin](https://github.com/netdata/netdata/blob/master/collectors/charts.d.plugin/README.md) | `BASH` | all | a **plugin orchestrator** for data collection modules written in `BASH` v4+. | -| [cups.plugin](https://github.com/netdata/netdata/blob/master/collectors/cups.plugin/README.md) | `C` | all | monitors **CUPS** | -| [ebpf.plugin](https://github.com/netdata/netdata/blob/master/collectors/ebpf.plugin/README.md) | `C` | linux | monitors different metrics on environments using kernel internal functions. | -| [go.d.plugin](https://github.com/netdata/go.d.plugin/blob/master/README.md) | `GO` | all | collects metrics from the system, applications, or third-party APIs. | -| [ioping.plugin](https://github.com/netdata/netdata/blob/master/collectors/ioping.plugin/README.md) | `C` | all | measures disk latency. | -| [freeipmi.plugin](https://github.com/netdata/netdata/blob/master/collectors/freeipmi.plugin/README.md) | `C` | linux | collects metrics from enterprise hardware sensors, on Linux servers. | -| [nfacct.plugin](https://github.com/netdata/netdata/blob/master/collectors/nfacct.plugin/README.md) | `C` | linux | collects netfilter firewall, connection tracker and accounting metrics using `libmnl` and `libnetfilter_acct`. | -| [xenstat.plugin](https://github.com/netdata/netdata/blob/master/collectors/xenstat.plugin/README.md) | `C` | linux | collects XenServer and XCP-ng metrics using `lxenstat`. | -| [perf.plugin](https://github.com/netdata/netdata/blob/master/collectors/perf.plugin/README.md) | `C` | linux | collects CPU performance metrics using performance monitoring units (PMU). | -| [python.d.plugin](https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/README.md) | `python` | all | a **plugin orchestrator** for data collection modules written in `python` v2 or v3 (both are supported). | -| [slabinfo.plugin](https://github.com/netdata/netdata/blob/master/collectors/slabinfo.plugin/README.md) | `C` | linux | collects kernel internal cache objects (SLAB) metrics. | - -Plugin orchestrators may also be described as **modular plugins**. They are modular since they accept custom made modules to be included. Writing modules for these plugins is easier than accessing the native Netdata API directly. You will find modules already available for each orchestrator under the directory of the particular modular plugin (e.g. under python.d.plugin for the python orchestrator). -Each of these modular plugins has each own methods for defining modules. Please check the examples and their documentation. - -## Motivation - -This plugin allows Netdata to use **external plugins** for data collection: - -1. external data collection plugins may be written in any computer language. - -2. external data collection plugins may use O/S capabilities or `setuid` to - run with escalated privileges (compared to the `netdata` daemon). - The communication between the external plugin and Netdata is unidirectional - (from the plugin to Netdata), so that Netdata cannot manipulate an external - plugin running with escalated privileges. - -## Operation - -Each of the external plugins is expected to run forever. -Netdata will start it when it starts and stop it when it exits. - -If the external plugin exits or crashes, Netdata will log an error. -If the external plugin exits or crashes without pushing metrics to Netdata, Netdata will not start it again. - -- Plugins that exit with any value other than zero, will be disabled. Plugins that exit with zero, will be restarted after some time. -- Plugins may also be disabled by Netdata if they output things that Netdata does not understand. - -The `stdout` of external plugins is connected to Netdata to receive metrics, -with the API defined below. - -The `stderr` of external plugins is connected to Netdata's `error.log`. - -Plugins can create any number of charts with any number of dimensions each. Each chart can have its own characteristics independently of the others generated by the same plugin. For example, one chart may have an update frequency of 1 second, another may have 5 seconds and a third may have 10 seconds. - -## Configuration - -Netdata will supply the environment variables `NETDATA_USER_CONFIG_DIR` (for user supplied) and `NETDATA_STOCK_CONFIG_DIR` (for Netdata supplied) configuration files to identify the directory where configuration files are stored. It is up to the plugin to read the configuration it needs. - -The `netdata.conf` section `[plugins]` section contains a list of all the plugins found at the system where Netdata runs, with a boolean setting to enable them or not. - -Example: - -``` -[plugins] - # enable running new plugins = yes - # check for new plugins every = 60 - - # charts.d = yes - # ioping = yes - # python.d = yes -``` - -The setting `enable running new plugins` sets the default behavior for all external plugins. It can be -overridden for distinct plugins by modifying the appropriate plugin value configuration to either `yes` or `no`. - -The setting `check for new plugins every` sets the interval between scans of the directory -`/usr/libexec/netdata/plugins.d`. New plugins can be added any time, and Netdata will detect them in a timely manner. - -For each of the external plugins enabled, another `netdata.conf` section -is created, in the form of `[plugin:NAME]`, where `NAME` is the name of the external plugin. -This section allows controlling the update frequency of the plugin and provide -additional command line arguments to it. - -For example, for `apps.plugin` the following section is available: - -``` -[plugin:apps] - # update every = 1 - # command options = -``` - -- `update every` controls the granularity of the external plugin. -- `command options` allows giving additional command line options to the plugin. - -Netdata will provide to the external plugins the environment variable `NETDATA_UPDATE_EVERY`, in seconds (the default is 1). This is the **minimum update frequency** for all charts. A plugin that is updating values more frequently than this, is just wasting resources. - -Netdata will call the plugin with just one command line parameter: the number of seconds the user requested this plugin to update its data (by default is also 1). - -Other than the above, the plugin configuration is up to the plugin. - -Keep in mind, that the user may use Netdata configuration to overwrite chart and dimension parameters. This is transparent to the plugin. - -### Autoconfiguration - -Plugins should attempt to autoconfigure themselves when possible. - -For example, if your plugin wants to monitor `squid`, you can search for it on port `3128` or `8080`. If any succeeds, you can proceed. If it fails you can output an error (on stderr) saying that you cannot find `squid` running and giving instructions about the plugin configuration. Then you can stop (exit with non-zero value), so that Netdata will not attempt to start the plugin again. - -## External Plugins API - -Any program that can print a few values to its standard output can become a Netdata external plugin. - -Netdata parses lines starting with: - -- `CHART` - create or update a chart -- `DIMENSION` - add or update a dimension to the chart just created -- `VARIABLE` - define a variable (to be used in health calculations) -- `CLABEL` - add a label to a chart -- `CLABEL_COMMIT` - commit added labels to the chart -- `FUNCTION` - define a function that can be called later to execute it -- `BEGIN` - initialize data collection for a chart -- `SET` - set the value of a dimension for the initialized chart -- `END` - complete data collection for the initialized chart -- `FLUSH` - ignore the last collected values -- `DISABLE` - disable this plugin - -a single program can produce any number of charts with any number of dimensions each. - -Charts can be added any time (not just the beginning). - -### Command line parameters - -The plugin **MUST** accept just **one** parameter: **the number of seconds it is -expected to update the values for its charts**. The value passed by Netdata -to the plugin is controlled via its configuration file (so there is no need -for the plugin to handle this configuration option). - -The external plugin can overwrite the update frequency. For example, the server may -request per second updates, but the plugin may ignore it and update its charts -every 5 seconds. - -### Environment variables - -There are a few environment variables that are set by `netdata` and are -available for the plugin to use. - -| variable | description | -|:---------------------------:|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `NETDATA_USER_CONFIG_DIR` | The directory where all Netdata-related user configuration should be stored. If the plugin requires custom user configuration, this is the place the user has saved it (normally under `/etc/netdata`). | -| `NETDATA_STOCK_CONFIG_DIR` | The directory where all Netdata -related stock configuration should be stored. If the plugin is shipped with configuration files, this is the place they can be found (normally under `/usr/lib/netdata/conf.d`). | -| `NETDATA_PLUGINS_DIR` | The directory where all Netdata plugins are stored. | -| `NETDATA_USER_PLUGINS_DIRS` | The list of directories where custom plugins are stored. | -| `NETDATA_WEB_DIR` | The directory where the web files of Netdata are saved. | -| `NETDATA_CACHE_DIR` | The directory where the cache files of Netdata are stored. Use this directory if the plugin requires a place to store data. A new directory should be created for the plugin for this purpose, inside this directory. | -| `NETDATA_LOG_DIR` | The directory where the log files are stored. By default the `stderr` output of the plugin will be saved in the `error.log` file of Netdata. | -| `NETDATA_HOST_PREFIX` | This is used in environments where system directories like `/sys` and `/proc` have to be accessed at a different path. | -| `NETDATA_DEBUG_FLAGS` | This is a number (probably in hex starting with `0x`), that enables certain Netdata debugging features. Check **\[[Tracing Options]]** for more information. | -| `NETDATA_UPDATE_EVERY` | The minimum number of seconds between chart refreshes. This is like the **internal clock** of Netdata (it is user configurable, defaulting to `1`). There is no meaning for a plugin to update its values more frequently than this number of seconds. | - -### The output of the plugin - -The plugin should output instructions for Netdata to its output (`stdout`). Since this uses pipes, please make sure you flush stdout after every iteration. - -#### DISABLE - -`DISABLE` will disable this plugin. This will prevent Netdata from restarting the plugin. You can also exit with the value `1` to have the same effect. - -#### HOST_DEFINE - -`HOST_DEFINE` defines a new (or updates an existing) virtual host. - -The template is: - -> HOST_DEFINE machine_guid hostname - -where: - -- `machine_guid` - - uniquely identifies the host, this is what will be needed to add charts to the host. - -- `hostname` - - is the hostname of the virtual host - -#### HOST_LABEL - -`HOST_LABEL` adds a key-value pair to the virtual host labels. It has to be given between `HOST_DEFINE` and `HOST_DEFINE_END`. - -The template is: - -> HOST_LABEL key value - -where: - -- `key` - - uniquely identifies the key of the label - -- `value` - - is the value associated with this key - -There are a few special keys that are used to define the system information of the monitored system: - -- `_cloud_provider_type` -- `_cloud_instance_type` -- `_cloud_instance_region` -- `_os_name` -- `_os_version` -- `_kernel_version` -- `_system_cores` -- `_system_cpu_freq` -- `_system_ram_total` -- `_system_disk_space` -- `_architecture` -- `_virtualization` -- `_container` -- `_container_detection` -- `_virt_detection` -- `_is_k8s_node` -- `_install_type` -- `_prebuilt_arch` -- `_prebuilt_dist` - -#### HOST_DEFINE_END - -`HOST_DEFINE_END` commits the host information, creating a new host entity, or updating an existing one with the same `machine_guid`. - -#### HOST - -`HOST` switches data collection between hosts. - -The template is: - -> HOST machine_guid - -where: - -- `machine_guid` - - is the UUID of the host to switch to. After this command, every other command following it is assumed to be associated with this host. - Setting machine_guid to `localhost` switches data collection to the local host. - -#### CHART - -`CHART` defines a new chart. - -the template is: - -> CHART type.id name title units \[family \[context \[charttype \[priority \[update_every \[options \[plugin [module]]]]]]]] - - where: - -- `type.id` - - uniquely identifies the chart, - this is what will be needed to add values to the chart - - the `type` part controls the menu the charts will appear in - -- `name` - - is the name that will be presented to the user instead of `id` in `type.id`. This means that only the `id` part of - `type.id` is changed. When a name has been given, the chart is indexed (and can be referred) as both `type.id` and - `type.name`. You can set name to `''`, or `null`, or `(null)` to disable it. If a chart with the same name already - exists, a serial number is automatically attached to the name to avoid naming collisions. - -- `title` - - the text above the chart - -- `units` - - the label of the vertical axis of the chart, - all dimensions added to a chart should have the same units - of measurement - -- `family` - - is used to group charts together - (for example all eth0 charts should say: eth0), - if empty or missing, the `id` part of `type.id` will be used - - this controls the sub-menu on the dashboard - -- `context` - - the context is giving the template of the chart. For example, if multiple charts present the same information for a different family, they should have the same `context` - - this is used for looking up rendering information for the chart (colors, sizes, informational texts) and also apply alerts to it - -- `charttype` - - one of `line`, `area` or `stacked`, - if empty or missing, the `line` will be used - -- `priority` - - is the relative priority of the charts as rendered on the web page, - lower numbers make the charts appear before the ones with higher numbers, - if empty or missing, `1000` will be used - -- `update_every` - - overwrite the update frequency set by the server, - if empty or missing, the user configured value will be used - -- `options` - - a space separated list of options, enclosed in quotes. 4 options are currently supported: `obsolete` to mark a chart as obsolete (Netdata will hide it and delete it after some time), `detail` to mark a chart as insignificant (this may be used by dashboards to make the charts smaller, or somehow visualize properly a less important chart), `store_first` to make Netdata store the first collected value, assuming there was an invisible previous value set to zero (this is used by statsd charts - if the first data collected value of incremental dimensions is not zero based, unrealistic spikes will appear with this option set) and `hidden` to perform all operations on a chart, but do not offer it on dashboards (the chart will be send to external databases). `CHART` options have been added in Netdata v1.7 and the `hidden` option was added in 1.10. - -- `plugin` and `module` - - both are just names that are used to let the user identify the plugin and the module that generated the chart. If `plugin` is unset or empty, Netdata will automatically set the filename of the plugin that generated the chart. `module` has not default. - -#### DIMENSION - -`DIMENSION` defines a new dimension for the chart - -the template is: - -> DIMENSION id \[name \[algorithm \[multiplier \[divisor [options]]]]] - - where: - -- `id` - - the `id` of this dimension (it is a text value, not numeric), - this will be needed later to add values to the dimension - - We suggest to avoid using `.` in dimension ids. External databases expect metrics to be `.` separated and people will get confused if a dimension id contains a dot. - -- `name` - - the name of the dimension as it will appear at the legend of the chart, - if empty or missing the `id` will be used - -- `algorithm` - - one of: - - - `absolute` - - the value is to drawn as-is (interpolated to second boundary), - if `algorithm` is empty, invalid or missing, `absolute` is used - - - `incremental` - - the value increases over time, - the difference from the last value is presented in the chart, - the server interpolates the value and calculates a per second figure - - - `percentage-of-absolute-row` - - the % of this value compared to the total of all dimensions - - - `percentage-of-incremental-row` - - the % of this value compared to the incremental total of - all dimensions - -- `multiplier` - - an integer value to multiply the collected value, - if empty or missing, `1` is used - -- `divisor` - - an integer value to divide the collected value, - if empty or missing, `1` is used - -- `options` - - a space separated list of options, enclosed in quotes. Options supported: `obsolete` to mark a dimension as obsolete (Netdata will delete it after some time) and `hidden` to make this dimension hidden, it will take part in the calculations but will not be presented in the chart. - -#### VARIABLE - -> VARIABLE [SCOPE] name = value - -`VARIABLE` defines a variable that can be used in alerts. This is to used for setting constants (like the max connections a server may accept). - -Variables support 2 scopes: - -- `GLOBAL` or `HOST` to define the variable at the host level. -- `LOCAL` or `CHART` to define the variable at the chart level. Use chart-local variables when the same variable may exist for different charts (i.e. Netdata monitors 2 mysql servers, and you need to set the `max_connections` each server accepts). Using chart-local variables is the ideal to build alert templates. - -The position of the `VARIABLE` line, sets its default scope (in case you do not specify a scope). So, defining a `VARIABLE` before any `CHART`, or between `END` and `BEGIN` (outside any chart), sets `GLOBAL` scope, while defining a `VARIABLE` just after a `CHART` or a `DIMENSION`, or within the `BEGIN` - `END` block of a chart, sets `LOCAL` scope. - -These variables can be set and updated at any point. - -Variable names should use alphanumeric characters, the `.` and the `_`. - -The `value` is floating point (Netdata used `long double`). - -Variables are transferred to upstream Netdata servers (streaming and database replication). - -#### CLABEL - -> CLABEL name value source - -`CLABEL` defines a label used to organize and identify a chart. - -Name and value accept characters according to the following table: - -| Character | Symbol | Label Name | Label Value | -|---------------------|:------:|:----------:|:-----------:| -| UTF-8 character | UTF-8 | _ | keep | -| Lower case letter | [a-z] | keep | keep | -| Upper case letter | [A-Z] | keep | [a-z] | -| Digit | [0-9] | keep | keep | -| Underscore | _ | keep | keep | -| Minus | - | keep | keep | -| Plus | + | _ | keep | -| Colon | : | _ | keep | -| Semicolon | ; | _ | : | -| Equal | = | _ | : | -| Period | . | keep | keep | -| Comma | , | . | . | -| Slash | / | keep | keep | -| Backslash | \ | / | / | -| At | @ | _ | keep | -| Space | ' ' | _ | keep | -| Opening parenthesis | ( | _ | keep | -| Closing parenthesis | ) | _ | keep | -| Anything else | | _ | _ | - -The `source` is an integer field that can have the following values: -- `1`: The value was set automatically. -- `2`: The value was set manually. -- `4`: This is a K8 label. -- `8`: This is a label defined using `netdata` agent cloud link. - -#### CLABEL_COMMIT - -`CLABEL_COMMIT` indicates that all labels were defined and the chart can be updated. - -#### FUNCTION - -> FUNCTION [GLOBAL] "name and parameters of the function" timeout "help string for users" - -A function can be used by users to ask for more information from the collector. Netdata maintains a registry of functions in 2 levels: - -- per node -- per chart - -Both node and chart functions are exactly the same, but chart functions allow Netdata to relate functions with charts and therefore present a context sensitive menu of functions related to the chart the user is using. - -A function is identified by a string. The allowed characters in the function definition are: - -| Character | Symbol | In Functions | -|-------------------|:------:|:------------:| -| UTF-8 character | UTF-8 | keep | -| Lower case letter | [a-z] | keep | -| Upper case letter | [A-Z] | keep | -| Digit | [0-9] | keep | -| Underscore | _ | keep | -| Comma | , | keep | -| Minus | - | keep | -| Period | . | keep | -| Colon | : | keep | -| Slash | / | keep | -| Space | ' ' | keep | -| Semicolon | ; | : | -| Equal | = | : | -| Backslash | \ | / | -| Anything else | | _ | - -Uses can get a list of all the registered functions using the `/api/v1/functions` end point of Netdata. - -Users can call functions using the `/api/v1/function` end point of Netdata. -Once a function is called, the plugin will receive at its standard input a command that looks like this: - -> FUNCTION transaction_id timeout "name and parameters of the function" - -The plugin is expected to parse and validate `name and parameters of the function`. Netdata allows users to edit this string, append more parameters or even change the ones the plugin originally exposed. To minimize the security risk, Netdata guarantees that only the characters shown above are accepted in function definitions, but still the plugin should carefully inspect the `name and parameters of the function` to ensure that it is valid and not harmful. - -If the plugin rejects the request, it should respond with this: - -``` -FUNCTION_RESULT_BEGIN transaction_id 400 application/json -{ - "status": 400, - "error_message": "description of the rejection reasons" -} -FUNCTION_RESULT_END -``` - -If the plugin prepares a response, it should send (via its standard output, together with the collected data, but not interleaved with them): - -> FUNCTION_RESULT_BEGIN transaction_id http_error_code content_type expiration - -Where: - - - `transaction_id` is the transaction id that Netdata sent for this function execution - - `http_error` is the http error code Netdata should respond with, 200 is the "ok" response - - `content_type` is the content type of the response - - `expiration` is the absolute timestamp (number, unix epoch) this response expires - -Immediately after this, all text is assumed to be the response content. -The content is text and line oriented. The maximum line length accepted is 15kb. Longer lines will be truncated. -The type of the context itself depends on the plugin and the UI. - -To terminate the message, Netdata seeks a line with just this: - -> FUNCTION_RESULT_END - -This defines the end of the message. `FUNCTION_RESULT_END` should appear in a line alone, without any other text, so it is wise to add `\n` before and after it. - -After this line, Netdata resumes processing collected metrics from the plugin. - -## Data collection - -data collection is defined as a series of `BEGIN` -> `SET` -> `END` lines - -> BEGIN type.id [microseconds] - -- `type.id` - - is the unique identification of the chart (as given in `CHART`) - -- `microseconds` - - is the number of microseconds since the last update of the chart. It is optional. - - Under heavy system load, the system may have some latency transferring - data from the plugins to Netdata via the pipe. This number improves - accuracy significantly, since the plugin is able to calculate the - duration between its iterations better than Netdata. - - The first time the plugin is started, no microseconds should be given - to Netdata. - -> SET id = value - -- `id` - - is the unique identification of the dimension (of the chart just began) - -- `value` - - is the collected value, only integer values are collected. If you want to push fractional values, multiply this value by 100 or 1000 and set the `DIMENSION` divider to 1000. - -> END - - END does not take any parameters, it commits the collected values for all dimensions to the chart. If a dimensions was not `SET`, its value will be empty for this commit. - -More `SET` lines may appear to update all the dimensions of the chart. -All of them in one `BEGIN` -> `END` block. - -All `SET` lines within a single `BEGIN` -> `END` block have to refer to the -same chart. - -If more charts need to be updated, each chart should have its own -`BEGIN` -> `SET` -> `END` block. - -If, for any reason, a plugin has issued a `BEGIN` but wants to cancel it, -it can issue a `FLUSH`. The `FLUSH` command will instruct Netdata to ignore -all the values collected since the last `BEGIN` command. - -If a plugin does not behave properly (outputs invalid lines, or does not -follow these guidelines), will be disabled by Netdata. - -### collected values - -Netdata will collect any **signed** value in the 64bit range: -`-9.223.372.036.854.775.808` to `+9.223.372.036.854.775.807` - -If a value is not collected, leave it empty, like this: - -`SET id =` - -or do not output the line at all. - -## Modular Plugins - -1. **python**, use `python.d.plugin`, there are many examples in the [python.d - directory](https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/README.md) - - python is ideal for Netdata plugins. It is a simple, yet powerful way to collect data, it has a very small memory footprint, although it is not the most CPU efficient way to do it. - -2. **BASH**, use `charts.d.plugin`, there are many examples in the [charts.d - directory](https://github.com/netdata/netdata/blob/master/collectors/charts.d.plugin/README.md) - - BASH is the simplest scripting language for collecting values. It is the less efficient though in terms of CPU resources. You can use it to collect data quickly, but extensive use of it might use a lot of system resources. - -3. **C** - - Of course, C is the most efficient way of collecting data. This is why Netdata itself is written in C. - -## Writing Plugins Properly - -There are a few rules for writing plugins properly: - -1. Respect system resources - - Pay special attention to efficiency: - - - Initialize everything once, at the beginning. Initialization is not an expensive operation. Your plugin will most probably be started once and run forever. So, do whatever heavy operation is needed at the beginning, just once. - - Do the absolutely minimum while iterating to collect values repeatedly. - - If you need to connect to another server to collect values, avoid re-connects if possible. Connect just once, with keep-alive (for HTTP) enabled and collect values using the same connection. - - Avoid any CPU or memory heavy operation while collecting data. If you control memory allocation, avoid any memory allocation while iterating to collect values. - - Avoid running external commands when possible. If you are writing shell scripts avoid especially pipes (each pipe is another fork, a very expensive operation). - -2. The best way to iterate at a constant pace is this pseudo code: - -```js - var update_every = argv[1] * 1000; /* seconds * 1000 = milliseconds */ - - readConfiguration(); - - if(!verifyWeCanCollectValues()) { - print("DISABLE"); - exit(1); - } - - createCharts(); /* print CHART and DIMENSION statements */ - - var loops = 0; - var last_run = 0; - var next_run = 0; - var dt_since_last_run = 0; - var now = 0; - - while(true) { - /* find the current time in milliseconds */ - now = currentTimeStampInMilliseconds(); - - /* - * find the time of the next loop - * this makes sure we are always aligned - * with the Netdata daemon - */ - next_run = now - (now % update_every) + update_every; - - /* - * wait until it is time - * it is important to do it in a loop - * since many wait functions can be interrupted - */ - while( now < next_run ) { - sleepMilliseconds(next_run - now); - now = currentTimeStampInMilliseconds(); - } - - /* calculate the time passed since the last run */ - if ( loops > 0 ) - dt_since_last_run = (now - last_run) * 1000; /* in microseconds */ - - /* prepare for the next loop */ - last_run = now; - loops++; - - /* do your magic here to collect values */ - collectValues(); - - /* send the collected data to Netdata */ - printValues(dt_since_last_run); /* print BEGIN, SET, END statements */ - } -``` - - Using the above procedure, your plugin will be synchronized to start data collection on steps of `update_every`. There will be no need to keep track of latencies in data collection. - - Netdata interpolates values to second boundaries, so even if your plugin is not perfectly aligned it does not matter. Netdata will find out. When your plugin works in increments of `update_every`, there will be no gaps in the charts due to the possible cumulative micro-delays in data collection. Gaps will only appear if the data collection is really delayed. - -3. If you are not sure of memory leaks, exit every one hour. Netdata will re-start your process. - -4. If possible, try to autodetect if your plugin should be enabled, without any configuration. - - diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt deleted file mode 100644 index bad51367..00000000 --- a/collectors/plugins.d/gperf-config.txt +++ /dev/null @@ -1,58 +0,0 @@ -%struct-type -%omit-struct-type -%define hash-function-name gperf_keyword_hash_function -%define lookup-function-name gperf_lookup_keyword -%define word-array-name gperf_keywords -%define constants-prefix GPERF_PARSER_ -%define slot-name keyword -%global-table -%null-strings -PARSER_KEYWORD; -%% -# -# Plugins Only Keywords -# -FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1 -DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2 -EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3 -HOST, 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4 -HOST_DEFINE, 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5 -HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6 -HOST_LABEL, 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7 -# -# Common keywords -# -BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8 -CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9 -CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10 -CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11 -DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12 -END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13 -FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14 -FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15 -LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16 -OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17 -SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18 -VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19 -DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20 -DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21 -DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 -DYNCFG_RESET, 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 -REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 -DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 -# -# Streaming only keywords -# -CLAIMED_ID, 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26 -BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 -SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 -END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 -# -# Streaming Replication keywords -# -CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30 -RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 -RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 -REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33 -RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34 -RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35 diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h deleted file mode 100644 index b327d8d6..00000000 --- a/collectors/plugins.d/gperf-hashtable.h +++ /dev/null @@ -1,177 +0,0 @@ -/* ANSI-C code produced by gperf version 3.1 */ -/* Command-line: gperf --multiple-iterations=1000 --output-file=gperf-hashtable.h gperf-config.txt */ -/* Computed positions: -k'1-2' */ - -#if !((' ' == 32) && ('!' == 33) && ('"' == 34) && ('#' == 35) \ - && ('%' == 37) && ('&' == 38) && ('\'' == 39) && ('(' == 40) \ - && (')' == 41) && ('*' == 42) && ('+' == 43) && (',' == 44) \ - && ('-' == 45) && ('.' == 46) && ('/' == 47) && ('0' == 48) \ - && ('1' == 49) && ('2' == 50) && ('3' == 51) && ('4' == 52) \ - && ('5' == 53) && ('6' == 54) && ('7' == 55) && ('8' == 56) \ - && ('9' == 57) && (':' == 58) && (';' == 59) && ('<' == 60) \ - && ('=' == 61) && ('>' == 62) && ('?' == 63) && ('A' == 65) \ - && ('B' == 66) && ('C' == 67) && ('D' == 68) && ('E' == 69) \ - && ('F' == 70) && ('G' == 71) && ('H' == 72) && ('I' == 73) \ - && ('J' == 74) && ('K' == 75) && ('L' == 76) && ('M' == 77) \ - && ('N' == 78) && ('O' == 79) && ('P' == 80) && ('Q' == 81) \ - && ('R' == 82) && ('S' == 83) && ('T' == 84) && ('U' == 85) \ - && ('V' == 86) && ('W' == 87) && ('X' == 88) && ('Y' == 89) \ - && ('Z' == 90) && ('[' == 91) && ('\\' == 92) && (']' == 93) \ - && ('^' == 94) && ('_' == 95) && ('a' == 97) && ('b' == 98) \ - && ('c' == 99) && ('d' == 100) && ('e' == 101) && ('f' == 102) \ - && ('g' == 103) && ('h' == 104) && ('i' == 105) && ('j' == 106) \ - && ('k' == 107) && ('l' == 108) && ('m' == 109) && ('n' == 110) \ - && ('o' == 111) && ('p' == 112) && ('q' == 113) && ('r' == 114) \ - && ('s' == 115) && ('t' == 116) && ('u' == 117) && ('v' == 118) \ - && ('w' == 119) && ('x' == 120) && ('y' == 121) && ('z' == 122) \ - && ('{' == 123) && ('|' == 124) && ('}' == 125) && ('~' == 126)) -/* The character set is not based on ISO-646. */ -#error "gperf generated tables don't work with this execution character set. Please report a bug to <bug-gperf@gnu.org>." -#endif - - -#define GPERF_PARSER_TOTAL_KEYWORDS 35 -#define GPERF_PARSER_MIN_WORD_LENGTH 3 -#define GPERF_PARSER_MAX_WORD_LENGTH 22 -#define GPERF_PARSER_MIN_HASH_VALUE 3 -#define GPERF_PARSER_MAX_HASH_VALUE 47 -/* maximum key range = 45, duplicates = 0 */ - -#ifdef __GNUC__ -__inline -#else -#ifdef __cplusplus -inline -#endif -#endif -static unsigned int -gperf_keyword_hash_function (register const char *str, register size_t len) -{ - static unsigned char asso_values[] = - { - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 11, 18, 0, 0, 0, - 6, 48, 9, 0, 48, 48, 20, 48, 0, 8, - 48, 48, 1, 12, 48, 20, 18, 48, 2, 0, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, - 48, 48, 48, 48, 48, 48 - }; - return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]]; -} - -static PARSER_KEYWORD gperf_keywords[] = - { - {(char*)0}, {(char*)0}, {(char*)0}, -#line 30 "gperf-config.txt" - {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13}, -#line 49 "gperf-config.txt" - {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, -#line 56 "gperf-config.txt" - {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33}, -#line 17 "gperf-config.txt" - {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}, -#line 16 "gperf-config.txt" - {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, -#line 55 "gperf-config.txt" - {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, -#line 29 "gperf-config.txt" - {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12}, -#line 42 "gperf-config.txt" - {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, - {(char*)0}, -#line 40 "gperf-config.txt" - {"DYNCFG_RESET", 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, -#line 37 "gperf-config.txt" - {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20}, -#line 26 "gperf-config.txt" - {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9}, -#line 35 "gperf-config.txt" - {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18}, -#line 48 "gperf-config.txt" - {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, -#line 57 "gperf-config.txt" - {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34}, -#line 41 "gperf-config.txt" - {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, -#line 39 "gperf-config.txt" - {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, -#line 58 "gperf-config.txt" - {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35}, -#line 18 "gperf-config.txt" - {"HOST", 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4}, -#line 38 "gperf-config.txt" - {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21}, -#line 25 "gperf-config.txt" - {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, -#line 47 "gperf-config.txt" - {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, -#line 54 "gperf-config.txt" - {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, -#line 27 "gperf-config.txt" - {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10}, -#line 21 "gperf-config.txt" - {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7}, -#line 19 "gperf-config.txt" - {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5}, -#line 53 "gperf-config.txt" - {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30}, -#line 46 "gperf-config.txt" - {"CLAIMED_ID", 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26}, -#line 15 "gperf-config.txt" - {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, -#line 20 "gperf-config.txt" - {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6}, -#line 28 "gperf-config.txt" - {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11}, -#line 31 "gperf-config.txt" - {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14}, -#line 34 "gperf-config.txt" - {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17}, -#line 33 "gperf-config.txt" - {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16}, -#line 36 "gperf-config.txt" - {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19}, - {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, - {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, - {(char*)0}, -#line 32 "gperf-config.txt" - {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15} - }; - -PARSER_KEYWORD * -gperf_lookup_keyword (register const char *str, register size_t len) -{ - if (len <= GPERF_PARSER_MAX_WORD_LENGTH && len >= GPERF_PARSER_MIN_WORD_LENGTH) - { - register unsigned int key = gperf_keyword_hash_function (str, len); - - if (key <= GPERF_PARSER_MAX_HASH_VALUE) - { - register const char *s = gperf_keywords[key].keyword; - - if (s && *str == *s && !strcmp (str + 1, s + 1)) - return &gperf_keywords[key]; - } - } - return 0; -} diff --git a/collectors/plugins.d/local_listeners.c b/collectors/plugins.d/local_listeners.c deleted file mode 100644 index f2c5e688..00000000 --- a/collectors/plugins.d/local_listeners.c +++ /dev/null @@ -1,400 +0,0 @@ -#include "libnetdata/libnetdata.h" -#include "libnetdata/required_dummies.h" - -#include <stdio.h> -#include <stdlib.h> -#include <stdbool.h> -#include <dirent.h> -#include <string.h> -#include <unistd.h> -#include <ctype.h> -#include <arpa/inet.h> - -typedef enum { - PROC_NET_PROTOCOL_TCP, - PROC_NET_PROTOCOL_TCP6, - PROC_NET_PROTOCOL_UDP, - PROC_NET_PROTOCOL_UDP6, -} PROC_NET_PROTOCOLS; - -#define MAX_ERROR_LOGS 10 - -static size_t pid_fds_processed = 0; -static size_t pid_fds_failed = 0; -static size_t errors_encountered = 0; - -static inline const char *protocol_name(PROC_NET_PROTOCOLS protocol) { - switch(protocol) { - default: - case PROC_NET_PROTOCOL_TCP: - return "TCP"; - - case PROC_NET_PROTOCOL_UDP: - return "UDP"; - - case PROC_NET_PROTOCOL_TCP6: - return "TCP6"; - - case PROC_NET_PROTOCOL_UDP6: - return "UDP6"; - } -} - -static inline int read_cmdline(pid_t pid, char* buffer, size_t bufferSize) { - char path[FILENAME_MAX + 1]; - snprintfz(path, FILENAME_MAX, "%s/proc/%d/cmdline", netdata_configured_host_prefix, pid); - - FILE* file = fopen(path, "r"); - if (!file) { - if(++errors_encountered < MAX_ERROR_LOGS) - collector_error("LOCAL-LISTENERS: error opening file: %s\n", path); - - return -1; - } - - size_t bytesRead = fread(buffer, 1, bufferSize - 1, file); - buffer[bytesRead] = '\0'; // Ensure null-terminated - - // Replace null characters in cmdline with spaces - for (size_t i = 0; i < bytesRead; i++) { - if (buffer[i] == '\0') { - buffer[i] = ' '; - } - } - - fclose(file); - return 0; -} - -static inline void fix_cmdline(char* str) { - if (str == NULL) - return; - - char *s = str; - - do { - if(*s == '|' || iscntrl(*s)) - *s = '_'; - - } while(*++s); - - - while(s > str && *(s-1) == ' ') - *--s = '\0'; -} - -// ---------------------------------------------------------------------------- - -#define HASH_TABLE_SIZE 100000 - -typedef struct Node { - unsigned int inode; // key - - // values - unsigned int port; - char local_address[INET6_ADDRSTRLEN]; - PROC_NET_PROTOCOLS protocol; - bool processed; - - // linking - struct Node *prev, *next; -} Node; - -typedef struct HashTable { - Node *table[HASH_TABLE_SIZE]; -} HashTable; - -static HashTable *hashTable_key_inode_port_value = NULL; - -static inline void generate_output(const char *protocol, const char *address, unsigned int port, const char *cmdline) { - printf("%s|%s|%u|%s\n", protocol, address, port, cmdline); -} - -HashTable* createHashTable() { - HashTable *hashTable = (HashTable*)mallocz(sizeof(HashTable)); - memset(hashTable, 0, sizeof(HashTable)); - return hashTable; -} - -static inline unsigned int hashFunction(unsigned int inode) { - return inode % HASH_TABLE_SIZE; -} - -static inline void insertHashTable(HashTable *hashTable, unsigned int inode, unsigned int port, PROC_NET_PROTOCOLS protocol, char *local_address) { - unsigned int index = hashFunction(inode); - Node *newNode = (Node*)mallocz(sizeof(Node)); - newNode->inode = inode; - newNode->port = port; - newNode->protocol = protocol; - strncpyz(newNode->local_address, local_address, INET6_ADDRSTRLEN - 1); - DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(hashTable->table[index], newNode, prev, next); -} - -static inline bool lookupHashTable_and_execute(HashTable *hashTable, unsigned int inode, pid_t pid) { - unsigned int index = hashFunction(inode); - for(Node *node = hashTable->table[index], *next = NULL ; node ; node = next) { - next = node->next; - - if(node->inode == inode && node->port) { - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(hashTable->table[index], node, prev, next); - char cmdline[8192] = ""; - read_cmdline(pid, cmdline, sizeof(cmdline)); - fix_cmdline(cmdline); - generate_output(protocol_name(node->protocol), node->local_address, node->port, cmdline); - freez(node); - return true; - } - } - - return false; -} - -void freeHashTable(HashTable *hashTable) { - for (unsigned int i = 0; i < HASH_TABLE_SIZE; i++) { - while(hashTable->table[i]) { - Node *tmp = hashTable->table[i]; - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(hashTable->table[i], tmp, prev, next); - generate_output(protocol_name(tmp->protocol), tmp->local_address, tmp->port, ""); - freez(tmp); - } - } - freez(hashTable); -} - -// ---------------------------------------------------------------------------- - -static inline void found_this_socket_inode(pid_t pid, unsigned int inode) { - lookupHashTable_and_execute(hashTable_key_inode_port_value, inode, pid); -} - -bool find_all_sockets_in_proc(const char *proc_filename) { - DIR *proc_dir, *fd_dir; - struct dirent *proc_entry, *fd_entry; - char path_buffer[FILENAME_MAX + 1]; - - proc_dir = opendir(proc_filename); - if (proc_dir == NULL) { - if(++errors_encountered < MAX_ERROR_LOGS) - collector_error("LOCAL-LISTENERS: cannot opendir() '%s'", proc_filename); - - pid_fds_failed++; - return false; - } - - while ((proc_entry = readdir(proc_dir)) != NULL) { - // Check if directory entry is a PID by seeing if the name is made up of digits only - int is_pid = 1; - for (char *c = proc_entry->d_name; *c != '\0'; c++) { - if (*c < '0' || *c > '9') { - is_pid = 0; - break; - } - } - - if (!is_pid) - continue; - - // Build the path to the fd directory of the process - snprintfz(path_buffer, FILENAME_MAX, "%s/%s/fd/", proc_filename, proc_entry->d_name); - - fd_dir = opendir(path_buffer); - if (fd_dir == NULL) { - if(++errors_encountered < MAX_ERROR_LOGS) - collector_error("LOCAL-LISTENERS: cannot opendir() '%s'", path_buffer); - - pid_fds_failed++; - continue; - } - - while ((fd_entry = readdir(fd_dir)) != NULL) { - if(!strcmp(fd_entry->d_name, ".") || !strcmp(fd_entry->d_name, "..")) - continue; - - char link_path[FILENAME_MAX + 1]; - char link_target[FILENAME_MAX + 1]; - int inode; - - // Build the path to the file descriptor link - snprintfz(link_path, FILENAME_MAX, "%s/%s", path_buffer, fd_entry->d_name); - - ssize_t len = readlink(link_path, link_target, sizeof(link_target) - 1); - if (len == -1) { - if(++errors_encountered < MAX_ERROR_LOGS) - collector_error("LOCAL-LISTENERS: cannot read link '%s'", link_path); - - pid_fds_failed++; - continue; - } - link_target[len] = '\0'; - - pid_fds_processed++; - - // If the link target indicates a socket, print its inode number - if (sscanf(link_target, "socket:[%d]", &inode) == 1) - found_this_socket_inode((pid_t)strtoul(proc_entry->d_name, NULL, 10), inode); - } - - closedir(fd_dir); - } - - closedir(proc_dir); - return true; -} - -// ---------------------------------------------------------------------------- - -static inline void add_port_and_inode(PROC_NET_PROTOCOLS protocol, unsigned int port, unsigned int inode, char *local_address) { - insertHashTable(hashTable_key_inode_port_value, inode, port, protocol, local_address); -} - -static inline void print_ipv6_address(const char *ipv6_str, char *dst) { - unsigned k; - char buf[9]; - struct sockaddr_in6 sa; - - // Initialize sockaddr_in6 - memset(&sa, 0, sizeof(struct sockaddr_in6)); - sa.sin6_family = AF_INET6; - sa.sin6_port = htons(0); // replace 0 with your port number - - // Convert hex string to byte array - for (k = 0; k < 4; ++k) - { - memset(buf, 0, 9); - memcpy(buf, ipv6_str + (k * 8), 8); - sa.sin6_addr.s6_addr32[k] = strtoul(buf, NULL, 16); - } - - // Convert to human-readable format - if (inet_ntop(AF_INET6, &(sa.sin6_addr), dst, INET6_ADDRSTRLEN) == NULL) - *dst = '\0'; -} - -static inline void print_ipv4_address(uint32_t address, char *dst) { - uint8_t octets[4]; - octets[0] = address & 0xFF; - octets[1] = (address >> 8) & 0xFF; - octets[2] = (address >> 16) & 0xFF; - octets[3] = (address >> 24) & 0xFF; - sprintf(dst, "%u.%u.%u.%u", octets[0], octets[1], octets[2], octets[3]); -} - -bool read_proc_net_x(const char *filename, PROC_NET_PROTOCOLS protocol) { - FILE *fp; - char *line = NULL; - size_t len = 0; - ssize_t read; - char address[INET6_ADDRSTRLEN]; - - ssize_t min_line_length = (protocol == PROC_NET_PROTOCOL_TCP || protocol == PROC_NET_PROTOCOL_UDP) ? 105 : 155; - - fp = fopen(filename, "r"); - if (fp == NULL) - return false; - - // Read line by line - while ((read = getline(&line, &len, fp)) != -1) { - if(read < min_line_length) continue; - - char local_address6[33], rem_address6[33]; - unsigned int local_address, local_port, state, rem_address, rem_port, inode; - - switch(protocol) { - case PROC_NET_PROTOCOL_TCP: - if(line[34] != '0' || line[35] != 'A') - continue; - // fall-through - - case PROC_NET_PROTOCOL_UDP: - if (sscanf(line, "%*d: %X:%X %X:%X %X %*X:%*X %*X:%*X %*X %*d %*d %u", - &local_address, &local_port, &rem_address, &rem_port, &state, &inode) != 6) - continue; - - print_ipv4_address(local_address, address); - break; - - case PROC_NET_PROTOCOL_TCP6: - if(line[82] != '0' || line[83] != 'A') - continue; - // fall-through - - case PROC_NET_PROTOCOL_UDP6: - if(sscanf(line, "%*d: %32[0-9A-Fa-f]:%X %32[0-9A-Fa-f]:%X %X %*X:%*X %*X:%*X %*X %*d %*d %u", - local_address6, &local_port, rem_address6, &rem_port, &state, &inode) != 6) - continue; - - print_ipv6_address(local_address6, address); - break; - } - - add_port_and_inode(protocol, local_port, inode, address); - } - - fclose(fp); - if (line) - free(line); - - return true; -} - -// ---------------------------------------------------------------------------- -typedef struct { - bool read_tcp; - bool read_tcp6; - bool read_udp; - bool read_udp6; -} CommandLineArguments; - -int main(int argc, char **argv) { - char path[FILENAME_MAX + 1]; - hashTable_key_inode_port_value = createHashTable(); - - netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); - if(!netdata_configured_host_prefix) netdata_configured_host_prefix = ""; - - CommandLineArguments args = {.read_tcp = false, .read_tcp6 = false, .read_udp = false, .read_udp6 = false}; - - for (int i = 1; i < argc; i++) { - if (strcmp("tcp", argv[i]) == 0) { - args.read_tcp = true; - continue; - } else if (strcmp("tcp6", argv[i]) == 0) { - args.read_tcp6 = true; - continue; - } else if (strcmp("udp", argv[i]) == 0) { - args.read_udp = true; - continue; - } else if (strcmp("udp6", argv[i]) == 0) { - args.read_udp6 = true; - continue; - } - } - - bool read_all_files = (!args.read_tcp && !args.read_tcp6 && !args.read_udp && !args.read_udp6); - - if (read_all_files || args.read_tcp) { - snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp", netdata_configured_host_prefix); - read_proc_net_x(path, PROC_NET_PROTOCOL_TCP); - } - - if (read_all_files || args.read_udp) { - snprintfz(path, FILENAME_MAX, "%s/proc/net/udp", netdata_configured_host_prefix); - read_proc_net_x(path, PROC_NET_PROTOCOL_UDP); - } - - if (read_all_files || args.read_tcp6) { - snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp6", netdata_configured_host_prefix); - read_proc_net_x(path, PROC_NET_PROTOCOL_TCP6); - } - - if (read_all_files || args.read_udp6) { - snprintfz(path, FILENAME_MAX, "%s/proc/net/udp6", netdata_configured_host_prefix); - read_proc_net_x(path, PROC_NET_PROTOCOL_UDP6); - } - - snprintfz(path, FILENAME_MAX, "%s/proc", netdata_configured_host_prefix); - find_all_sockets_in_proc(path); - - freeHashTable(hashTable_key_inode_port_value); - return 0; -} diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c deleted file mode 100644 index 20061ad2..00000000 --- a/collectors/plugins.d/plugins_d.c +++ /dev/null @@ -1,362 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "plugins_d.h" -#include "pluginsd_parser.h" - -char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { [0] = PLUGINS_DIR, }; -struct plugind *pluginsd_root = NULL; - -inline size_t pluginsd_initialize_plugin_directories() -{ - char plugins_dirs[(FILENAME_MAX * 2) + 1]; - static char *plugins_dir_list = NULL; - - // Get the configuration entry - if (likely(!plugins_dir_list)) { - snprintfz(plugins_dirs, FILENAME_MAX * 2, "\"%s\" \"%s/custom-plugins.d\"", PLUGINS_DIR, CONFIG_DIR); - plugins_dir_list = strdupz(config_get(CONFIG_SECTION_DIRECTORIES, "plugins", plugins_dirs)); - } - - // Parse it and store it to plugin directories - return quoted_strings_splitter_config(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES); -} - -static inline void plugin_set_disabled(struct plugind *cd) { - spinlock_lock(&cd->unsafe.spinlock); - cd->unsafe.enabled = false; - spinlock_unlock(&cd->unsafe.spinlock); -} - -bool plugin_is_enabled(struct plugind *cd) { - spinlock_lock(&cd->unsafe.spinlock); - bool ret = cd->unsafe.enabled; - spinlock_unlock(&cd->unsafe.spinlock); - return ret; -} - -static inline void plugin_set_running(struct plugind *cd) { - spinlock_lock(&cd->unsafe.spinlock); - cd->unsafe.running = true; - spinlock_unlock(&cd->unsafe.spinlock); -} - -static inline bool plugin_is_running(struct plugind *cd) { - spinlock_lock(&cd->unsafe.spinlock); - bool ret = cd->unsafe.running; - spinlock_unlock(&cd->unsafe.spinlock); - return ret; -} - -static void pluginsd_worker_thread_cleanup(void *arg) { - struct plugind *cd = (struct plugind *)arg; - - worker_unregister(); - - spinlock_lock(&cd->unsafe.spinlock); - - cd->unsafe.running = false; - cd->unsafe.thread = 0; - - pid_t pid = cd->unsafe.pid; - cd->unsafe.pid = 0; - - spinlock_unlock(&cd->unsafe.spinlock); - - if (pid) { - siginfo_t info; - netdata_log_info("PLUGINSD: 'host:%s', killing data collection child process with pid %d", - rrdhost_hostname(cd->host), pid); - - if (killpid(pid) != -1) { - netdata_log_info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...", - rrdhost_hostname(cd->host), pid); - - netdata_waitid(P_PID, (id_t)pid, &info, WEXITED); - } - } -} - -#define SERIAL_FAILURES_THRESHOLD 10 -static void pluginsd_worker_thread_handle_success(struct plugind *cd) { - if (likely(cd->successful_collections)) { - sleep((unsigned int)cd->update_every); - return; - } - - if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) { - netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, - plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); - - sleep((unsigned int)(cd->update_every * 10)); - return; - } - - if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) { - netdata_log_error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, " - "although it reports success (exits with 0)." - "We have tried to collect something %zu times - unsuccessfully. Disabling it.", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, cd->serial_failures); - plugin_set_disabled(cd); - return; - } -} - -static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) { - if (worker_ret_code == -1) { - netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid); - plugin_set_disabled(cd); - return; - } - - if (!cd->successful_collections) { - netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code); - plugin_set_disabled(cd); - return; - } - - if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) { - netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, - plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); - sleep((unsigned int)(cd->update_every * 10)); - return; - } - - if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) { - netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." - "We tried to restart it %zu times, but it failed to generate data. Disabling it.", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, - cd->successful_collections, cd->serial_failures); - plugin_set_disabled(cd); - return; - } -} - -#undef SERIAL_FAILURES_THRESHOLD - -static void *pluginsd_worker_thread(void *arg) { - worker_register("PLUGINSD"); - - netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg); - - { - struct plugind *cd = (struct plugind *) arg; - plugin_set_running(cd); - - size_t count = 0; - - while(service_running(SERVICE_COLLECTORS)) { - FILE *fp_child_input = NULL; - FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); - - if(unlikely(!fp_child_input || !fp_child_output)) { - netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", - rrdhost_hostname(cd->host), cd->cmd); - break; - } - - nd_log(NDLS_DAEMON, NDLP_DEBUG, - "PLUGINSD: 'host:%s' connected to '%s' running on pid %d", - rrdhost_hostname(cd->host), - cd->fullfilename, cd->unsafe.pid); - - const char *plugin = strrchr(cd->fullfilename, '/'); - if(plugin) - plugin++; - else - plugin = cd->fullfilename; - - char module[100]; - snprintfz(module, sizeof(module), "plugins.d[%s]", plugin); - ND_LOG_STACK lgs[] = { - ND_LOG_FIELD_TXT(NDF_MODULE, module), - ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rrdhost_hostname(cd->host)), - ND_LOG_FIELD_TXT(NDF_SRC_TRANSPORT, "pluginsd"), - ND_LOG_FIELD_END(), - }; - ND_LOG_STACK_PUSH(lgs); - - count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0); - - nd_log(NDLS_DAEMON, NDLP_DEBUG, - "PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count); - - killpid(cd->unsafe.pid); - - int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid); - - if(likely(worker_ret_code == 0)) - pluginsd_worker_thread_handle_success(cd); - else - pluginsd_worker_thread_handle_error(cd, worker_ret_code); - - cd->unsafe.pid = 0; - - if(unlikely(!plugin_is_enabled(cd))) - break; - } - } - - netdata_thread_cleanup_pop(1); - return NULL; -} - -static void pluginsd_main_cleanup(void *data) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; - static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - netdata_log_info("PLUGINSD: cleaning up..."); - - struct plugind *cd; - for (cd = pluginsd_root; cd; cd = cd->next) { - spinlock_lock(&cd->unsafe.spinlock); - if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) { - netdata_log_info("PLUGINSD: 'host:%s', stopping plugin thread: %s", - rrdhost_hostname(cd->host), cd->id); - - netdata_thread_cancel(cd->unsafe.thread); - } - spinlock_unlock(&cd->unsafe.spinlock); - } - - netdata_log_info("PLUGINSD: cleanup completed."); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; - - worker_unregister(); -} - -void *pluginsd_main(void *ptr) -{ - netdata_thread_cleanup_push(pluginsd_main_cleanup, ptr); - - int automatic_run = config_get_boolean(CONFIG_SECTION_PLUGINS, "enable running new plugins", 1); - int scan_frequency = (int)config_get_number(CONFIG_SECTION_PLUGINS, "check for new plugins every", 60); - if (scan_frequency < 1) - scan_frequency = 1; - - // disable some plugins by default - config_get_boolean(CONFIG_SECTION_PLUGINS, "slabinfo", CONFIG_BOOLEAN_NO); - config_get_boolean(CONFIG_SECTION_PLUGINS, "logs-management", -#if defined(LOGS_MANAGEMENT_DEV_MODE) - CONFIG_BOOLEAN_YES -#else - CONFIG_BOOLEAN_NO -#endif - ); - // it crashes (both threads) on Alpine after we made it multi-threaded - // works with "--device /dev/ipmi0", but this is not default - // see https://github.com/netdata/netdata/pull/15564 for details - if (getenv("NETDATA_LISTENER_PORT")) - config_get_boolean(CONFIG_SECTION_PLUGINS, "freeipmi", CONFIG_BOOLEAN_NO); - - // store the errno for each plugins directory - // so that we don't log broken directories on each loop - int directory_errors[PLUGINSD_MAX_DIRECTORIES] = { 0 }; - - while (service_running(SERVICE_COLLECTORS)) { - int idx; - const char *directory_name; - - for (idx = 0; idx < PLUGINSD_MAX_DIRECTORIES && (directory_name = plugin_directories[idx]); idx++) { - if (unlikely(!service_running(SERVICE_COLLECTORS))) - break; - - errno = 0; - DIR *dir = opendir(directory_name); - if (unlikely(!dir)) { - if (directory_errors[idx] != errno) { - directory_errors[idx] = errno; - netdata_log_error("cannot open plugins directory '%s'", directory_name); - } - continue; - } - - struct dirent *file = NULL; - while (likely((file = readdir(dir)))) { - if (unlikely(!service_running(SERVICE_COLLECTORS))) - break; - - netdata_log_debug(D_PLUGINSD, "examining file '%s'", file->d_name); - - if (unlikely(strcmp(file->d_name, ".") == 0 || strcmp(file->d_name, "..") == 0)) - continue; - - int len = (int)strlen(file->d_name); - if (unlikely(len <= (int)PLUGINSD_FILE_SUFFIX_LEN)) - continue; - if (unlikely(strcmp(PLUGINSD_FILE_SUFFIX, &file->d_name[len - (int)PLUGINSD_FILE_SUFFIX_LEN]) != 0)) { - netdata_log_debug(D_PLUGINSD, "file '%s' does not end in '%s'", file->d_name, PLUGINSD_FILE_SUFFIX); - continue; - } - - char pluginname[CONFIG_MAX_NAME + 1]; - snprintfz(pluginname, CONFIG_MAX_NAME, "%.*s", (int)(len - PLUGINSD_FILE_SUFFIX_LEN), file->d_name); - int enabled = config_get_boolean(CONFIG_SECTION_PLUGINS, pluginname, automatic_run); - - if (unlikely(!enabled)) { - netdata_log_debug(D_PLUGINSD, "plugin '%s' is not enabled", file->d_name); - continue; - } - - // check if it runs already - struct plugind *cd; - for (cd = pluginsd_root; cd; cd = cd->next) - if (unlikely(strcmp(cd->filename, file->d_name) == 0)) - break; - - if (likely(cd && plugin_is_running(cd))) { - netdata_log_debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename); - continue; - } - - // it is not running - // allocate a new one, or use the obsolete one - if (unlikely(!cd)) { - cd = callocz(sizeof(struct plugind), 1); - - snprintfz(cd->id, CONFIG_MAX_NAME, "plugin:%s", pluginname); - - strncpyz(cd->filename, file->d_name, FILENAME_MAX); - snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename); - - cd->host = localhost; - cd->unsafe.enabled = enabled; - cd->unsafe.running = false; - - cd->update_every = (int)config_get_number(cd->id, "update every", localhost->rrd_update_every); - cd->started_t = now_realtime_sec(); - - char *def = ""; - snprintfz( - cd->cmd, PLUGINSD_CMD_MAX, "exec %s %d %s", cd->fullfilename, cd->update_every, - config_get(cd->id, "command options", def)); - - // link it - DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(pluginsd_root, cd, prev, next); - - if (plugin_is_enabled(cd)) { - char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PD[%s]", pluginname); - - // spawn a new thread for it - netdata_thread_create(&cd->unsafe.thread, - tag, - NETDATA_THREAD_OPTION_DEFAULT, - pluginsd_worker_thread, - cd); - } - } - } - - closedir(dir); - } - - sleep((unsigned int)scan_frequency); - } - - netdata_thread_cleanup_pop(1); - return NULL; -} diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h deleted file mode 100644 index 37c70f7e..00000000 --- a/collectors/plugins.d/plugins_d.h +++ /dev/null @@ -1,67 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_PLUGINS_D_H -#define NETDATA_PLUGINS_D_H 1 - -#include "daemon/common.h" - -#define PLUGINSD_FILE_SUFFIX ".plugin" -#define PLUGINSD_FILE_SUFFIX_LEN strlen(PLUGINSD_FILE_SUFFIX) -#define PLUGINSD_CMD_MAX (FILENAME_MAX*2) -#define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0 - -#define PLUGINSD_KEYWORD_FUNCTION_PAYLOAD "FUNCTION_PAYLOAD" -#define PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END "FUNCTION_PAYLOAD_END" - -#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE" -#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE" -#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB "DYNCFG_REGISTER_JOB" -#define PLUGINSD_KEYWORD_DYNCFG_RESET "DYNCFG_RESET" - -#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS" -#define PLUGINSD_KEYWORD_DELETE_JOB "DELETE_JOB" - -#define PLUGINSD_MAX_DIRECTORIES 20 -extern char *plugin_directories[PLUGINSD_MAX_DIRECTORIES]; - -struct plugind { - char id[CONFIG_MAX_NAME+1]; // config node id - - char filename[FILENAME_MAX+1]; // just the filename - char fullfilename[FILENAME_MAX+1]; // with path - char cmd[PLUGINSD_CMD_MAX+1]; // the command that it executes - - size_t successful_collections; // the number of times we have seen - // values collected from this plugin - - size_t serial_failures; // the number of times the plugin started - // without collecting values - - RRDHOST *host; // the host the plugin collects data for - int update_every; // the plugin default data collection frequency - - struct { - SPINLOCK spinlock; - bool running; // do not touch this structure after setting this to 1 - bool enabled; // if this is enabled or not - netdata_thread_t thread; - pid_t pid; - } unsafe; - - time_t started_t; - - const DICTIONARY_ITEM *cfg_dict_item; - struct configurable_plugin *configuration; - - struct plugind *prev; - struct plugind *next; -}; - -extern struct plugind *pluginsd_root; - -size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations); -void pluginsd_process_thread_cleanup(void *ptr); - -size_t pluginsd_initialize_plugin_directories(); - -#endif /* NETDATA_PLUGINS_D_H */ diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c deleted file mode 100644 index 3b47c6c0..00000000 --- a/collectors/plugins.d/pluginsd_parser.c +++ /dev/null @@ -1,3208 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "pluginsd_parser.h" - -#define LOG_FUNCTIONS false - -#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING) -#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD) - -static ssize_t send_to_plugin(const char *txt, void *data) { - PARSER *parser = data; - - if(!txt || !*txt) - return 0; - -#ifdef ENABLE_H2O - if(parser->h2o_ctx) - return h2o_stream_write(parser->h2o_ctx, txt, strlen(txt)); -#endif - - errno = 0; - spinlock_lock(&parser->writer.spinlock); - ssize_t bytes = -1; - -#ifdef ENABLE_HTTPS - NETDATA_SSL *ssl = parser->ssl_output; - if(ssl) { - - if(SSL_connection(ssl)) - bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt)); - - else - netdata_log_error("PLUGINSD: cannot send command (SSL)"); - - spinlock_unlock(&parser->writer.spinlock); - return bytes; - } -#endif - - if(parser->fp_output) { - - bytes = fprintf(parser->fp_output, "%s", txt); - if(bytes <= 0) { - netdata_log_error("PLUGINSD: cannot send command (FILE)"); - bytes = -2; - } - else - fflush(parser->fp_output); - - spinlock_unlock(&parser->writer.spinlock); - return bytes; - } - - if(parser->fd != -1) { - bytes = 0; - ssize_t total = (ssize_t)strlen(txt); - ssize_t sent; - - do { - sent = write(parser->fd, &txt[bytes], total - bytes); - if(sent <= 0) { - netdata_log_error("PLUGINSD: cannot send command (fd)"); - spinlock_unlock(&parser->writer.spinlock); - return -3; - } - bytes += sent; - } - while(bytes < total); - - spinlock_unlock(&parser->writer.spinlock); - return (int)bytes; - } - - spinlock_unlock(&parser->writer.spinlock); - netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); - return -4; -} - -static inline RRDHOST *pluginsd_require_scope_host(PARSER *parser, const char *cmd) { - RRDHOST *host = parser->user.host; - - if(unlikely(!host)) - netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd); - - return host; -} - -static inline RRDSET *pluginsd_require_scope_chart(PARSER *parser, const char *cmd, const char *parent_cmd) { - RRDSET *st = parser->user.st; - - if(unlikely(!st)) - netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd); - - return st; -} - -static inline RRDSET *pluginsd_get_scope_chart(PARSER *parser) { - return parser->user.st; -} - -static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) { - if(parser->user.st && !parser->user.v2.locked_data_collection) { - spinlock_lock(&parser->user.st->data_collection_lock); - parser->user.v2.locked_data_collection = true; - } -} - -static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) { - if(parser->user.st && parser->user.v2.locked_data_collection) { - spinlock_unlock(&parser->user.st->data_collection_lock); - parser->user.v2.locked_data_collection = false; - return true; - } - - return false; -} - -static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) { - if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) { - if(stale) - netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked", - rrdhost_hostname(parser->user.st->rrdhost), - rrdset_id(parser->user.st), - keyword); - } - - if(unlikely(parser->user.v2.ml_locked)) { - ml_chart_update_end(parser->user.st); - parser->user.v2.ml_locked = false; - - if(stale) - netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked", - rrdhost_hostname(parser->user.st->rrdhost), - rrdset_id(parser->user.st), - keyword); - } -} - -static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) { - pluginsd_unlock_previous_scope_chart(parser, keyword, true); - - if(parser->user.cleanup_slots && parser->user.st) - rrdset_pluginsd_receive_unslot(parser->user.st); - - parser->user.st = NULL; - parser->user.cleanup_slots = false; -} - -static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) { - RRDSET *old_st = parser->user.st; - pid_t old_collector_tid = (old_st) ? old_st->pluginsd.collector_tid : 0; - pid_t my_collector_tid = gettid(); - - if(unlikely(old_collector_tid)) { - if(old_collector_tid != my_collector_tid) { - nd_log_limit_static_global_var(erl, 1, 0); - nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING, - "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)", - keyword ? keyword : "UNKNOWN", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - my_collector_tid, old_collector_tid); - - return false; - } - - old_st->pluginsd.collector_tid = 0; - } - - st->pluginsd.collector_tid = my_collector_tid; - - pluginsd_clear_scope_chart(parser, keyword); - - st->pluginsd.pos = 0; - parser->user.st = st; - parser->user.cleanup_slots = false; - - return true; -} - -static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) { - size_t wanted_size = st->pluginsd.size; - - if(slot >= 1) { - st->pluginsd.dims_with_slots = true; - wanted_size = slot; - } - else { - st->pluginsd.dims_with_slots = false; - wanted_size = dictionary_entries(st->rrddim_root_index); - } - - if(wanted_size > st->pluginsd.size) { - st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim)); - - // initialize the empty slots - for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) { - st->pluginsd.prd_array[i].rda = NULL; - st->pluginsd.prd_array[i].rd = NULL; - st->pluginsd.prd_array[i].id = NULL; - } - - st->pluginsd.size = wanted_size; - } - - if(st->pluginsd.dims_with_slots) { - struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1]; - - if(prd->rd != rd) { - prd->rda = rrddim_find_and_acquire(st, string2str(rd->id)); - prd->rd = rrddim_acquired_to_rrddim(prd->rda); - prd->id = string2str(prd->rd->id); - } - - if(obsolete) - parser->user.cleanup_slots = true; - } -} - -static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) { - if (unlikely(!dimension || !*dimension)) { - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", - rrdhost_hostname(host), rrdset_id(st), cmd); - return NULL; - } - - if (unlikely(!st->pluginsd.size)) { - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.", - rrdhost_hostname(host), rrdset_id(st), cmd); - return NULL; - } - - struct pluginsd_rrddim *prd; - RRDDIM *rd; - - if(likely(st->pluginsd.dims_with_slots)) { - // caching with slots - - if(unlikely(slot < 1 || slot > st->pluginsd.size)) { - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.", - rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size); - return NULL; - } - - prd = &st->pluginsd.prd_array[slot - 1]; - - rd = prd->rd; - if(likely(rd)) { -#ifdef NETDATA_INTERNAL_CHECKS - if(strcmp(prd->id, dimension) != 0) { - ssize_t t; - for(t = 0; t < st->pluginsd.size ;t++) { - if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0) - break; - } - if(t >= st->pluginsd.size) - t = -1; - - internal_fatal(true, - "PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', " - "the right slot is %zd", - dimension, slot, prd->id, t); - } -#endif - return rd; - } - } - else { - // caching without slots - - if(unlikely(st->pluginsd.pos >= st->pluginsd.size)) - st->pluginsd.pos = 0; - - prd = &st->pluginsd.prd_array[st->pluginsd.pos++]; - - rd = prd->rd; - if(likely(rd)) { - const char *id = prd->id; - - if(strcmp(id, dimension) == 0) { - // we found it cached - return rd; - } - else { - // the cached one is not good for us - rrddim_acquired_release(prd->rda); - prd->rda = NULL; - prd->rd = NULL; - prd->id = NULL; - } - } - } - - // we need to find the dimension and set it to prd - - RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); - if (unlikely(!rda)) { - netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", - rrdhost_hostname(host), rrdset_id(st), dimension, cmd); - - return NULL; - } - - prd->rda = rda; - prd->rd = rd = rrddim_acquired_to_rrddim(rda); - prd->id = string2str(rd->id); - - return rd; -} - -static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { - if (unlikely(!chart || !*chart)) { - netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.", - rrdhost_hostname(host), cmd); - return NULL; - } - - RRDSET *st = rrdset_find(host, chart); - if (unlikely(!st)) - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.", - rrdhost_hostname(host), chart, cmd); - - return st; -} - -static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) { - ssize_t slot = -1; - char *id = get_word(words, num_words, 1); - if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] && - id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') { - slot = (ssize_t) str2ull_encoded(&id[5]); - if(slot < 0) slot = 0; // to make the caller increment its idx of the words - } - - return slot; -} - -static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) { - // clean possible old cached data - rrdset_pluginsd_receive_unslot(st); - - if(unlikely(slot < 1 || slot >= INT32_MAX)) - return; - - RRDHOST *host = st->rrdhost; - - if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) { - spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock); - size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size; - size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2; - - if(new_slots < (size_t)slot) - new_slots = slot; - - host->rrdpush.receive.pluginsd_chart_slots.array = - reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *)); - - for(size_t i = old_slots; i < new_slots ;i++) - host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL; - - host->rrdpush.receive.pluginsd_chart_slots.size = new_slots; - spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock); - } - - host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st; - st->pluginsd.last_slot = (int32_t)slot - 1; - parser->user.cleanup_slots = obsolete; -} - -static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) { - if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) - return pluginsd_find_chart(host, id, keyword); - - RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1]; - - if(!st) { - st = pluginsd_find_chart(host, id, keyword); - if(st) - pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)); - } - else { - internal_fatal(string_strcmp(st->id, id) != 0, - "PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'", - slot - 1, id, string2str(st->id)); - } - - return st; -} - -static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) { - parser->user.enabled = 0; - - if(keyword && msg) { - nd_log_limit_static_global_var(erl, 1, 0); - nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_INFO, - "PLUGINSD: keyword %s: %s", keyword, msg); - } - - return PARSER_RC_ERROR; -} - -static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) { - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *dimension = get_word(words, num_words, idx++); - char *value = get_word(words, num_words, idx++); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - st->pluginsd.set = true; - - if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", - rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET"); - - if (value && *value) - rrddim_set_by_pointer(st, rd, str2ll_encoded(value)); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) { - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *id = get_word(words, num_words, idx++); - char *microseconds_txt = get_word(words, num_words, idx++); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN)) - return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - usec_t microseconds = 0; - if (microseconds_txt && *microseconds_txt) { - long long t = str2ll(microseconds_txt, NULL); - if(t >= 0) - microseconds = t; - } - -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - if(st->replay.log_next_data_collection) { - st->replay.log_next_data_collection = false; - - internal_error(true, - "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu", - rrdhost_hostname(host), rrdset_id(st), - st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec, - st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec, - microseconds - ); - } -#endif - - if (likely(st->counter_done)) { - if (likely(microseconds)) { - if (parser->user.trust_durations) - rrdset_next_usec_unfiltered(st, microseconds); - else - rrdset_next_usec(st, microseconds); - } - else - rrdset_next(st); - } - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) { - char *tv_sec = get_word(words, num_words, 1); - char *tv_usec = get_word(words, num_words, 2); - char *pending_rrdset_next = get_word(words, num_words, 3); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); - - pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END); - parser->user.data_collections_count++; - - struct timeval tv = { - .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0, - .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0 - }; - - if(!tv.tv_sec) - now_realtime_timeval(&tv); - - rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false); - - return PARSER_RC_OK; -} - -static void pluginsd_host_define_cleanup(PARSER *parser) { - string_freez(parser->user.host_define.hostname); - rrdlabels_destroy(parser->user.host_define.rrdlabels); - - parser->user.host_define.hostname = NULL; - parser->user.host_define.rrdlabels = NULL; - parser->user.host_define.parsing_host = false; -} - -static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) { - if(uuid_parse(guid, *uuid)) - return false; - - uuid_unparse_lower(*uuid, output); - - return true; -} - -static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) { - char *guid = get_word(words, num_words, 1); - char *hostname = get_word(words, num_words, 2); - - if(unlikely(!guid || !*guid || !hostname || !*hostname)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters"); - - if(unlikely(parser->user.host_define.parsing_host)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, - "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?"); - - if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?"); - - parser->user.host_define.hostname = string_strdupz(hostname); - parser->user.host_define.rrdlabels = rrdlabels_create(); - parser->user.host_define.parsing_host = true; - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) { - char *name = get_word(words, num_words, 1); - char *value = get_word(words, num_words, 2); - - if(!name || !*name || !value) - return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters"); - - if(!parser->user.host_define.parsing_host || !labels) - return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); - - rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) { - return pluginsd_host_dictionary(words, num_words, parser, - parser->user.host_define.rrdlabels, - PLUGINSD_KEYWORD_HOST_LABEL); -} - -static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { - if(!parser->user.host_define.parsing_host) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); - - RRDHOST *host = rrdhost_find_or_create( - string2str(parser->user.host_define.hostname), - string2str(parser->user.host_define.hostname), - parser->user.host_define.machine_guid_str, - "Netdata Virtual Host 1.0", - netdata_configured_timezone, - netdata_configured_abbrev_timezone, - netdata_configured_utc_offset, - NULL, - program_name, - program_version, - default_rrd_update_every, - default_rrd_history_entries, - default_rrd_memory_mode, - default_health_enabled, - default_rrdpush_enabled, - default_rrdpush_destination, - default_rrdpush_api_key, - default_rrdpush_send_charts_matching, - default_rrdpush_enable_replication, - default_rrdpush_seconds_to_replicate, - default_rrdpush_replication_step, - rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels), - false); - - rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST); - - if(host->rrdlabels) { - rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels); - } - else { - host->rrdlabels = parser->user.host_define.rrdlabels; - parser->user.host_define.rrdlabels = NULL; - } - - pluginsd_host_define_cleanup(parser); - - parser->user.host = host; - pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END); - - rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); - rrdcontext_host_child_connected(host); - schedule_node_info_update(host); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) { - char *guid = get_word(words, num_words, 1); - - if(!guid || !*guid || strcmp(guid, "localhost") == 0) { - parser->user.host = localhost; - return PARSER_RC_OK; - } - - uuid_t uuid; - char uuid_str[UUID_STR_LEN]; - if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?"); - - RRDHOST *host = rrdhost_find_by_guid(uuid_str); - if(unlikely(!host)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?"); - - parser->user.host = host; - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) { - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *type = get_word(words, num_words, idx++); - char *name = get_word(words, num_words, idx++); - char *title = get_word(words, num_words, idx++); - char *units = get_word(words, num_words, idx++); - char *family = get_word(words, num_words, idx++); - char *context = get_word(words, num_words, idx++); - char *chart = get_word(words, num_words, idx++); - char *priority_s = get_word(words, num_words, idx++); - char *update_every_s = get_word(words, num_words, idx++); - char *options = get_word(words, num_words, idx++); - char *plugin = get_word(words, num_words, idx++); - char *module = get_word(words, num_words, idx++); - - // parse the id from type - char *id = NULL; - if (likely(type && (id = strchr(type, '.')))) { - *id = '\0'; - id++; - } - - // make sure we have the required variables - if (unlikely((!type || !*type || !id || !*id))) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters"); - - // parse the name, and make sure it does not include 'type.' - if (unlikely(name && *name)) { - // when data are streamed from child nodes - // name will be type.name - // so, we have to remove 'type.' from name too - size_t len = strlen(type); - if (strncmp(type, name, len) == 0 && name[len] == '.') - name = &name[len + 1]; - - // if the name is the same with the id, - // or is just 'NULL', clear it. - if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0)) - name = NULL; - } - - int priority = 1000; - if (likely(priority_s && *priority_s)) - priority = str2i(priority_s); - - int update_every = parser->user.cd->update_every; - if (likely(update_every_s && *update_every_s)) - update_every = str2i(update_every_s); - if (unlikely(!update_every)) - update_every = parser->user.cd->update_every; - - RRDSET_TYPE chart_type = RRDSET_TYPE_LINE; - if (unlikely(chart)) - chart_type = rrdset_type_id(chart); - - if (unlikely(name && !*name)) - name = NULL; - if (unlikely(family && !*family)) - family = NULL; - if (unlikely(context && !*context)) - context = NULL; - if (unlikely(!title)) - title = ""; - if (unlikely(!units)) - units = "unknown"; - - netdata_log_debug( - D_PLUGINSD, - "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d", - type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type), - priority, update_every); - - RRDSET *st = NULL; - - st = rrdset_create( - host, type, id, name, family, context, title, units, - (plugin && *plugin) ? plugin : parser->user.cd->filename, - module, priority, update_every, - chart_type); - - bool obsolete = false; - if (likely(st)) { - if (options && *options) { - if (strstr(options, "obsolete")) { - rrdset_is_obsolete___safe_from_collector_thread(st); - obsolete = true; - } - else - rrdset_isnot_obsolete___safe_from_collector_thread(st); - - if (strstr(options, "detail")) - rrdset_flag_set(st, RRDSET_FLAG_DETAIL); - else - rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); - - if (strstr(options, "hidden")) - rrdset_flag_set(st, RRDSET_FLAG_HIDDEN); - else - rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN); - - if (strstr(options, "store_first")) - rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST); - else - rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); - } - else { - rrdset_isnot_obsolete___safe_from_collector_thread(st); - rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); - rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); - } - - if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART)) - return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete); - } - else - pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) { - const char *first_entry_txt = get_word(words, num_words, 1); - const char *last_entry_txt = get_word(words, num_words, 2); - const char *wall_clock_time_txt = get_word(words, num_words, 3); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; - time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; - time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec(); - - bool ok = true; - if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { - -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - st->replay.start_streaming = false; - st->replay.after = 0; - st->replay.before = 0; -#endif - - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - rrdhost_receiver_replicating_charts_plus_one(st->rrdhost); - - ok = replicate_chart_request(send_to_plugin, parser, host, st, - first_entry_child, last_entry_child, child_wall_clock_time, - 0, 0); - } -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - else { - internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); - } -#endif - - return ok ? PARSER_RC_OK : PARSER_RC_ERROR; -} - -static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) { - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *id = get_word(words, num_words, idx++); - char *name = get_word(words, num_words, idx++); - char *algorithm = get_word(words, num_words, idx++); - char *multiplier_s = get_word(words, num_words, idx++); - char *divisor_s = get_word(words, num_words, idx++); - char *options = get_word(words, num_words, idx++); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if (unlikely(!id)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id"); - - long multiplier = 1; - if (multiplier_s && *multiplier_s) { - multiplier = str2ll_encoded(multiplier_s); - if (unlikely(!multiplier)) - multiplier = 1; - } - - long divisor = 1; - if (likely(divisor_s && *divisor_s)) { - divisor = str2ll_encoded(divisor_s); - if (unlikely(!divisor)) - divisor = 1; - } - - if (unlikely(!algorithm || !*algorithm)) - algorithm = "absolute"; - - if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - netdata_log_debug( - D_PLUGINSD, - "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'", - rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor, - options ? options : ""); - - RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm)); - int unhide_dimension = 1; - - rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); - bool obsolete = false; - if (options && *options) { - if (strstr(options, "obsolete") != NULL) { - obsolete = true; - rrddim_is_obsolete___safe_from_collector_thread(st, rd); - } - else - rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); - - unhide_dimension = !strstr(options, "hidden"); - - if (strstr(options, "noreset") != NULL) - rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); - if (strstr(options, "nooverflow") != NULL) - rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); - } - else - rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); - - bool should_update_dimension = false; - - if (likely(unhide_dimension)) { - rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN); - should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN); - } - else { - rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN); - should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN); - } - - if (should_update_dimension) { - rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE); - rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - } - - pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete); - - return PARSER_RC_OK; -} - -// ---------------------------------------------------------------------------- -// execution of functions - -struct inflight_function { - int code; - int timeout; - STRING *function; - BUFFER *result_body_wb; - rrd_function_result_callback_t result_cb; - void *result_cb_data; - usec_t timeout_ut; - usec_t started_ut; - usec_t sent_ut; - const char *payload; - PARSER *parser; - bool virtual; -}; - -static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { - struct inflight_function *pf = func; - - PARSER *parser = parser_ptr; - - // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller - pf->code = HTTP_RESP_GATEWAY_TIMEOUT; - - const char *transaction = dictionary_acquired_item_name(item); - - char buffer[2048 + 1]; - snprintfz(buffer, sizeof(buffer) - 1, "%s %s %d \"%s\"\n", - pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION", - transaction, - pf->timeout, - string2str(pf->function)); - - // send the command to the plugin - ssize_t ret = send_to_plugin(buffer, parser); - - pf->sent_ut = now_realtime_usec(); - - if(ret < 0) { - netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret); - rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); - } - else { - internal_error(LOG_FUNCTIONS, - "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)", - string2str(pf->function), dictionary_acquired_item_name(item), ret, - pf->sent_ut - pf->started_ut); - } - - if (!pf->payload) - return; - - // send the payload to the plugin - ret = send_to_plugin(pf->payload, parser); - - if(ret < 0) { - netdata_log_error("FUNCTION_PAYLOAD '%s': failed to send function to plugin, error %zd", string2str(pf->function), ret); - rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); - } - else { - internal_error(LOG_FUNCTIONS, - "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)", - string2str(pf->function), dictionary_acquired_item_name(item), ret, - pf->sent_ut - pf->started_ut); - } - - send_to_plugin("\nFUNCTION_PAYLOAD_END\n", parser); -} - -static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) { - struct inflight_function *pf = new_func; - - netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); - pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); - pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); - string_freez(pf->function); - - return false; -} - -void delete_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug, const char *fnc_sig, int code) { - if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) - return; - - char *params_local = strdupz(fnc_sig); - char *words[DYNCFG_MAX_WORDS]; - size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); - - if (words_c != 3) { - netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job"); - freez(params_local); - return; - } - - const char *module = words[1]; - const char *job = words[2]; - - delete_job(plug, module, job); - - unlink_job(plug->name, module, job); - - rrdpush_send_job_deleted(localhost, plug->name, module, job); - - freez(params_local); -} - -void set_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug __maybe_unused, const char *fnc_sig, int code) { - if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) - return; - - char *params_local = strdupz(fnc_sig); - char *words[DYNCFG_MAX_WORDS]; - size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); - - if (words_c != 3) { - netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config"); - freez(params_local); - return; - } - - const char *module_name = get_word(words, words_c, 1); - const char *job_name = get_word(words, words_c, 2); - - if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) { - freez(params_local); - return; - } - - // only send this if it is not existing already (register_job cares for that) - rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED); - - freez(params_local); -} - -static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) { - struct inflight_function *pf = func; - struct parser *parser = (struct parser *)parser_ptr; - - internal_error(LOG_FUNCTIONS, - "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)", - string2str(pf->function), dictionary_acquired_item_name(item), - buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); - - if (pf->virtual && SERVING_PLUGINSD(parser)) { - if (pf->payload) { - if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) - set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); - dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration); - } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) { - delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); - } - } - - pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); - - string_freez(pf->function); - freez((void *)pf->payload); -} - -void inflight_functions_init(PARSER *parser) { - parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0); - dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser); - dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser); - dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser); -} - -static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { - parser->inflight.smaller_timeout = 0; - struct inflight_function *pf; - dfe_start_write(parser->inflight.functions, pf) { - if (pf->timeout_ut < now) { - internal_error(true, - "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.", - string2str(pf->function), pf_dfe.name, now - pf->started_ut); - - if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK) - pf->code = rrd_call_function_error(pf->result_body_wb, - "Timeout waiting for collector response.", - HTTP_RESP_GATEWAY_TIMEOUT); - - dictionary_del(parser->inflight.functions, pf_dfe.name); - } - - else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout) - parser->inflight.smaller_timeout = pf->timeout_ut; - } - dfe_done(pf); -} - -void pluginsd_function_cancel(void *data) { - struct inflight_function *look_for = data, *t; - - bool sent = false; - dfe_start_read(look_for->parser->inflight.functions, t) { - if(look_for == t) { - const char *transaction = t_dfe.name; - - internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction); - - char buffer[2048 + 1]; - snprintfz(buffer, sizeof(buffer) - 1, "%s %s\n", - PLUGINSD_KEYWORD_FUNCTION_CANCEL, - transaction); - - // send the command to the plugin - ssize_t ret = send_to_plugin(buffer, t->parser); - if(ret < 0) - sent = true; - - break; - } - } - dfe_done(t); - - if(sent <= 0) - nd_log(NDLS_DAEMON, NDLP_DEBUG, - "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d."); -} - -// this is the function that is called from -// rrd_call_function_and_wait() and rrd_call_function_async() -static int pluginsd_function_execute_cb(BUFFER *result_body_wb, int timeout, const char *function, - void *execute_cb_data, - rrd_function_result_callback_t result_cb, void *result_cb_data, - rrd_function_is_cancelled_cb_t is_cancelled_cb __maybe_unused, - void *is_cancelled_cb_data __maybe_unused, - rrd_function_register_canceller_cb_t register_canceller_cb, - void *register_canceller_db_data) { - PARSER *parser = execute_cb_data; - - usec_t now = now_realtime_usec(); - - struct inflight_function tmp = { - .started_ut = now, - .timeout_ut = now + timeout * USEC_PER_SEC + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT, - .result_body_wb = result_body_wb, - .timeout = timeout, - .function = string_strdupz(function), - .result_cb = result_cb, - .result_cb_data = result_cb_data, - .payload = NULL, - .parser = parser, - }; - - uuid_t uuid; - uuid_generate_random(uuid); - - char transaction[UUID_STR_LEN]; - uuid_unparse_lower(uuid, transaction); - - dictionary_write_lock(parser->inflight.functions); - - // if there is any error, our dictionary callbacks will call the caller callback to notify - // the caller about the error - no need for error handling here. - void *t = dictionary_set(parser->inflight.functions, transaction, &tmp, sizeof(struct inflight_function)); - if(register_canceller_cb) - register_canceller_cb(register_canceller_db_data, pluginsd_function_cancel, t); - - if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) - parser->inflight.smaller_timeout = tmp.timeout_ut; - - // garbage collect stale inflight functions - if(parser->inflight.smaller_timeout < now) - inflight_functions_garbage_collect(parser, now); - - dictionary_write_unlock(parser->inflight.functions); - - return HTTP_RESP_OK; -} - -static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) { - // a plugin or a child is registering a function - - bool global = false; - size_t i = 1; - if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { - i++; - global = true; - } - - char *name = get_word(words, num_words, i++); - char *timeout_s = get_word(words, num_words, i++); - char *help = get_word(words, num_words, i++); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_FUNCTION); - if(!host) return PARSER_RC_ERROR; - - RRDSET *st = (global)? NULL: pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART); - if(!st) global = true; - - if (unlikely(!timeout_s || !name || !help || (!global && !st))) { - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.", - rrdhost_hostname(host), - st?rrdset_id(st):"(unset)", - global?"yes":"no", - name?name:"(unset)", - timeout_s?timeout_s:"(unset)", - help?help:"(unset)" - ); - return PARSER_RC_ERROR; - } - - int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; - if (timeout_s && *timeout_s) { - timeout = str2i(timeout_s); - if (unlikely(timeout <= 0)) - timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; - } - - rrd_function_add(host, st, name, timeout, help, false, pluginsd_function_execute_cb, parser); - - parser->user.data_collections_count++; - - return PARSER_RC_OK; -} - -static void pluginsd_function_result_end(struct parser *parser, void *action_data) { - STRING *key = action_data; - if(key) - dictionary_del(parser->inflight.functions, string2str(key)); - string_freez(key); - - parser->user.data_collections_count++; -} - -static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) { - char *key = get_word(words, num_words, 1); - char *status = get_word(words, num_words, 2); - char *format = get_word(words, num_words, 3); - char *expires = get_word(words, num_words, 4); - - if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) { - netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')." - , key ? key : "(unset)" - , status ? status : "(unset)" - , format ? format : "(unset)" - , expires ? expires : "(unset)" - ); - } - - int code = (status && *status) ? str2i(status) : 0; - if (code <= 0) - code = HTTP_RESP_BACKEND_RESPONSE_INVALID; - - time_t expiration = (expires && *expires) ? str2l(expires) : 0; - - struct inflight_function *pf = NULL; - - if(key && *key) - pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key); - - if(!pf) { - netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)"); - } - else { - if(format && *format) - pf->result_body_wb->content_type = functions_format_to_content_type(format); - - pf->code = code; - - pf->result_body_wb->expires = expiration; - if(expiration <= now_realtime_sec()) - buffer_no_cacheable(pf->result_body_wb); - else - buffer_cacheable(pf->result_body_wb); - } - - parser->defer.response = (pf) ? pf->result_body_wb : NULL; - parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END; - parser->defer.action = pluginsd_function_result_end; - parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL - parser->flags |= PARSER_DEFER_UNTIL_KEYWORD; - - return PARSER_RC_OK; -} - -// ---------------------------------------------------------------------------- - -static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) { - char *name = get_word(words, num_words, 1); - char *value = get_word(words, num_words, 2); - NETDATA_DOUBLE v; - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_VARIABLE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_get_scope_chart(parser); - - int global = (st) ? 0 : 1; - - if (name && *name) { - if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) { - global = 1; - name = get_word(words, num_words, 2); - value = get_word(words, num_words, 3); - } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) { - global = 0; - name = get_word(words, num_words, 2); - value = get_word(words, num_words, 3); - } - } - - if (unlikely(!name || !*name)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name"); - - if (unlikely(!value || !*value)) - value = NULL; - - if (unlikely(!value)) { - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - (global) ? "HOST" : "CHART", - name); - return PARSER_RC_OK; - } - - if (!global && !st) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given"); - - char *endptr = NULL; - v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr); - if (unlikely(endptr && *endptr)) { - if (endptr == value) - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - value, - name); - else - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - value, - name, - endptr); - } - - if (global) { - const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name); - if (rva) { - rrdvar_custom_host_variable_set(host, rva, v); - rrdvar_custom_host_variable_release(host, rva); - } - else - netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'", - rrdhost_hostname(host), - name); - } else { - const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name); - if (rsa) { - rrdsetvar_custom_chart_variable_set(st, rsa, v); - rrdsetvar_custom_chart_variable_release(st, rsa); - } - else - netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'", - rrdhost_hostname(host), rrdset_id(st), name); - } - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { - netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH); - pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_FLUSH); - parser->user.replay.start_time = 0; - parser->user.replay.end_time = 0; - parser->user.replay.start_time_ut = 0; - parser->user.replay.end_time_ut = 0; - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { - netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it."); - parser->user.enabled = 0; - return PARSER_RC_STOP; -} - -static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) { - const char *name = get_word(words, num_words, 1); - const char *label_source = get_word(words, num_words, 2); - const char *value = get_word(words, num_words, 3); - - if (!name || !label_source || !value) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters"); - - char *store = (char *)value; - bool allocated_store = false; - - if(unlikely(num_words > 4)) { - allocated_store = true; - store = mallocz(PLUGINSD_LINE_MAX + 1); - size_t remaining = PLUGINSD_LINE_MAX; - char *move = store; - char *word; - for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) { - if(i > 3) { - *move++ = ' '; - *move = '\0'; - remaining--; - } - - size_t length = strlen(word); - if (length > remaining) - length = remaining; - - remaining -= length; - memcpy(move, word, length); - move += length; - *move = '\0'; - } - } - - if(unlikely(!(parser->user.new_host_labels))) - parser->user.new_host_labels = rrdlabels_create(); - - if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) { - int is_ephemeral = appconfig_test_boolean_value((char *) value); - if (is_ephemeral) { - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL); - if (likely(host)) - rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST); - } - } - - rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source)); - - if (allocated_store) - freez(store); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_OVERWRITE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels"); - - if(unlikely(!host->rrdlabels)) - host->rrdlabels = rrdlabels_create(); - - rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels); - if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST)) - rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG); - rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); - - rrdlabels_destroy(parser->user.new_host_labels); - parser->user.new_host_labels = NULL; - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) { - const char *name = get_word(words, num_words, 1); - const char *value = get_word(words, num_words, 2); - const char *label_source = get_word(words, num_words, 3); - - if (!name || !value || !label_source) { - netdata_log_error("Ignoring malformed or empty CHART LABEL command."); - return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - } - - if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) { - RRDSET *st = pluginsd_get_scope_chart(parser); - parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels; - rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily); - } - - rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source)); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - netdata_log_debug(D_PLUGINSD, "requested to commit chart labels"); - - if(!parser->user.chart_rrdlabels_linked_temporarily) { - netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host)); - return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - } - - rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily); - - rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); - rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - rrdset_metadata_updated(st); - - parser->user.chart_rrdlabels_linked_temporarily = NULL; - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) { - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *id = get_word(words, num_words, idx++); - char *start_time_str = get_word(words, num_words, idx++); - char *end_time_str = get_word(words, num_words, idx++); - char *child_now_str = get_word(words, num_words, idx++); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st; - if (likely(!id || !*id)) - st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); - else - st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_REPLAY_BEGIN); - - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN)) - return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if(start_time_str && end_time_str) { - time_t start_time = (time_t) str2ull_encoded(start_time_str); - time_t end_time = (time_t) str2ull_encoded(end_time_str); - - time_t wall_clock_time = 0, tolerance; - bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child; - if(child_now_str) { - wall_clock_time = (time_t) str2ull_encoded(child_now_str); - tolerance = st->update_every + 1; - wall_clock_comes_from_child = true; - } - - if(wall_clock_time <= 0) { - wall_clock_time = now_realtime_sec(); - tolerance = st->update_every + 5; - wall_clock_comes_from_child = false; - } - -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - internal_error( - (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)), - "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).", - rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before); - - internal_error( - true, - "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time", - st->replay.after, st->replay.before); -#endif - - if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { - if (unlikely(end_time - start_time != st->update_every)) - rrdset_set_update_every_s(st, end_time - start_time); - - st->last_collected_time.tv_sec = end_time; - st->last_collected_time.tv_usec = 0; - - st->last_updated.tv_sec = end_time; - st->last_updated.tv_usec = 0; - - st->counter++; - st->counter_done++; - - // these are only needed for db mode RAM, SAVE, MAP, ALLOC - st->db.current_entry++; - if(st->db.current_entry >= st->db.entries) - st->db.current_entry -= st->db.entries; - - parser->user.replay.start_time = start_time; - parser->user.replay.end_time = end_time; - parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; - parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; - parser->user.replay.wall_clock_time = wall_clock_time; - parser->user.replay.rset_enabled = true; - - return PARSER_RC_OK; - } - - netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN - " from %ld to %ld, but timestamps are invalid " - "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, - rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, - wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", - tolerance); - } - - // the child sends an RBEGIN without any parameters initially - // setting rset_enabled to false, means the RSET should not store any metrics - // to store metrics, the RBEGIN needs to have timestamps - parser->user.replay.start_time = 0; - parser->user.replay.end_time = 0; - parser->user.replay.start_time_ut = 0; - parser->user.replay.end_time_ut = 0; - parser->user.replay.wall_clock_time = 0; - parser->user.replay.rset_enabled = false; - return PARSER_RC_OK; -} - -static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) { - SN_FLAGS flags = SN_FLAG_NONE; - - char c; - while ((c = *flags_str++)) { - switch (c) { - case 'A': - flags |= SN_FLAG_NOT_ANOMALOUS; - break; - - case 'R': - flags |= SN_FLAG_RESET; - break; - - case 'E': - flags = SN_EMPTY_SLOT; - return flags; - - default: - internal_error(true, "Unknown SN_FLAGS flag '%c'", c); - break; - } - } - - return flags; -} - -static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) { - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *dimension = get_word(words, num_words, idx++); - char *value_str = get_word(words, num_words, idx++); - char *flags_str = get_word(words, num_words, idx++); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if(!parser->user.replay.rset_enabled) { - nd_log_limit_static_thread_var(erl, 1, 0); - nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_ERR, - "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", - rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); - - // we have to return OK here - return PARSER_RC_OK; - } - - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_SET); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - st->pluginsd.set = true; - - if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) { - netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.", - rrdhost_hostname(host), - rrdset_id(st), - dimension, - PLUGINSD_KEYWORD_REPLAY_SET, - parser->user.replay.start_time, - parser->user.replay.end_time, - PLUGINSD_KEYWORD_REPLAY_BEGIN); - return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - } - - if (unlikely(!value_str || !*value_str)) - value_str = "NAN"; - - if(unlikely(!flags_str)) - flags_str = ""; - - if (likely(value_str)) { - RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED); - - if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) { - NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL); - SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str); - - if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) { - value = NAN; - flags = SN_EMPTY_SLOT; - } - - rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags); - rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time; - rd->collector.last_collected_time.tv_usec = 0; - rd->collector.counter++; - } - else { - nd_log_limit_static_global_var(erl, 1, 0); - nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING, - "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. " - "Ignoring data.", - rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd)); - } - } - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) { - if(parser->user.replay.rset_enabled == false) - return PARSER_RC_OK; - - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *dimension = get_word(words, num_words, idx++); - char *last_collected_ut_str = get_word(words, num_words, idx++); - char *last_collected_value_str = get_word(words, num_words, idx++); - char *last_calculated_value_str = get_word(words, num_words, idx++); - char *last_stored_value_str = get_word(words, num_words, idx++); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if(st->pluginsd.set) { - // reset pos to reuse the same RDAs - st->pluginsd.pos = 0; - st->pluginsd.set = false; - } - - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec; - usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0; - if(last_collected_ut > dim_last_collected_ut) { - rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); - rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); - } - - rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0; - rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0; - rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0; - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) { - if(parser->user.replay.rset_enabled == false) - return PARSER_RC_OK; - - char *last_collected_ut_str = get_word(words, num_words, 1); - char *last_updated_ut_str = get_word(words, num_words, 2); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, - PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec; - usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0; - if(last_collected_ut > chart_last_collected_ut) { - st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); - st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); - } - - usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec; - usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0; - if(last_updated_ut > chart_last_updated_ut) { - st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC); - st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC); - } - - st->counter++; - st->counter_done++; - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) { - if (num_words < 7) { // accepts 7, but the 7th is optional - netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command"); - return PARSER_RC_ERROR; - } - - const char *update_every_child_txt = get_word(words, num_words, 1); - const char *first_entry_child_txt = get_word(words, num_words, 2); - const char *last_entry_child_txt = get_word(words, num_words, 3); - const char *start_streaming_txt = get_word(words, num_words, 4); - const char *first_entry_requested_txt = get_word(words, num_words, 5); - const char *last_entry_requested_txt = get_word(words, num_words, 6); - const char *child_world_time_txt = get_word(words, num_words, 7); // optional - - time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt); - time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt); - time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt); - - bool start_streaming = (strcmp(start_streaming_txt, "true") == 0); - time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt); - time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt); - - // the optional child world time - time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded( - child_world_time_txt) : now_realtime_sec(); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - internal_error(true, - "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu", - rrdhost_hostname(host), rrdset_id(st), - (unsigned long long)first_entry_child, (unsigned long long)last_entry_child, - start_streaming?"true":"false", - (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested, - (unsigned long long)child_world_time - ); -#endif - - parser->user.data_collections_count++; - - if(parser->user.replay.rset_enabled && st->rrdhost->receiver) { - time_t now = now_realtime_sec(); - time_t started = st->rrdhost->receiver->replication_first_time_t; - time_t current = parser->user.replay.end_time; - - if(started && current > started) { - host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started); - worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, - host->rrdpush_receiver_replication_percent); - } - } - - parser->user.replay.start_time = 0; - parser->user.replay.end_time = 0; - parser->user.replay.start_time_ut = 0; - parser->user.replay.end_time_ut = 0; - parser->user.replay.wall_clock_time = 0; - parser->user.replay.rset_enabled = false; - - st->counter++; - st->counter_done++; - store_metric_collection_completed(); - -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - st->replay.start_streaming = false; - st->replay.after = 0; - st->replay.before = 0; - if(start_streaming) - st->replay.log_next_data_collection = true; -#endif - - if (start_streaming) { - if (st->update_every != update_every_child) - rrdset_set_update_every_s(st, update_every_child); - - if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK); - rrdhost_receiver_replicating_charts_minus_one(st->rrdhost); - } -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - else - internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.", - rrdhost_hostname(host), rrdset_id(st)); -#endif - - pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END); - - host->rrdpush_receiver_replication_percent = 100.0; - worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent); - - return PARSER_RC_OK; - } - - pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END); - - rrdcontext_updated_retention_rrdset(st); - - bool ok = replicate_chart_request(send_to_plugin, parser, host, st, - first_entry_child, last_entry_child, child_world_time, - first_entry_requested, last_entry_requested); - return ok ? PARSER_RC_OK : PARSER_RC_ERROR; -} - -static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) { - timing_init(); - - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *id = get_word(words, num_words, idx++); - char *update_every_str = get_word(words, num_words, idx++); - char *end_time_str = get_word(words, num_words, idx++); - char *wall_clock_time_str = get_word(words, num_words, idx++); - - if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters"); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - timing_step(TIMING_STEP_BEGIN2_PREPARE); - - RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2); - - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2)) - return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) - rrdset_isnot_obsolete___safe_from_collector_thread(st); - - timing_step(TIMING_STEP_BEGIN2_FIND_CHART); - - // ------------------------------------------------------------------------ - // parse the parameters - - time_t update_every = (time_t) str2ull_encoded(update_every_str); - time_t end_time = (time_t) str2ull_encoded(end_time_str); - - time_t wall_clock_time; - if(likely(*wall_clock_time_str == '#')) - wall_clock_time = end_time; - else - wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str); - - if (unlikely(update_every != st->update_every)) - rrdset_set_update_every_s(st, update_every); - - timing_step(TIMING_STEP_BEGIN2_PARSE); - - // ------------------------------------------------------------------------ - // prepare our state - - pluginsd_lock_rrdset_data_collection(parser); - - parser->user.v2.update_every = update_every; - parser->user.v2.end_time = end_time; - parser->user.v2.wall_clock_time = wall_clock_time; - parser->user.v2.ml_locked = ml_chart_update_begin(st); - - timing_step(TIMING_STEP_BEGIN2_ML); - - // ------------------------------------------------------------------------ - // propagate it forward in v2 - - if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost)) - parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time); - - if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) { - // check receiver capabilities - bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); - - // check sender capabilities - bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false; - NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; - - BUFFER *wb = parser->user.v2.stream_buffer.wb; - - buffer_need_bytes(wb, 1024); - - if(unlikely(parser->user.v2.stream_buffer.begin_v2_added)) - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); - - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); - - if(with_slots) { - buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); - buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot); - } - - buffer_fast_strcat(wb, " '", 2); - buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); - buffer_fast_strcat(wb, "' ", 2); - - if(can_copy) - buffer_strcat(wb, update_every_str); - else - buffer_print_uint64_encoded(wb, integer_encoding, update_every); - - buffer_fast_strcat(wb, " ", 1); - - if(can_copy) - buffer_strcat(wb, end_time_str); - else - buffer_print_uint64_encoded(wb, integer_encoding, end_time); - - buffer_fast_strcat(wb, " ", 1); - - if(can_copy) - buffer_strcat(wb, wall_clock_time_str); - else - buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); - - buffer_fast_strcat(wb, "\n", 1); - - parser->user.v2.stream_buffer.last_point_end_time_s = end_time; - parser->user.v2.stream_buffer.begin_v2_added = true; - } - - timing_step(TIMING_STEP_BEGIN2_PROPAGATE); - - // ------------------------------------------------------------------------ - // store it - - st->last_collected_time.tv_sec = end_time; - st->last_collected_time.tv_usec = 0; - st->last_updated.tv_sec = end_time; - st->last_updated.tv_usec = 0; - st->counter++; - st->counter_done++; - - // these are only needed for db mode RAM, SAVE, MAP, ALLOC - st->db.current_entry++; - if(st->db.current_entry >= st->db.entries) - st->db.current_entry -= st->db.entries; - - timing_step(TIMING_STEP_BEGIN2_STORE); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) { - timing_init(); - - int idx = 1; - ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); - if(slot >= 0) idx++; - - char *dimension = get_word(words, num_words, idx++); - char *collected_str = get_word(words, num_words, idx++); - char *value_str = get_word(words, num_words, idx++); - char *flags_str = get_word(words, num_words, idx++); - - if(unlikely(!dimension || !collected_str || !value_str || !flags_str)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters"); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - timing_step(TIMING_STEP_SET2_PREPARE); - - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2); - if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - st->pluginsd.set = true; - - if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED))) - rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); - - timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION); - - // ------------------------------------------------------------------------ - // parse the parameters - - collected_number collected_value = (collected_number) str2ll_encoded(collected_str); - - NETDATA_DOUBLE value; - if(*value_str == '#') - value = (NETDATA_DOUBLE)collected_value; - else - value = str2ndd_encoded(value_str, NULL); - - SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str); - - timing_step(TIMING_STEP_SET2_PARSE); - - // ------------------------------------------------------------------------ - // check value and ML - - if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) { - value = NAN; - flags = SN_EMPTY_SLOT; - - if(parser->user.v2.ml_locked) - ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false); - } - else if(parser->user.v2.ml_locked) { - if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) { - // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous - flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS); - } - else - flags |= SN_FLAG_NOT_ANOMALOUS; - } - - timing_step(TIMING_STEP_SET2_ML); - - // ------------------------------------------------------------------------ - // propagate it forward in v2 - - if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) { - // check if receiver and sender have the same number parsing capabilities - bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); - - // check the sender capabilities - bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false; - NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; - NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; - - BUFFER *wb = parser->user.v2.stream_buffer.wb; - buffer_need_bytes(wb, 1024); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); - - if(with_slots) { - buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); - buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); - } - - buffer_fast_strcat(wb, " '", 2); - buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); - buffer_fast_strcat(wb, "' ", 2); - if(can_copy) - buffer_strcat(wb, collected_str); - else - buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex - buffer_fast_strcat(wb, " ", 1); - if(can_copy) - buffer_strcat(wb, value_str); - else - buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal - buffer_fast_strcat(wb, " ", 1); - buffer_print_sn_flags(wb, flags, true); - buffer_fast_strcat(wb, "\n", 1); - } - - timing_step(TIMING_STEP_SET2_PROPAGATE); - - // ------------------------------------------------------------------------ - // store it - - rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags); - rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time; - rd->collector.last_collected_time.tv_usec = 0; - rd->collector.last_collected_value = collected_value; - rd->collector.last_stored_value = value; - rd->collector.last_calculated_value = value; - rd->collector.counter++; - rrddim_set_updated(rd); - - timing_step(TIMING_STEP_SET2_STORE); - - return PARSER_RC_OK; -} - -void pluginsd_cleanup_v2(PARSER *parser) { - // this is called when the thread is stopped while processing - pluginsd_clear_scope_chart(parser, "THREAD CLEANUP"); -} - -static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { - timing_init(); - - RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - - parser->user.data_collections_count++; - - timing_step(TIMING_STEP_END2_PREPARE); - - // ------------------------------------------------------------------------ - // propagate the whole chart update in v1 - - if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb)) - rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st); - - timing_step(TIMING_STEP_END2_PUSH_V1); - - // ------------------------------------------------------------------------ - // unblock data collection - - pluginsd_unlock_previous_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, false); - rrdcontext_collected_rrdset(st); - store_metric_collection_completed(); - - timing_step(TIMING_STEP_END2_RRDSET); - - // ------------------------------------------------------------------------ - // propagate it forward - - rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st); - - timing_step(TIMING_STEP_END2_PROPAGATE); - - // ------------------------------------------------------------------------ - // cleanup RRDSET / RRDDIM - - if(likely(st->pluginsd.dims_with_slots)) { - for(size_t i = 0; i < st->pluginsd.size ;i++) { - RRDDIM *rd = st->pluginsd.prd_array[i].rd; - - if(!rd) - continue; - - rd->collector.calculated_value = 0; - rd->collector.collected_value = 0; - rrddim_clear_updated(rd); - } - } - else { - RRDDIM *rd; - rrddim_foreach_read(rd, st){ - rd->collector.calculated_value = 0; - rd->collector.collected_value = 0; - rrddim_clear_updated(rd); - } - rrddim_foreach_done(rd); - } - - // ------------------------------------------------------------------------ - // reset state - - parser->user.v2 = (struct parser_user_object_v2){ 0 }; - - timing_step(TIMING_STEP_END2_STORE); - timing_report(); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { - netdata_log_info("PLUGINSD: plugin called EXIT."); - return PARSER_RC_STOP; -} - -struct mutex_cond { - pthread_mutex_t lock; - pthread_cond_t cond; - int rc; -}; - -static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *callback_data) -{ - struct mutex_cond *ctx = callback_data; - pthread_mutex_lock(&ctx->lock); - ctx->rc = code; - pthread_cond_broadcast(&ctx->cond); - pthread_mutex_unlock(&ctx->lock); -} - -#define VIRT_FNC_TIMEOUT 1 -#define VIRT_FNC_BUF_SIZE (4096) -void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) { - PARSER *parser = NULL; - - //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all) - char *words[PLUGINSD_MAX_WORDS]; - char *function_with_params = strdupz(name); - size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd); - - if (num_words < 2) { - netdata_log_error("PLUGINSD: virtual function name is empty."); - freez(function_with_params); - return; - } - - const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1)); - if (unlikely(cpi == NULL)) { - netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name); - freez(function_with_params); - return; - } - struct configurable_plugin *cp = dictionary_acquired_item_value(cpi); - parser = (PARSER *)cp->cb_usr_ctx; - - BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL); - // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name) - buffer_strcat(function_out, get_word(words, num_words, 0)); - for (size_t i = 1; i < num_words; i++) { - if (i == 1 && SERVING_PLUGINSD(parser)) - continue; - buffer_sprintf(function_out, " %s", get_word(words, num_words, i)); - } - freez(function_with_params); - - usec_t now = now_realtime_usec(); - - struct inflight_function tmp = { - .started_ut = now, - .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, - .result_body_wb = wb, - .timeout = VIRT_FNC_TIMEOUT * 10, - .function = string_strdupz(buffer_tostring(function_out)), - .result_cb = callback, - .result_cb_data = callback_data, - .payload = payload != NULL ? strdupz(payload) : NULL, - .virtual = true, - }; - buffer_free(function_out); - - uuid_t uuid; - uuid_generate_time(uuid); - - char key[UUID_STR_LEN]; - uuid_unparse_lower(uuid, key); - - dictionary_write_lock(parser->inflight.functions); - - // if there is any error, our dictionary callbacks will call the caller callback to notify - // the caller about the error - no need for error handling here. - dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); - - if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) - parser->inflight.smaller_timeout = tmp.timeout_ut; - - // garbage collect stale inflight functions - if(parser->inflight.smaller_timeout < now) - inflight_functions_garbage_collect(parser, now); - - dictionary_write_unlock(parser->inflight.functions); -} - - -dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) { - usec_t now = now_realtime_usec(); - BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL); - - struct mutex_cond cond = { - .lock = PTHREAD_MUTEX_INITIALIZER, - .cond = PTHREAD_COND_INITIALIZER - }; - - struct inflight_function tmp = { - .started_ut = now, - .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, - .result_body_wb = wb, - .timeout = VIRT_FNC_TIMEOUT, - .function = string_strdupz(name), - .result_cb = virt_fnc_got_data_cb, - .result_cb_data = &cond, - .payload = payload != NULL ? strdupz(payload) : NULL, - .virtual = true, - }; - - uuid_t uuid; - uuid_generate_time(uuid); - - char key[UUID_STR_LEN]; - uuid_unparse_lower(uuid, key); - - dictionary_write_lock(parser->inflight.functions); - - // if there is any error, our dictionary callbacks will call the caller callback to notify - // the caller about the error - no need for error handling here. - dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); - - if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) - parser->inflight.smaller_timeout = tmp.timeout_ut; - - // garbage collect stale inflight functions - if(parser->inflight.smaller_timeout < now) - inflight_functions_garbage_collect(parser, now); - - dictionary_write_unlock(parser->inflight.functions); - - struct timespec tp; - clock_gettime(CLOCK_REALTIME, &tp); - tp.tv_sec += (time_t)VIRT_FNC_TIMEOUT; - - pthread_mutex_lock(&cond.lock); - - int ret = pthread_cond_timedwait(&cond.cond, &cond.lock, &tp); - if (ret == ETIMEDOUT) - netdata_log_error("PLUGINSD: DYNCFG virtual function %s timed out", name); - - pthread_mutex_unlock(&cond.lock); - - dyncfg_config_t cfg; - cfg.data = strdupz(buffer_tostring(wb)); - cfg.data_size = buffer_strlen(wb); - - if (rc != NULL) - *rc = cond.rc; - - buffer_free(wb); - return cfg; -} - -#define CVF_MAX_LEN (1024) -static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name) -{ - PARSER *parser = usr_ctx; - - if (SERVING_STREAMING(parser)) { - char buf[CVF_MAX_LEN + 1]; - snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); - } - - return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL); -} - -static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name) -{ - PARSER *parser = usr_ctx; - - if (SERVING_STREAMING(parser)) { - char buf[CVF_MAX_LEN + 1]; - snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); - } - - return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL); -} - -static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name) -{ - PARSER *parser = usr_ctx; - BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); - - buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG); - if (SERVING_STREAMING(parser)) - buffer_sprintf(wb, " %s", plugin_name); - - buffer_sprintf(wb, " %s", module_name); - - dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); - - buffer_free(wb); - - return ret; -} - -static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) -{ - PARSER *parser = usr_ctx; - BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); - - buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA); - if (SERVING_STREAMING(parser)) - buffer_sprintf(wb, " %s", plugin_name); - - buffer_sprintf(wb, " %s", module_name); - - dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); - - buffer_free(wb); - - return ret; -} - -static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) -{ - PARSER *parser = usr_ctx; - BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); - - buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA); - - if (SERVING_STREAMING(parser)) - buffer_sprintf(wb, " %s", plugin_name); - - buffer_sprintf(wb, " %s", module_name); - - dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); - - buffer_free(wb); - - return ret; -} - -static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name) -{ - PARSER *parser = usr_ctx; - BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); - - buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG); - - if (SERVING_STREAMING(parser)) - buffer_sprintf(wb, " %s", plugin_name); - - buffer_sprintf(wb, " %s %s", module_name, job_name); - - dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); - - buffer_free(wb); - - return ret; -} - -enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg) -{ - PARSER *parser = usr_ctx; - BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); - - buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG); - - if (SERVING_STREAMING(parser)) - buffer_sprintf(wb, " %s", plugin_name); - - int rc; - call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - - buffer_free(wb); - if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) - return SET_CONFIG_REJECTED; - return SET_CONFIG_ACCEPTED; -} - -enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg) -{ - PARSER *parser = usr_ctx; - BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); - - buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG); - - if (SERVING_STREAMING(parser)) - buffer_sprintf(wb, " %s", plugin_name); - - buffer_sprintf(wb, " %s", module_name); - - int rc; - call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - - buffer_free(wb); - - if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) - return SET_CONFIG_REJECTED; - return SET_CONFIG_ACCEPTED; -} - -enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg) -{ - PARSER *parser = usr_ctx; - BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); - - buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG); - - if (SERVING_STREAMING(parser)) - buffer_sprintf(wb, " %s", plugin_name); - - buffer_sprintf(wb, " %s %s", module_name, job_name); - - int rc; - call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - - buffer_free(wb); - - if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) - return SET_CONFIG_REJECTED; - return SET_CONFIG_ACCEPTED; -} - -enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name) -{ - PARSER *parser = usr_ctx; - BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); - - buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB); - - if (SERVING_STREAMING(parser)) - buffer_sprintf(wb, " %s", plugin_name); - - buffer_sprintf(wb, " %s %s", module_name, job_name); - - int rc; - call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL); - - buffer_free(wb); - - if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) - return SET_CONFIG_REJECTED; - return SET_CONFIG_ACCEPTED; -} - - -static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { - netdata_log_info("PLUGINSD: DYNCFG_ENABLE"); - - if (unlikely (num_words != 2)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "missing name parameter"); - - struct configurable_plugin *cfg = callocz(1, sizeof(struct configurable_plugin)); - - cfg->name = strdupz(words[1]); - cfg->set_config_cb = set_plugin_config_cb; - cfg->get_config_cb = get_plugin_config_cb; - cfg->get_config_schema_cb = get_plugin_config_schema_cb; - cfg->cb_usr_ctx = parser; - - const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser)); - if (unlikely(di == NULL)) { - freez(cfg->name); - freez(cfg); - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin"); - } - - if (SERVING_PLUGINSD(parser)) { - // this is optimization for pluginsd to avoid extra dictionary lookup - // as we know which plugin is comunicating with us - parser->user.cd->cfg_dict_item = di; - parser->user.cd->configuration = cfg; - } else { - // register_plugin keeps the item acquired, so we need to release it - dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); - } - - rrdpush_send_dyncfg_enable(parser->user.host, cfg->name); - - return PARSER_RC_OK; -} - -#define LOG_MSG_SIZE (1024) -#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2) -#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3) -static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { - netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE"); - - size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4; - - if (unlikely(num_words != expected_num_words)) { - char log[LOG_MSG_SIZE + 1]; - snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log); - } - - struct configurable_plugin *plug_cfg; - const DICTIONARY_ITEM *di = NULL; - if (SERVING_PLUGINSD(parser)) { - plug_cfg = parser->user.cd->configuration; - if (unlikely(plug_cfg == NULL)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); - } else { - di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]); - if (unlikely(di == NULL)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found"); - - plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di); - } - - struct module *mod = callocz(1, sizeof(struct module)); - - mod->type = str2_module_type(words[MODULE_TYPE_IDX]); - if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) { - freez(mod); - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)"); - } - - mod->name = strdupz(words[MODULE_NAME_IDX]); - - mod->set_config_cb = set_module_config_cb; - mod->get_config_cb = get_module_config_cb; - mod->get_config_schema_cb = get_module_config_schema_cb; - mod->config_cb_usr_ctx = parser; - - mod->get_job_config_cb = get_job_config_cb; - mod->get_job_config_schema_cb = get_job_config_schema_cb; - mod->set_job_config_cb = set_job_config_cb; - mod->delete_job_cb = delete_job_cb; - mod->job_config_cb_usr_ctx = parser; - - register_module(parser->user.host->configurable_plugins, plug_cfg, mod, SERVING_PLUGINSD(parser)); - - if (di != NULL) - dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); - - rrdpush_send_dyncfg_reg_module(parser->user.host, plug_cfg->name, mod->name, mod->type); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) { - const char *module_name = words[0]; - const char *job_name = words[1]; - const char *job_type_str = words[2]; - const char *flags_str = words[3]; - - long f = str2l(flags_str); - - if (f < 0) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags received"); - - dyncfg_job_flg_t flags = f; - - if (SERVING_PLUGINSD(parser)) - flags |= JOB_FLG_PLUGIN_PUSHED; - else - flags |= JOB_FLG_STREAMING_PUSHED; - - enum job_type job_type = dyncfg_str2job_type(job_type_str); - if (job_type == JOB_TYPE_UNKNOWN) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type"); - - if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)"); - - if (register_job(parser->user.host->configurable_plugins, plugin_name, module_name, job_name, job_type, flags, 0)) // ignore existing is off as this is explicitly called register job - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job"); - - rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, module_name, job_name, job_type, flags); - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { - size_t expected_num_words = SERVING_PLUGINSD(parser) ? 5 : 6; - - if (unlikely(num_words != expected_num_words)) { - char log[LOG_MSG_SIZE + 1]; - snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name job_name job_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, log); - } - - if (SERVING_PLUGINSD(parser)) { - return pluginsd_register_job_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name); - } - return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]); -} - -static inline PARSER_RC pluginsd_dyncfg_reset(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { - if (unlikely(num_words != (SERVING_PLUGINSD(parser) ? 1 : 2))) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, SERVING_PLUGINSD(parser) ? "expected 0 parameters" : "expected 1 parameter: plugin_name"); - - if (SERVING_PLUGINSD(parser)) { - unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item); - rrdpush_send_dyncfg_reset(parser->user.host, parser->user.cd->configuration->name); - parser->user.cd->configuration = NULL; - } else { - const DICTIONARY_ITEM *di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]); - if (unlikely(di == NULL)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, "plugin not found"); - unregister_plugin(parser->user.host->configurable_plugins, di); - rrdpush_send_dyncfg_reset(parser->user.host, words[1]); - } - - return PARSER_RC_OK; -} - -static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) { - int state = str2i(words[3]); - - enum job_status status = str2job_state(words[2]); - if (unlikely(SERVING_PLUGINSD(parser) && status == JOB_STATUS_UNKNOWN)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status"); - - char *message = NULL; - if (num_words == 5 && strlen(words[4]) > 0) - message = words[4]; - - const DICTIONARY_ITEM *plugin_item; - DICTIONARY *job_dict; - const DICTIONARY_ITEM *job_item = report_job_status_acq_lock(parser->user.host->configurable_plugins, &plugin_item, &job_dict, plugin_name, words[0], words[1], status, state, message); - - if (job_item != NULL) { - struct job *job = dictionary_acquired_item_value(job_item); - rrdpush_send_job_status_update(parser->user.host, plugin_name, words[0], job); - - pthread_mutex_unlock(&job->lock); - dictionary_acquired_item_release(job_dict, job_item); - dictionary_acquired_item_release(parser->user.host->configurable_plugins, plugin_item); - } - - return PARSER_RC_OK; -} - -// job_status [plugin_name if streaming] <module_name> <job_name> <status_code> <state> [message] -static PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) { - if (SERVING_PLUGINSD(parser)) { - if (unlikely(num_words != 5 && num_words != 6)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]"); - } else { - if (unlikely(num_words != 6 && num_words != 7)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 5 or 6 parameters: plugin_name, module_name, job_name, status_code, state, [optional: message]"); - } - - if (SERVING_PLUGINSD(parser)) { - return pluginsd_job_status_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name); - } - return pluginsd_job_status_common(&words[2], num_words - 2, parser, words[1]); -} - -static PARSER_RC pluginsd_delete_job(char **words, size_t num_words, PARSER *parser) { - // this can confuse a bit but there is a diference between KEYWORD_DELETE_JOB and actual delete_job function - // they are of opossite direction - if (num_words != 4) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DELETE_JOB, "expected 2 parameters: plugin_name, module_name, job_name"); - - const char *plugin_name = get_word(words, num_words, 1); - const char *module_name = get_word(words, num_words, 2); - const char *job_name = get_word(words, num_words, 3); - - if (SERVING_STREAMING(parser)) - delete_job_pname(parser->user.host->configurable_plugins, plugin_name, module_name, job_name); - - // forward to parent if any - rrdpush_send_job_deleted(parser->user.host, plugin_name, module_name, job_name); - return PARSER_RC_OK; -} - -static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser) -{ - const char *host_uuid_str = get_word(words, num_words, 1); - const char *claim_id_str = get_word(words, num_words, 2); - - if (!host_uuid_str || !claim_id_str) { - netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", - host_uuid_str ? host_uuid_str : "[unset]", - claim_id_str ? claim_id_str : "[unset]"); - return PARSER_RC_ERROR; - } - - uuid_t uuid; - RRDHOST *host = parser->user.host; - - // We don't need the parsed UUID - // just do it to check the format - if(uuid_parse(host_uuid_str, uuid)) { - netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); - return PARSER_RC_ERROR; - } - if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) { - netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); - return PARSER_RC_ERROR; - } - - if(strcmp(host_uuid_str, host->machine_guid) != 0) { - netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); - return PARSER_RC_OK; //the message is OK problem must be somewhere else - } - - rrdhost_aclk_state_lock(host); - - if (host->aclk_state.claimed_id) - freez(host->aclk_state.claimed_id); - - host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - - rrdhost_aclk_state_unlock(host); - - rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); - - rrdpush_send_claimed_id(host); - - return PARSER_RC_OK; -} - -// ---------------------------------------------------------------------------- - -void pluginsd_process_thread_cleanup(void *ptr) { - PARSER *parser = (PARSER *)ptr; - - pluginsd_cleanup_v2(parser); - pluginsd_host_define_cleanup(parser); - - rrd_collector_finished(); - -#ifdef NETDATA_LOG_STREAM_RECEIVE - if(parser->user.stream_log_fp) { - fclose(parser->user.stream_log_fp); - parser->user.stream_log_fp = NULL; - } -#endif - - parser_destroy(parser); -} - -bool parser_reconstruct_node(BUFFER *wb, void *ptr) { - PARSER *parser = ptr; - if(!parser || !parser->user.host) - return false; - - buffer_strcat(wb, rrdhost_hostname(parser->user.host)); - return true; -} - -bool parser_reconstruct_instance(BUFFER *wb, void *ptr) { - PARSER *parser = ptr; - if(!parser || !parser->user.st) - return false; - - buffer_strcat(wb, rrdset_name(parser->user.st)); - return true; -} - -bool parser_reconstruct_context(BUFFER *wb, void *ptr) { - PARSER *parser = ptr; - if(!parser || !parser->user.st) - return false; - - buffer_strcat(wb, string2str(parser->user.st->context)); - return true; -} - -inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) -{ - int enabled = cd->unsafe.enabled; - - if (!fp_plugin_input || !fp_plugin_output || !enabled) { - cd->unsafe.enabled = 0; - return 0; - } - - if (unlikely(fileno(fp_plugin_input) == -1)) { - netdata_log_error("input file descriptor given is not a valid stream"); - cd->serial_failures++; - return 0; - } - - if (unlikely(fileno(fp_plugin_output) == -1)) { - netdata_log_error("output file descriptor given is not a valid stream"); - cd->serial_failures++; - return 0; - } - - clearerr(fp_plugin_input); - clearerr(fp_plugin_output); - - PARSER *parser; - { - PARSER_USER_OBJECT user = { - .enabled = cd->unsafe.enabled, - .host = host, - .cd = cd, - .trust_durations = trust_durations - }; - - // fp_plugin_output = our input; fp_plugin_input = our output - parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); - } - - pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD); - - rrd_collector_started(); - - size_t count = 0; - - // this keeps the parser with its current value - // so, parser needs to be allocated before pushing it - netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - - { - ND_LOG_STACK lgs[] = { - ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), - ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), - ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), - ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), - ND_LOG_FIELD_END(), - }; - ND_LOG_STACK_PUSH(lgs); - - buffered_reader_init(&parser->reader); - BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL); - while(likely(service_running(SERVICE_COLLECTORS))) { - - if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) { - buffered_reader_ret_t ret = buffered_reader_read_timeout( - &parser->reader, - fileno((FILE *) parser->fp_input), - 2 * 60 * MSEC_PER_SEC, true - ); - - if(unlikely(ret != BUFFERED_READER_READ_OK)) - break; - - continue; - } - - if(unlikely(parser_action(parser, buffer->buffer))) - break; - - buffer->len = 0; - buffer->buffer[0] = '\0'; - } - buffer_free(buffer); - - cd->unsafe.enabled = parser->user.enabled; - count = parser->user.data_collections_count; - - if(likely(count)) { - cd->successful_collections += count; - cd->serial_failures = 0; - } - else - cd->serial_failures++; - } - - // free parser with the pop function - netdata_thread_cleanup_pop(1); - - return count; -} - -void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) { - parser_init_repertoire(parser, repertoire); - - if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) - inflight_functions_init(parser); -} - -PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, - PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) { - PARSER *parser; - - parser = callocz(1, sizeof(*parser)); - if(user) - parser->user = *user; - parser->fd = fd; - parser->fp_input = fp_input; - parser->fp_output = fp_output; -#ifdef ENABLE_HTTPS - parser->ssl_output = ssl; -#endif - parser->flags = flags; - - spinlock_init(&parser->writer.spinlock); - return parser; -} - -PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) { - switch(keyword->id) { - case 1: - return pluginsd_set_v2(words, num_words, parser); - - case 2: - return pluginsd_begin_v2(words, num_words, parser); - - case 3: - return pluginsd_end_v2(words, num_words, parser); - - case 11: - return pluginsd_set(words, num_words, parser); - - case 12: - return pluginsd_begin(words, num_words, parser); - - case 13: - return pluginsd_end(words, num_words, parser); - - case 21: - return pluginsd_replay_set(words, num_words, parser); - - case 22: - return pluginsd_replay_begin(words, num_words, parser); - - case 23: - return pluginsd_replay_rrddim_collection_state(words, num_words, parser); - - case 24: - return pluginsd_replay_rrdset_collection_state(words, num_words, parser); - - case 25: - return pluginsd_replay_end(words, num_words, parser); - - case 31: - return pluginsd_dimension(words, num_words, parser); - - case 32: - return pluginsd_chart(words, num_words, parser); - - case 33: - return pluginsd_chart_definition_end(words, num_words, parser); - - case 34: - return pluginsd_clabel(words, num_words, parser); - - case 35: - return pluginsd_clabel_commit(words, num_words, parser); - - case 41: - return pluginsd_function(words, num_words, parser); - - case 42: - return pluginsd_function_result_begin(words, num_words, parser); - - case 51: - return pluginsd_label(words, num_words, parser); - - case 52: - return pluginsd_overwrite(words, num_words, parser); - - case 53: - return pluginsd_variable(words, num_words, parser); - - case 61: - return streaming_claimed_id(words, num_words, parser); - - case 71: - return pluginsd_host(words, num_words, parser); - - case 72: - return pluginsd_host_define(words, num_words, parser); - - case 73: - return pluginsd_host_define_end(words, num_words, parser); - - case 74: - return pluginsd_host_labels(words, num_words, parser); - - case 97: - return pluginsd_flush(words, num_words, parser); - - case 98: - return pluginsd_disable(words, num_words, parser); - - case 99: - return pluginsd_exit(words, num_words, parser); - - case 101: - return pluginsd_register_plugin(words, num_words, parser); - - case 102: - return pluginsd_register_module(words, num_words, parser); - - case 103: - return pluginsd_register_job(words, num_words, parser); - - case 104: - return pluginsd_dyncfg_reset(words, num_words, parser); - - case 110: - return pluginsd_job_status(words, num_words, parser); - - case 111: - return pluginsd_delete_job(words, num_words, parser); - - default: - fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id); - } -} - -#include "gperf-hashtable.h" - -void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) { - parser->repertoire = repertoire; - - for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) { - if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire)) - worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword); - } -} - -static void parser_destroy_dyncfg(PARSER *parser) { - if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) { - unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item); - parser->user.cd->configuration = NULL; - } else if (parser->user.host != NULL && SERVING_STREAMING(parser) && parser->user.host != localhost){ - dictionary_flush(parser->user.host->configurable_plugins); - } -} - -void parser_destroy(PARSER *parser) { - if (unlikely(!parser)) - return; - - parser_destroy_dyncfg(parser); - - dictionary_destroy(parser->inflight.functions); - freez(parser); -} - -int pluginsd_parser_unittest(void) { - PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); - pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING); - - char *lines[] = { - "BEGIN2 abcdefghijklmnopqr 123", - "SET2 abcdefg 0x12345678 0 0", - "SET2 hijklmnoqr 0x12345678 0 0", - "SET2 stuvwxyz 0x12345678 0 0", - "END2", - NULL, - }; - - char *words[PLUGINSD_MAX_WORDS]; - size_t iterations = 1000000; - size_t count = 0; - char input[PLUGINSD_LINE_MAX + 1]; - - usec_t started = now_realtime_usec(); - while(--iterations) { - for(size_t line = 0; lines[line] ;line++) { - strncpyz(input, lines[line], PLUGINSD_LINE_MAX); - size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); - const char *command = get_word(words, num_words, 0); - PARSER_KEYWORD *keyword = parser_find_keyword(p, command); - if(unlikely(!keyword)) - fatal("Cannot parse the line '%s'", lines[line]); - count++; - } - } - usec_t ended = now_realtime_usec(); - - netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count, - (double)(ended - started) / (double)USEC_PER_SEC, - (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0); - - parser_destroy(p); - return 0; -} diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h deleted file mode 100644 index 1fce9a89..00000000 --- a/collectors/plugins.d/pluginsd_parser.h +++ /dev/null @@ -1,245 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_PLUGINSD_PARSER_H -#define NETDATA_PLUGINSD_PARSER_H - -#include "daemon/common.h" - -#define WORKER_PARSER_FIRST_JOB 3 - -// this has to be in-sync with the same at receiver.c -#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) - -// this controls the max response size of a function -#define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024) - -#define PLUGINSD_MIN_RRDSET_POINTERS_CACHE 1024 - -#define HOST_LABEL_IS_EPHEMERAL "_is_ephemeral" -// PARSER return codes -typedef enum __attribute__ ((__packed__)) parser_rc { - PARSER_RC_OK, // Callback was successful, go on - PARSER_RC_STOP, // Callback says STOP - PARSER_RC_ERROR // Callback failed (abort rest of callbacks) -} PARSER_RC; - -typedef enum __attribute__ ((__packed__)) parser_input_type { - PARSER_INPUT_SPLIT = (1 << 1), - PARSER_DEFER_UNTIL_KEYWORD = (1 << 2), -} PARSER_INPUT_TYPE; - -typedef enum __attribute__ ((__packed__)) { - PARSER_INIT_PLUGINSD = (1 << 1), - PARSER_INIT_STREAMING = (1 << 2), - PARSER_REP_METADATA = (1 << 3), -} PARSER_REPERTOIRE; - -struct parser; -typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, struct parser *parser); - -typedef struct parser_keyword { - char *keyword; - size_t id; - PARSER_REPERTOIRE repertoire; - size_t worker_job_id; -} PARSER_KEYWORD; - -typedef struct parser_user_object { - bool cleanup_slots; - RRDSET *st; - RRDHOST *host; - void *opaque; - struct plugind *cd; - int trust_durations; - RRDLABELS *new_host_labels; - RRDLABELS *chart_rrdlabels_linked_temporarily; - size_t data_collections_count; - int enabled; - -#ifdef NETDATA_LOG_STREAM_RECEIVE - FILE *stream_log_fp; - PARSER_REPERTOIRE stream_log_repertoire; -#endif - - STREAM_CAPABILITIES capabilities; // receiver capabilities - - struct { - bool parsing_host; - uuid_t machine_guid; - char machine_guid_str[UUID_STR_LEN]; - STRING *hostname; - RRDLABELS *rrdlabels; - } host_define; - - struct parser_user_object_replay { - time_t start_time; - time_t end_time; - - usec_t start_time_ut; - usec_t end_time_ut; - - time_t wall_clock_time; - - bool rset_enabled; - } replay; - - struct parser_user_object_v2 { - bool locked_data_collection; - RRDSET_STREAM_BUFFER stream_buffer; // sender capabilities in this - time_t update_every; - time_t end_time; - time_t wall_clock_time; - bool ml_locked; - } v2; -} PARSER_USER_OBJECT; - -typedef struct parser { - uint8_t version; // Parser version - PARSER_REPERTOIRE repertoire; - uint32_t flags; - int fd; // Socket - FILE *fp_input; // Input source e.g. stream - FILE *fp_output; // Stream to send commands to plugin - -#ifdef ENABLE_HTTPS - NETDATA_SSL *ssl_output; -#endif -#ifdef ENABLE_H2O - void *h2o_ctx; // if set we use h2o_stream functions to send data -#endif - - PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls - - struct buffered_reader reader; - struct line_splitter line; - PARSER_KEYWORD *keyword; - - struct { - const char *end_keyword; - BUFFER *response; - void (*action)(struct parser *parser, void *action_data); - void *action_data; - } defer; - - struct { - DICTIONARY *functions; - usec_t smaller_timeout; - } inflight; - - struct { - SPINLOCK spinlock; - } writer; - -} PARSER; - -PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); -void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire); -void parser_destroy(PARSER *working_parser); -void pluginsd_cleanup_v2(PARSER *parser); -void inflight_functions_init(PARSER *parser); -void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire); -PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words); - -static inline int find_first_keyword(const char *src, char *dst, int dst_size, bool *isspace_map) { - const char *s = src, *keyword_start; - - while (unlikely(isspace_map[(uint8_t)*s])) s++; - keyword_start = s; - - while (likely(*s && !isspace_map[(uint8_t)*s]) && dst_size > 1) { - *dst++ = *s++; - dst_size--; - } - *dst = '\0'; - return dst_size == 0 ? 0 : (int) (s - keyword_start); -} - -PARSER_KEYWORD *gperf_lookup_keyword(register const char *str, register size_t len); - -static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) { - PARSER_KEYWORD *t = gperf_lookup_keyword(command, strlen(command)); - if(t && (t->repertoire & parser->repertoire)) - return t; - - return NULL; -} - -bool parser_reconstruct_node(BUFFER *wb, void *ptr); -bool parser_reconstruct_instance(BUFFER *wb, void *ptr); -bool parser_reconstruct_context(BUFFER *wb, void *ptr); - -static inline int parser_action(PARSER *parser, char *input) { -#ifdef NETDATA_LOG_STREAM_RECEIVE - static __thread char line[PLUGINSD_LINE_MAX + 1]; - strncpyz(line, input, sizeof(line) - 1); -#endif - - parser->line.count++; - - if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { - char command[100 + 1]; - bool has_keyword = find_first_keyword(input, command, 100, isspace_map_pluginsd); - - if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { - if(parser->defer.response) { - buffer_strcat(parser->defer.response, input); - if(buffer_strlen(parser->defer.response) > PLUGINSD_MAX_DEFERRED_SIZE) { - // more than PLUGINSD_MAX_DEFERRED_SIZE of data, - // or a bad plugin that did not send the end_keyword - internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); - return 1; - } - } - return 0; - } - else { - // call the action - parser->defer.action(parser, parser->defer.action_data); - - // empty everything - parser->defer.action = NULL; - parser->defer.action_data = NULL; - parser->defer.end_keyword = NULL; - parser->defer.response = NULL; - parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; - } - return 0; - } - - parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS); - const char *command = get_word(parser->line.words, parser->line.num_words, 0); - - if(unlikely(!command)) { - line_splitter_reset(&parser->line); - return 0; - } - - PARSER_RC rc; - parser->keyword = parser_find_keyword(parser, command); - if(likely(parser->keyword)) { - worker_is_busy(parser->keyword->worker_job_id); - -#ifdef NETDATA_LOG_STREAM_RECEIVE - if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire) - fprintf(parser->user.stream_log_fp, "%s", line); -#endif - - rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words); - // rc = (*t->func)(words, num_words, parser); - worker_is_idle(); - } - else - rc = PARSER_RC_ERROR; - - if(rc == PARSER_RC_ERROR) { - CLEAN_BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); - line_splitter_reconstruct_line(wb, &parser->line); - netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", - command, parser->line.count, buffer_tostring(wb)); - } - - line_splitter_reset(&parser->line); - return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); -} - -#endif //NETDATA_PLUGINSD_PARSER_H |