diff options
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r-- | collectors/plugins.d/README.md | 58 | ||||
-rw-r--r-- | collectors/plugins.d/gperf-config.txt | 24 | ||||
-rw-r--r-- | collectors/plugins.d/gperf-hashtable.h | 146 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 73 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 651 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 21 |
6 files changed, 663 insertions, 310 deletions
diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md index 1c3b50cb7..0752d389b 100644 --- a/collectors/plugins.d/README.md +++ b/collectors/plugins.d/README.md @@ -14,20 +14,20 @@ from external processes, thus allowing Netdata to use **external plugins**. ## Provided External Plugins -|plugin|language|O/S|description| -|:----:|:------:|:-:|:----------| -|[apps.plugin](https://github.com/netdata/netdata/blob/master/collectors/apps.plugin/README.md)|`C`|linux, freebsd|monitors the whole process tree on Linux and FreeBSD and breaks down system resource usage by **process**, **user** and **user group**.| -|[charts.d.plugin](https://github.com/netdata/netdata/blob/master/collectors/charts.d.plugin/README.md)|`BASH`|all|a **plugin orchestrator** for data collection modules written in `BASH` v4+.| -|[cups.plugin](https://github.com/netdata/netdata/blob/master/collectors/cups.plugin/README.md)|`C`|all|monitors **CUPS**| -|[ebpf.plugin](https://github.com/netdata/netdata/blob/master/collectors/ebpf.plugin/README.md)|`C`|linux|monitors different metrics on environments using kernel internal functions.| -|[go.d.plugin](https://github.com/netdata/go.d.plugin/blob/master/README.md)|`GO`|all|collects metrics from the system, applications, or third-party APIs.| -|[ioping.plugin](https://github.com/netdata/netdata/blob/master/collectors/ioping.plugin/README.md)|`C`|all|measures disk latency.| -|[freeipmi.plugin](https://github.com/netdata/netdata/blob/master/collectors/freeipmi.plugin/README.md)|`C`|linux|collects metrics from enterprise hardware sensors, on Linux servers.| -|[nfacct.plugin](https://github.com/netdata/netdata/blob/master/collectors/nfacct.plugin/README.md)|`C`|linux|collects netfilter firewall, connection tracker and accounting metrics using `libmnl` and `libnetfilter_acct`.| -|[xenstat.plugin](https://github.com/netdata/netdata/blob/master/collectors/xenstat.plugin/README.md)|`C`|linux|collects XenServer and XCP-ng metrics using `lxenstat`.| -|[perf.plugin](https://github.com/netdata/netdata/blob/master/collectors/perf.plugin/README.md)|`C`|linux|collects CPU performance metrics using performance monitoring units (PMU).| -|[python.d.plugin](https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/README.md)|`python`|all|a **plugin orchestrator** for data collection modules written in `python` v2 or v3 (both are supported).| -|[slabinfo.plugin](https://github.com/netdata/netdata/blob/master/collectors/slabinfo.plugin/README.md)|`C`|linux|collects kernel internal cache objects (SLAB) metrics.| +| plugin | language | O/S | description | +|:------------------------------------------------------------------------------------------------------:|:--------:|:--------------:|:----------------------------------------------------------------------------------------------------------------------------------------| +| [apps.plugin](https://github.com/netdata/netdata/blob/master/collectors/apps.plugin/README.md) | `C` | linux, freebsd | monitors the whole process tree on Linux and FreeBSD and breaks down system resource usage by **process**, **user** and **user group**. | +| [charts.d.plugin](https://github.com/netdata/netdata/blob/master/collectors/charts.d.plugin/README.md) | `BASH` | all | a **plugin orchestrator** for data collection modules written in `BASH` v4+. | +| [cups.plugin](https://github.com/netdata/netdata/blob/master/collectors/cups.plugin/README.md) | `C` | all | monitors **CUPS** | +| [ebpf.plugin](https://github.com/netdata/netdata/blob/master/collectors/ebpf.plugin/README.md) | `C` | linux | monitors different metrics on environments using kernel internal functions. | +| [go.d.plugin](https://github.com/netdata/go.d.plugin/blob/master/README.md) | `GO` | all | collects metrics from the system, applications, or third-party APIs. | +| [ioping.plugin](https://github.com/netdata/netdata/blob/master/collectors/ioping.plugin/README.md) | `C` | all | measures disk latency. | +| [freeipmi.plugin](https://github.com/netdata/netdata/blob/master/collectors/freeipmi.plugin/README.md) | `C` | linux | collects metrics from enterprise hardware sensors, on Linux servers. | +| [nfacct.plugin](https://github.com/netdata/netdata/blob/master/collectors/nfacct.plugin/README.md) | `C` | linux | collects netfilter firewall, connection tracker and accounting metrics using `libmnl` and `libnetfilter_acct`. | +| [xenstat.plugin](https://github.com/netdata/netdata/blob/master/collectors/xenstat.plugin/README.md) | `C` | linux | collects XenServer and XCP-ng metrics using `lxenstat`. | +| [perf.plugin](https://github.com/netdata/netdata/blob/master/collectors/perf.plugin/README.md) | `C` | linux | collects CPU performance metrics using performance monitoring units (PMU). | +| [python.d.plugin](https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/README.md) | `python` | all | a **plugin orchestrator** for data collection modules written in `python` v2 or v3 (both are supported). | +| [slabinfo.plugin](https://github.com/netdata/netdata/blob/master/collectors/slabinfo.plugin/README.md) | `C` | linux | collects kernel internal cache objects (SLAB) metrics. | Plugin orchestrators may also be described as **modular plugins**. They are modular since they accept custom made modules to be included. Writing modules for these plugins is easier than accessing the native Netdata API directly. You will find modules already available for each orchestrator under the directory of the particular modular plugin (e.g. under python.d.plugin for the python orchestrator). Each of these modular plugins has each own methods for defining modules. Please check the examples and their documentation. @@ -154,18 +154,18 @@ every 5 seconds. There are a few environment variables that are set by `netdata` and are available for the plugin to use. -|variable|description| -|:------:|:----------| -|`NETDATA_USER_CONFIG_DIR`|The directory where all Netdata-related user configuration should be stored. If the plugin requires custom user configuration, this is the place the user has saved it (normally under `/etc/netdata`).| -|`NETDATA_STOCK_CONFIG_DIR`|The directory where all Netdata -related stock configuration should be stored. If the plugin is shipped with configuration files, this is the place they can be found (normally under `/usr/lib/netdata/conf.d`).| -|`NETDATA_PLUGINS_DIR`|The directory where all Netdata plugins are stored.| -|`NETDATA_USER_PLUGINS_DIRS`|The list of directories where custom plugins are stored.| -|`NETDATA_WEB_DIR`|The directory where the web files of Netdata are saved.| -|`NETDATA_CACHE_DIR`|The directory where the cache files of Netdata are stored. Use this directory if the plugin requires a place to store data. A new directory should be created for the plugin for this purpose, inside this directory.| -|`NETDATA_LOG_DIR`|The directory where the log files are stored. By default the `stderr` output of the plugin will be saved in the `error.log` file of Netdata.| -|`NETDATA_HOST_PREFIX`|This is used in environments where system directories like `/sys` and `/proc` have to be accessed at a different path.| -|`NETDATA_DEBUG_FLAGS`|This is a number (probably in hex starting with `0x`), that enables certain Netdata debugging features. Check **\[[Tracing Options]]** for more information.| -|`NETDATA_UPDATE_EVERY`|The minimum number of seconds between chart refreshes. This is like the **internal clock** of Netdata (it is user configurable, defaulting to `1`). There is no meaning for a plugin to update its values more frequently than this number of seconds.| +| variable | description | +|:---------------------------:|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `NETDATA_USER_CONFIG_DIR` | The directory where all Netdata-related user configuration should be stored. If the plugin requires custom user configuration, this is the place the user has saved it (normally under `/etc/netdata`). | +| `NETDATA_STOCK_CONFIG_DIR` | The directory where all Netdata -related stock configuration should be stored. If the plugin is shipped with configuration files, this is the place they can be found (normally under `/usr/lib/netdata/conf.d`). | +| `NETDATA_PLUGINS_DIR` | The directory where all Netdata plugins are stored. | +| `NETDATA_USER_PLUGINS_DIRS` | The list of directories where custom plugins are stored. | +| `NETDATA_WEB_DIR` | The directory where the web files of Netdata are saved. | +| `NETDATA_CACHE_DIR` | The directory where the cache files of Netdata are stored. Use this directory if the plugin requires a place to store data. A new directory should be created for the plugin for this purpose, inside this directory. | +| `NETDATA_LOG_DIR` | The directory where the log files are stored. By default the `stderr` output of the plugin will be saved in the `error.log` file of Netdata. | +| `NETDATA_HOST_PREFIX` | This is used in environments where system directories like `/sys` and `/proc` have to be accessed at a different path. | +| `NETDATA_DEBUG_FLAGS` | This is a number (probably in hex starting with `0x`), that enables certain Netdata debugging features. Check **\[[Tracing Options]]** for more information. | +| `NETDATA_UPDATE_EVERY` | The minimum number of seconds between chart refreshes. This is like the **internal clock** of Netdata (it is user configurable, defaulting to `1`). There is no meaning for a plugin to update its values more frequently than this number of seconds. | ### The output of the plugin @@ -298,7 +298,7 @@ the template is: the context is giving the template of the chart. For example, if multiple charts present the same information for a different family, they should have the same `context` - this is used for looking up rendering information for the chart (colors, sizes, informational texts) and also apply alarms to it + this is used for looking up rendering information for the chart (colors, sizes, informational texts) and also apply alerts to it - `charttype` @@ -388,12 +388,12 @@ the template is: > VARIABLE [SCOPE] name = value -`VARIABLE` defines a variable that can be used in alarms. This is to used for setting constants (like the max connections a server may accept). +`VARIABLE` defines a variable that can be used in alerts. This is to used for setting constants (like the max connections a server may accept). Variables support 2 scopes: - `GLOBAL` or `HOST` to define the variable at the host level. -- `LOCAL` or `CHART` to define the variable at the chart level. Use chart-local variables when the same variable may exist for different charts (i.e. Netdata monitors 2 mysql servers, and you need to set the `max_connections` each server accepts). Using chart-local variables is the ideal to build alarm templates. +- `LOCAL` or `CHART` to define the variable at the chart level. Use chart-local variables when the same variable may exist for different charts (i.e. Netdata monitors 2 mysql servers, and you need to set the `max_connections` each server accepts). Using chart-local variables is the ideal to build alert templates. The position of the `VARIABLE` line, sets its default scope (in case you do not specify a scope). So, defining a `VARIABLE` before any `CHART`, or between `END` and `BEGIN` (outside any chart), sets `GLOBAL` scope, while defining a `VARIABLE` just after a `CHART` or a `DIMENSION`, or within the `BEGIN` - `END` block of a chart, sets `LOCAL` scope. diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt index b8140e66c..a1d0c51ba 100644 --- a/collectors/plugins.d/gperf-config.txt +++ b/collectors/plugins.d/gperf-config.txt @@ -36,20 +36,22 @@ SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_ VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19 DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20 DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21 -REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 +DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 +REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 +DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 # # Streaming only keywords # -CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 -BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 -SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 -END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26 +CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 +BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26 +SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 +END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 # # Streaming Replication keywords # -CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 -RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 -RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 -REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30 -RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 -RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 +CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 +RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30 +RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 +REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 +RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33 +RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34 diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h index e7d20126f..5bbf9fa98 100644 --- a/collectors/plugins.d/gperf-hashtable.h +++ b/collectors/plugins.d/gperf-hashtable.h @@ -30,12 +30,12 @@ #endif -#define GPERF_PARSER_TOTAL_KEYWORDS 32 +#define GPERF_PARSER_TOTAL_KEYWORDS 34 #define GPERF_PARSER_MIN_WORD_LENGTH 3 #define GPERF_PARSER_MAX_WORD_LENGTH 22 #define GPERF_PARSER_MIN_HASH_VALUE 3 -#define GPERF_PARSER_MAX_HASH_VALUE 41 -/* maximum key range = 39, duplicates = 0 */ +#define GPERF_PARSER_MAX_HASH_VALUE 36 +/* maximum key range = 34, duplicates = 0 */ #ifdef __GNUC__ __inline @@ -49,32 +49,32 @@ gperf_keyword_hash_function (register const char *str, register size_t len) { static unsigned char asso_values[] = { - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 16, 7, 2, 11, 0, - 8, 42, 3, 9, 42, 42, 9, 42, 0, 2, - 42, 42, 1, 3, 42, 7, 17, 42, 27, 2, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42 + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 12, 28, 5, 2, 0, + 0, 37, 3, 13, 37, 37, 14, 37, 0, 2, + 37, 37, 1, 3, 37, 6, 10, 37, 32, 2, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37 }; return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]]; } @@ -84,70 +84,72 @@ static PARSER_KEYWORD gperf_keywords[] = {(char*)0}, {(char*)0}, {(char*)0}, #line 30 "gperf-config.txt" {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13}, -#line 46 "gperf-config.txt" - {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26}, -#line 53 "gperf-config.txt" - {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30}, +#line 48 "gperf-config.txt" + {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, +#line 55 "gperf-config.txt" + {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, #line 35 "gperf-config.txt" {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18}, -#line 45 "gperf-config.txt" - {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, -#line 54 "gperf-config.txt" - {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, +#line 47 "gperf-config.txt" + {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, +#line 56 "gperf-config.txt" + {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33}, #line 18 "gperf-config.txt" {"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4}, +#line 54 "gperf-config.txt" + {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, +#line 57 "gperf-config.txt" + {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34}, +#line 41 "gperf-config.txt" + {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, #line 26 "gperf-config.txt" {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9}, -#line 55 "gperf-config.txt" - {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, -#line 25 "gperf-config.txt" - {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, -#line 44 "gperf-config.txt" - {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, -#line 51 "gperf-config.txt" - {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, +#line 31 "gperf-config.txt" + {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14}, #line 21 "gperf-config.txt" {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7}, #line 19 "gperf-config.txt" {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5}, -#line 27 "gperf-config.txt" - {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10}, -#line 39 "gperf-config.txt" - {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, -#line 52 "gperf-config.txt" - {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, -#line 20 "gperf-config.txt" - {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6}, -#line 43 "gperf-config.txt" - {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, -#line 15 "gperf-config.txt" - {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, -#line 31 "gperf-config.txt" - {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14}, -#line 28 "gperf-config.txt" - {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11}, -#line 50 "gperf-config.txt" - {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, #line 37 "gperf-config.txt" {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20}, -#line 16 "gperf-config.txt" - {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, +#line 40 "gperf-config.txt" + {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, +#line 15 "gperf-config.txt" + {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, +#line 20 "gperf-config.txt" + {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6}, #line 34 "gperf-config.txt" {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17}, +#line 16 "gperf-config.txt" + {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, +#line 39 "gperf-config.txt" + {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, #line 29 "gperf-config.txt" {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12}, -#line 33 "gperf-config.txt" - {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16}, -#line 17 "gperf-config.txt" - {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}, - {(char*)0}, {(char*)0}, {(char*)0}, +#line 27 "gperf-config.txt" + {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10}, #line 38 "gperf-config.txt" {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21}, #line 32 "gperf-config.txt" {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15}, - {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, +#line 52 "gperf-config.txt" + {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, +#line 45 "gperf-config.txt" + {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, #line 36 "gperf-config.txt" - {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19} + {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19}, +#line 33 "gperf-config.txt" + {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16}, +#line 28 "gperf-config.txt" + {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11}, +#line 25 "gperf-config.txt" + {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, +#line 46 "gperf-config.txt" + {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26}, +#line 53 "gperf-config.txt" + {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30}, +#line 17 "gperf-config.txt" + {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3} }; PARSER_KEYWORD * diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 4988b5071..7c5df4168 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -10,47 +10,15 @@ #define PLUGINSD_CMD_MAX (FILENAME_MAX*2) #define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0 -#define PLUGINSD_KEYWORD_CHART "CHART" -#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END" -#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION" -#define PLUGINSD_KEYWORD_BEGIN "BEGIN" -#define PLUGINSD_KEYWORD_SET "SET" -#define PLUGINSD_KEYWORD_END "END" -#define PLUGINSD_KEYWORD_FLUSH "FLUSH" -#define PLUGINSD_KEYWORD_DISABLE "DISABLE" -#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE" -#define PLUGINSD_KEYWORD_LABEL "LABEL" -#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE" -#define PLUGINSD_KEYWORD_CLABEL "CLABEL" -#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT" -#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION" -#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN" -#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END" - -#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART" -#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN" -#define PLUGINSD_KEYWORD_REPLAY_SET "RSET" -#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE" -#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" -#define PLUGINSD_KEYWORD_REPLAY_END "REND" - -#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2" -#define PLUGINSD_KEYWORD_SET_V2 "SET2" -#define PLUGINSD_KEYWORD_END_V2 "END2" - -#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE" -#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END" -#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL" -#define PLUGINSD_KEYWORD_HOST "HOST" +#define PLUGINSD_KEYWORD_FUNCTION_PAYLOAD "FUNCTION_PAYLOAD" +#define PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END "FUNCTION_PAYLOAD_END" #define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE" #define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE" +#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB "DYNCFG_REGISTER_JOB" #define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS" - -#define PLUGINSD_KEYWORD_EXIT "EXIT" - -#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds +#define PLUGINSD_KEYWORD_DELETE_JOB "DELETE_JOB" #define PLUGINSD_LINE_MAX_SSL_READ 512 @@ -99,37 +67,4 @@ void pluginsd_process_thread_cleanup(void *ptr); size_t pluginsd_initialize_plugin_directories(); -#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \ - buffer_sprintf(wb \ - , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ - , (transaction) ? (transaction) : "" \ - , (int)(code) \ - , (content_type) ? (content_type) : "" \ - , (long int)(expires) \ - ) - -#define pluginsd_function_result_end_to_buffer(wb) \ - buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") - -#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \ - fprintf(stdout \ - , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ - , (transaction) ? (transaction) : "" \ - , (int)(code) \ - , (content_type) ? (content_type) : "" \ - , (long int)(expires) \ - ) - -#define pluginsd_function_result_end_to_stdout() \ - fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") - -static inline void pluginsd_function_json_error(const char *transaction, int code, const char *msg) { - char buffer[PLUGINSD_LINE_MAX + 1]; - json_escape_string(buffer, msg, PLUGINSD_LINE_MAX); - - pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec()); - fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); - pluginsd_function_result_end_to_stdout(); -} - #endif /* NETDATA_PLUGINS_D_H */ diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index bc265a3af..68667c785 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,6 +4,9 @@ #define LOG_FUNCTIONS false +#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING) +#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD) + static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; @@ -353,7 +356,7 @@ static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *par static void pluginsd_host_define_cleanup(PARSER *parser) { string_freez(parser->user.host_define.hostname); - dictionary_destroy(parser->user.host_define.rrdlabels); + rrdlabels_destroy(parser->user.host_define.rrdlabels); parser->user.host_define.hostname = NULL; parser->user.host_define.rrdlabels = NULL; @@ -390,17 +393,17 @@ static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PAR return PARSER_RC_OK; } -static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) { +static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) { char *name = get_word(words, num_words, 1); char *value = get_word(words, num_words, 2); if(!name || !*name || !value) return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters"); - if(!parser->user.host_define.parsing_host || !dict) + if(!parser->user.host_define.parsing_host || !labels) return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); - rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG); + rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG); return PARSER_RC_OK; } @@ -733,14 +736,16 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE struct inflight_function { int code; int timeout; - BUFFER *destination_wb; STRING *function; - void (*callback)(BUFFER *wb, int code, void *callback_data); - void *callback_data; + BUFFER *result_body_wb; + rrd_function_result_callback_t result_cb; + void *result_cb_data; usec_t timeout_ut; usec_t started_ut; usec_t sent_ut; const char *payload; + PARSER *parser; + bool virtual; }; static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { @@ -751,42 +756,44 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller pf->code = HTTP_RESP_GATEWAY_TIMEOUT; + const char *transaction = dictionary_acquired_item_name(item); + char buffer[2048 + 1]; snprintfz(buffer, 2048, "%s %s %d \"%s\"\n", pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION", - dictionary_acquired_item_name(item), + transaction, pf->timeout, string2str(pf->function)); // send the command to the plugin - int ret = send_to_plugin(buffer, parser); + ssize_t ret = send_to_plugin(buffer, parser); pf->sent_ut = now_realtime_usec(); if(ret < 0) { - netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret); - rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); + netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret); + rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); } else { internal_error(LOG_FUNCTIONS, - "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)", + "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)", string2str(pf->function), dictionary_acquired_item_name(item), ret, pf->sent_ut - pf->started_ut); } if (!pf->payload) return; - + // send the payload to the plugin ret = send_to_plugin(pf->payload, parser); if(ret < 0) { - netdata_log_error("FUNCTION_PAYLOAD: failed to send function to plugin, error %d", ret); - rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); + netdata_log_error("FUNCTION_PAYLOAD '%s': failed to send function to plugin, error %zd", string2str(pf->function), ret); + rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); } else { internal_error(LOG_FUNCTIONS, - "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)", + "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)", string2str(pf->function), dictionary_acquired_item_name(item), ret, pf->sent_ut - pf->started_ut); } @@ -798,23 +805,90 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m struct inflight_function *pf = new_func; netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); - pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); - pf->callback(pf->destination_wb, pf->code, pf->callback_data); + pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); + pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); string_freez(pf->function); return false; } -static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) { +void delete_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug, const char *fnc_sig, int code) { + if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) + return; + + char *params_local = strdupz(fnc_sig); + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + if (words_c != 3) { + netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job"); + freez(params_local); + return; + } + + const char *module = words[1]; + const char *job = words[2]; + + delete_job(plug, module, job); + + unlink_job(plug->name, module, job); + + rrdpush_send_job_deleted(localhost, plug->name, module, job); + + freez(params_local); +} + +void set_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug __maybe_unused, const char *fnc_sig, int code) { + if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) + return; + + char *params_local = strdupz(fnc_sig); + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + if (words_c != 3) { + netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config"); + freez(params_local); + return; + } + + const char *module_name = get_word(words, words_c, 1); + const char *job_name = get_word(words, words_c, 2); + + if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) { + freez(params_local); + return; + } + + // only send this if it is not existing already (register_job cares for that) + rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED); + + freez(params_local); +} + +static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) { struct inflight_function *pf = func; + struct parser *parser = (struct parser *)parser_ptr; internal_error(LOG_FUNCTIONS, - "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)", + "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)", string2str(pf->function), dictionary_acquired_item_name(item), - buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + + if (pf->virtual && SERVING_PLUGINSD(parser)) { + if (pf->payload) { + if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) + set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); + dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration); + } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) { + delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); + } + } + + pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); - pf->callback(pf->destination_wb, pf->code, pf->callback_data); string_freez(pf->function); + freez((void *)pf->payload); } void inflight_functions_init(PARSER *parser) { @@ -830,11 +904,11 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { dfe_start_write(parser->inflight.functions, pf) { if (pf->timeout_ut < now) { internal_error(true, - "FUNCTION '%s' removing expired transaction '%s', after %llu usec.", + "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.", string2str(pf->function), pf_dfe.name, now - pf->started_ut); - if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK) - pf->code = rrd_call_function_error(pf->destination_wb, + if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK) + pf->code = rrd_call_function_error(pf->result_body_wb, "Timeout waiting for collector response.", HTTP_RESP_GATEWAY_TIMEOUT); @@ -847,35 +921,73 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { dfe_done(pf); } +void pluginsd_function_cancel(void *data) { + struct inflight_function *look_for = data, *t; + + bool sent = false; + dfe_start_read(look_for->parser->inflight.functions, t) { + if(look_for == t) { + const char *transaction = t_dfe.name; + + internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction); + + char buffer[2048 + 1]; + snprintfz(buffer, 2048, "%s %s\n", + PLUGINSD_KEYWORD_FUNCTION_CANCEL, + transaction); + + // send the command to the plugin + ssize_t ret = send_to_plugin(buffer, t->parser); + if(ret < 0) + sent = true; + + break; + } + } + dfe_done(t); + + if(sent <= 0) + netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d."); +} + // this is the function that is called from // rrd_call_function_and_wait() and rrd_call_function_async() -static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) { - PARSER *parser = collector_data; +static int pluginsd_function_execute_cb(BUFFER *result_body_wb, int timeout, const char *function, + void *execute_cb_data, + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb __maybe_unused, + void *is_cancelled_cb_data __maybe_unused, + rrd_function_register_canceller_cb_t register_canceller_cb, + void *register_canceller_db_data) { + PARSER *parser = execute_cb_data; usec_t now = now_realtime_usec(); struct inflight_function tmp = { .started_ut = now, - .timeout_ut = now + timeout * USEC_PER_SEC, - .destination_wb = destination_wb, + .timeout_ut = now + timeout * USEC_PER_SEC + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT, + .result_body_wb = result_body_wb, .timeout = timeout, .function = string_strdupz(function), - .callback = callback, - .callback_data = callback_data, - .payload = NULL + .result_cb = result_cb, + .result_cb_data = result_cb_data, + .payload = NULL, + .parser = parser, }; uuid_t uuid; - uuid_generate_time(uuid); + uuid_generate_random(uuid); - char key[UUID_STR_LEN]; - uuid_unparse_lower(uuid, key); + char transaction[UUID_STR_LEN]; + uuid_unparse_lower(uuid, transaction); dictionary_write_lock(parser->inflight.functions); // if there is any error, our dictionary callbacks will call the caller callback to notify // the caller about the error - no need for error handling here. - dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + void *t = dictionary_set(parser->inflight.functions, transaction, &tmp, sizeof(struct inflight_function)); + if(register_canceller_cb) + register_canceller_cb(register_canceller_db_data, pluginsd_function_cancel, t); if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) parser->inflight.smaller_timeout = tmp.timeout_ut; @@ -890,6 +1002,8 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou } static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) { + // a plugin or a child is registering a function + bool global = false; size_t i = 1; if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { @@ -926,7 +1040,7 @@ static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; } - rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser); + rrd_function_add(host, st, name, timeout, help, false, pluginsd_function_execute_cb, parser); parser->user.data_collections_count++; @@ -973,18 +1087,18 @@ static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_ } else { if(format && *format) - pf->destination_wb->content_type = functions_format_to_content_type(format); + pf->result_body_wb->content_type = functions_format_to_content_type(format); pf->code = code; - pf->destination_wb->expires = expiration; + pf->result_body_wb->expires = expiration; if(expiration <= now_realtime_sec()) - buffer_no_cacheable(pf->destination_wb); + buffer_no_cacheable(pf->result_body_wb); else - buffer_cacheable(pf->destination_wb); + buffer_cacheable(pf->result_body_wb); } - parser->defer.response = (pf) ? pf->destination_wb : NULL; + parser->defer.response = (pf) ? pf->result_body_wb : NULL; parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END; parser->defer.action = pluginsd_function_result_end; parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL @@ -1163,7 +1277,7 @@ static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER * const char *value = get_word(words, num_words, 2); const char *label_source = get_word(words, num_words, 3); - if (!name || !value || !*label_source) { + if (!name || !value || !label_source) { netdata_log_error("Ignoring malformed or empty CHART LABEL command."); return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); } @@ -1604,7 +1718,7 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED))) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) rrdset_isnot_obsolete(st); timing_step(TIMING_STEP_BEGIN2_FIND_CHART); @@ -1894,7 +2008,7 @@ struct mutex_cond { int rc; }; -static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data) +static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *callback_data) { struct mutex_cond *ctx = callback_data; pthread_mutex_lock(&ctx->lock); @@ -1904,9 +2018,81 @@ static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data) } #define VIRT_FNC_TIMEOUT 1 +#define VIRT_FNC_BUF_SIZE (4096) +void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) { + PARSER *parser = NULL; + + //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all) + char *words[PLUGINSD_MAX_WORDS]; + char *function_with_params = strdupz(name); + size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd); + + if (num_words < 2) { + netdata_log_error("PLUGINSD: virtual function name is empty."); + freez(function_with_params); + return; + } + + const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1)); + if (unlikely(cpi == NULL)) { + netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name); + freez(function_with_params); + return; + } + struct configurable_plugin *cp = dictionary_acquired_item_value(cpi); + parser = (PARSER *)cp->cb_usr_ctx; + + BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL); + // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name) + buffer_strcat(function_out, get_word(words, num_words, 0)); + for (size_t i = 1; i < num_words; i++) { + if (i == 1 && SERVING_PLUGINSD(parser)) + continue; + buffer_sprintf(function_out, " %s", get_word(words, num_words, i)); + } + freez(function_with_params); + + usec_t now = now_realtime_usec(); + + struct inflight_function tmp = { + .started_ut = now, + .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, + .result_body_wb = wb, + .timeout = VIRT_FNC_TIMEOUT * 10, + .function = string_strdupz(buffer_tostring(function_out)), + .result_cb = callback, + .result_cb_data = callback_data, + .payload = payload != NULL ? strdupz(payload) : NULL, + .virtual = true, + }; + buffer_free(function_out); + + uuid_t uuid; + uuid_generate_time(uuid); + + char key[UUID_STR_LEN]; + uuid_unparse_lower(uuid, key); + + dictionary_write_lock(parser->inflight.functions); + + // if there is any error, our dictionary callbacks will call the caller callback to notify + // the caller about the error - no need for error handling here. + dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + + if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = tmp.timeout_ut; + + // garbage collect stale inflight functions + if(parser->inflight.smaller_timeout < now) + inflight_functions_garbage_collect(parser, now); + + dictionary_write_unlock(parser->inflight.functions); +} + + dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) { usec_t now = now_realtime_usec(); - BUFFER *wb = buffer_create(4096, NULL); + BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL); struct mutex_cond cond = { .lock = PTHREAD_MUTEX_INITIALIZER, @@ -1916,12 +2102,13 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, struct inflight_function tmp = { .started_ut = now, .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, - .destination_wb = wb, + .result_body_wb = wb, .timeout = VIRT_FNC_TIMEOUT, .function = string_strdupz(name), - .callback = virt_fnc_got_data_cb, - .callback_data = &cond, - .payload = payload, + .result_cb = virt_fnc_got_data_cb, + .result_cb_data = &cond, + .payload = payload != NULL ? strdupz(payload) : NULL, + .virtual = true, }; uuid_t uuid; @@ -1968,98 +2155,188 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, return cfg; } -static dyncfg_config_t get_plugin_config_cb(void *usr_ctx) +#define CVF_MAX_LEN (1024) +static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name) { PARSER *parser = usr_ctx; - return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL); + + if (SERVING_STREAMING(parser)) { + char buf[CVF_MAX_LEN + 1]; + snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); + } + + return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL); } -static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx) +static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name) { PARSER *parser = usr_ctx; + + if (SERVING_STREAMING(parser)) { + char buf[CVF_MAX_LEN + 1]; + snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); + } + return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL); } -static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_module_config %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG); + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA); + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name) +static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg) +enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + int rc; - call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data); - if(rc != 1) + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); + + buffer_free(wb); + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg) +enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - char buf[1024]; - snprintfz(buf, sizeof(buf), "set_module_config %s", module_name); - call_virtual_function_blocking(parser, buf, &rc, cfg->data); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg) +enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - char buf[1024]; - snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name); - call_virtual_function_blocking(parser, buf, &rc, cfg->data); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name) +enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL); - char buf[1024]; - snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name); - call_virtual_function_blocking(parser, buf, &rc, NULL); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } @@ -2079,37 +2356,65 @@ static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, si cfg->get_config_schema_cb = get_plugin_config_schema_cb; cfg->cb_usr_ctx = parser; - parser->user.cd->cfg_dict_item = register_plugin(cfg); - - if (unlikely(parser->user.cd->cfg_dict_item == NULL)) { + const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser)); + if (unlikely(di == NULL)) { freez(cfg->name); freez(cfg); return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin"); } - parser->user.cd->configuration = cfg; + if (SERVING_PLUGINSD(parser)) { + // this is optimization for pluginsd to avoid extra dictionary lookup + // as we know which plugin is comunicating with us + parser->user.cd->cfg_dict_item = di; + parser->user.cd->configuration = cfg; + } else { + // register_plugin keeps the item acquired, so we need to release it + dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); + } + + rrdpush_send_dyncfg_enable(parser->user.host, cfg->name); + return PARSER_RC_OK; } +#define LOG_MSG_SIZE (1024) +#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2) +#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3) static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE"); - struct configurable_plugin *plug_cfg = parser->user.cd->configuration; - if (unlikely(plug_cfg == NULL)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); - - if (unlikely(num_words != 3)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type"); + size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4; + + if (unlikely(num_words != expected_num_words)) { + char log[LOG_MSG_SIZE + 1]; + snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log); + } + + struct configurable_plugin *plug_cfg; + const DICTIONARY_ITEM *di = NULL; + if (SERVING_PLUGINSD(parser)) { + plug_cfg = parser->user.cd->configuration; + if (unlikely(plug_cfg == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); + } else { + di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]); + if (unlikely(di == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found"); + + plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di); + } struct module *mod = callocz(1, sizeof(struct module)); - mod->type = str2_module_type(words[2]); + mod->type = str2_module_type(words[MODULE_TYPE_IDX]); if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) { freez(mod); return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)"); } - mod->name = strdupz(words[1]); + mod->name = strdupz(words[MODULE_NAME_IDX]); mod->set_config_cb = set_module_config_cb; mod->get_config_cb = get_module_config_cb; @@ -2122,27 +2427,111 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si mod->delete_job_cb = delete_job_cb; mod->job_config_cb_usr_ctx = parser; - register_module(plug_cfg, mod); + register_module(parser->user.host->configurable_plugins, plug_cfg, mod, SERVING_PLUGINSD(parser)); + + if (di != NULL) + dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); + + rrdpush_send_dyncfg_reg_module(parser->user.host, plug_cfg->name, mod->name, mod->type); + return PARSER_RC_OK; } -// job_status <module_name> <job_name> <status_code> <state> <message> -static inline PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) -{ - if (unlikely(num_words != 6 && num_words != 5)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]"); +static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) { + if (atol(words[3]) < 0) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags"); + dyncfg_job_flg_t flags = atol(words[3]); + if (SERVING_PLUGINSD(parser)) + flags |= JOB_FLG_PLUGIN_PUSHED; + else + flags |= JOB_FLG_STREAMING_PUSHED; - int state = atoi(words[4]); + enum job_type job_type = str2job_type(words[2]); + if (job_type == JOB_TYPE_UNKNOWN) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type"); + if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)"); - enum job_status job_status = str2job_state(words[3]); - if (unlikely(job_status == JOB_STATUS_UNKNOWN)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job state"); + if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job"); + + rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags); + return PARSER_RC_OK; +} + +static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + size_t expected_num_words = SERVING_PLUGINSD(parser) ? 5 : 6; + + if (unlikely(num_words != expected_num_words)) { + char log[LOG_MSG_SIZE + 1]; + snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name job_name job_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, log); + } + + if (SERVING_PLUGINSD(parser)) { + return pluginsd_register_job_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name); + } + return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]); +} + +static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) { + int state = str2i(words[3]); + + enum job_status status = str2job_state(words[2]); + if (unlikely(SERVING_PLUGINSD(parser) && status == JOB_STATUS_UNKNOWN)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status"); char *message = NULL; - if (num_words == 6) - message = strdupz(words[5]); + if (num_words == 5) + message = words[4]; + + const DICTIONARY_ITEM *plugin_item; + DICTIONARY *job_dict; + const DICTIONARY_ITEM *job_item = report_job_status_acq_lock(parser->user.host->configurable_plugins, &plugin_item, &job_dict, plugin_name, words[0], words[1], status, state, message); + + if (job_item != NULL) { + struct job *job = dictionary_acquired_item_value(job_item); + rrdpush_send_job_status_update(parser->user.host, plugin_name, words[0], job); + + pthread_mutex_unlock(&job->lock); + dictionary_acquired_item_release(job_dict, job_item); + dictionary_acquired_item_release(parser->user.host->configurable_plugins, plugin_item); + } + + return PARSER_RC_OK; +} - report_job_status(parser->user.cd->configuration, words[1], words[2], job_status, state, message); +// job_status [plugin_name if streaming] <module_name> <job_name> <status_code> <state> [message] +static PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) { + if (SERVING_PLUGINSD(parser)) { + if (unlikely(num_words != 5 && num_words != 6)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]"); + } else { + if (unlikely(num_words != 6 && num_words != 7)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 5 or 6 parameters: plugin_name, module_name, job_name, status_code, state, [optional: message]"); + } + + if (SERVING_PLUGINSD(parser)) { + return pluginsd_job_status_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name); + } + return pluginsd_job_status_common(&words[2], num_words - 2, parser, words[1]); +} + +static PARSER_RC pluginsd_delete_job(char **words, size_t num_words, PARSER *parser) { + // this can confuse a bit but there is a diference between KEYWORD_DELETE_JOB and actual delete_job function + // they are of opossite direction + if (num_words != 4) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DELETE_JOB, "expected 2 parameters: plugin_name, module_name, job_name"); + + const char *plugin_name = get_word(words, num_words, 1); + const char *module_name = get_word(words, num_words, 2); + const char *job_name = get_word(words, num_words, 3); + + if (SERVING_STREAMING(parser)) + delete_job_pname(parser->user.host->configurable_plugins, plugin_name, module_name, job_name); + + // forward to parent if any + rrdpush_send_job_deleted(parser->user.host, plugin_name, module_name, job_name); return PARSER_RC_OK; } @@ -2309,15 +2698,22 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); buffered_reader_init(&parser->reader); - char buffer[PLUGINSD_LINE_MAX + 2]; + BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL); while(likely(service_running(SERVICE_COLLECTORS))) { - if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) { + if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) { if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC))) break; + + continue; } - else if(unlikely(parser_action(parser, buffer))) + + if(unlikely(parser_action(parser, buffer->buffer))) break; + + buffer->len = 0; + buffer->buffer[0] = '\0'; } + buffer_free(buffer); cd->unsafe.enabled = parser->user.enabled; count = parser->user.data_collections_count; @@ -2452,10 +2848,19 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, case 101: return pluginsd_register_plugin(words, num_words, parser); - + case 102: return pluginsd_register_module(words, num_words, parser); + case 103: + return pluginsd_register_job(words, num_words, parser); + + case 110: + return pluginsd_job_status(words, num_words, parser); + + case 111: + return pluginsd_delete_job(words, num_words, parser); + default: fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id); } @@ -2472,14 +2877,20 @@ void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) { } } +static void parser_destroy_dyncfg(PARSER *parser) { + if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) { + unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item); + parser->user.cd->configuration = NULL; + } else if (parser->user.host != NULL && SERVING_STREAMING(parser) && parser->user.host != localhost){ + dictionary_flush(parser->user.host->configurable_plugins); + } +} + void parser_destroy(PARSER *parser) { if (unlikely(!parser)) return; - if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) { - unregister_plugin(parser->user.cd->cfg_dict_item); - parser->user.cd->configuration = NULL; - } + parser_destroy_dyncfg(parser); dictionary_destroy(parser->inflight.functions); freez(parser); diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 5e1ea1242..74767569b 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -10,6 +10,9 @@ // this has to be in-sync with the same at receiver.c #define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) +// this controls the max response size of a function +#define PLUGINSD_MAX_DEFERRED_SIZE (20 * 1024 * 1024) + // PARSER return codes typedef enum __attribute__ ((__packed__)) parser_rc { PARSER_RC_OK, // Callback was successful, go on @@ -43,8 +46,8 @@ typedef struct parser_user_object { void *opaque; struct plugind *cd; int trust_durations; - DICTIONARY *new_host_labels; - DICTIONARY *chart_rrdlabels_linked_temporarily; + RRDLABELS *new_host_labels; + RRDLABELS *chart_rrdlabels_linked_temporarily; size_t data_collections_count; int enabled; @@ -55,7 +58,7 @@ typedef struct parser_user_object { uuid_t machine_guid; char machine_guid_str[UUID_STR_LEN]; STRING *hostname; - DICTIONARY *rrdlabels; + RRDLABELS *rrdlabels; } host_define; struct parser_user_object_replay { @@ -151,15 +154,15 @@ static inline int parser_action(PARSER *parser, char *input) { parser->line++; if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { - char command[PLUGINSD_LINE_MAX + 1]; - bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, isspace_map_pluginsd); + char command[100 + 1]; + bool has_keyword = find_first_keyword(input, command, 100, isspace_map_pluginsd); if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { if(parser->defer.response) { buffer_strcat(parser->defer.response, input); - if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { - // more than 10MB of data - // a bad plugin that did not send the end_keyword + if(buffer_strlen(parser->defer.response) > PLUGINSD_MAX_DEFERRED_SIZE) { + // more than PLUGINSD_MAX_DEFERRED_SIZE of data, + // or a bad plugin that did not send the end_keyword internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); return 1; } @@ -180,7 +183,7 @@ static inline int parser_action(PARSER *parser, char *input) { return 0; } - char *words[PLUGINSD_MAX_WORDS]; + static __thread char *words[PLUGINSD_MAX_WORDS]; size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); const char *command = get_word(words, num_words, 0); |