diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-11-30 18:47:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-11-30 18:47:05 +0000 |
commit | 97e01009d69b8fbebfebf68f51e3d126d0ed43fc (patch) | |
tree | 02e8b836c3a9d89806f3e67d4a5fe9f52dbb0061 /collectors/plugins.d | |
parent | Releasing debian version 1.36.1-1. (diff) | |
download | netdata-97e01009d69b8fbebfebf68f51e3d126d0ed43fc.tar.xz netdata-97e01009d69b8fbebfebf68f51e3d126d0ed43fc.zip |
Merging upstream version 1.37.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r-- | collectors/plugins.d/README.md | 86 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 128 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 78 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 1476 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 33 |
5 files changed, 1167 insertions, 634 deletions
diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md index 0741636b..2ecf233f 100644 --- a/collectors/plugins.d/README.md +++ b/collectors/plugins.d/README.md @@ -116,17 +116,19 @@ For example, if your plugin wants to monitor `squid`, you can search for it on p Any program that can print a few values to its standard output can become a Netdata external plugin. -Netdata parses 9 lines starting with: +Netdata parses lines starting with: - `CHART` - create or update a chart - `DIMENSION` - add or update a dimension to the chart just created +- `VARIABLE` - define a variable (to be used in health calculations) +- `CLABEL` - add a label to a chart +- `CLABEL_COMMIT` - commit added labels to the chart +- `FUNCTION` - define a function that can be called later to execute it - `BEGIN` - initialize data collection for a chart - `SET` - set the value of a dimension for the initialized chart - `END` - complete data collection for the initialized chart - `FLUSH` - ignore the last collected values - `DISABLE` - disable this plugin -- `CLABEL` - add a label to a chart -- `CLABEL_COMMIT` - commit added labels to the chart. a single program can produce any number of charts with any number of dimensions each. @@ -362,6 +364,80 @@ The `source` is an integer field that can have the following values: `CLABEL_COMMIT` indicates that all labels were defined and the chart can be updated. +#### FUNCTION + +> FUNCTION [GLOBAL] "name and parameters of the function" timeout "help string for users" + +A function can be used by users to ask for more information from the collector. Netdata maintains a registry of functions in 2 levels: + +- per node +- per chart + +Both node and chart functions are exactly the same, but chart functions allow Netdata to relate functions with charts and therefore present a context sensitive menu of functions related to the chart the user is using. + +A function is identified by a string. The allowed characters in the function definition are: + +| Character | Symbol | In Functions | +|-------------------|:------:|:------------:| +| UTF-8 character | UTF-8 | keep | +| Lower case letter | [a-z] | keep | +| Upper case letter | [A-Z] | keep | +| Digit | [0-9] | keep | +| Underscore | _ | keep | +| Comma | , | keep | +| Minus | - | keep | +| Period | . | keep | +| Colon | : | keep | +| Slash | / | keep | +| Space | ' ' | keep | +| Semicolon | ; | : | +| Equal | = | : | +| Backslash | \ | / | +| Anything else | | _ | + +Uses can get a list of all the registered functions using the `/api/v1/functions` end point of Netdata. + +Users can call functions using the `/api/v1/function` end point of Netdata. +Once a function is called, the plugin will receive at its standard input a command that looks like this: + +> FUNCTION transaction_id timeout "name and parameters of the function" + +The plugin is expected to parse and validate `name and parameters of the function`. Netdata allows users to edit this string, append more parameters or even change the ones the plugin originally exposed. To minimize the security risk, Netdata guarantees that only the characters shown above are accepted in function definitions, but still the plugin should carefully inspect the `name and parameters of the function` to ensure that it is valid and not harmful. + +If the plugin rejects the request, it should respond with this: + +``` +FUNCTION_RESULT_BEGIN transaction_id 400 application/json +{ + "status": 400, + "error_message": "description of the rejection reasons" +} +FUNCTION_RESULT_END +``` + +If the plugin prepares a response, it should send (via its standard output, together with the collected data, but not interleaved with them): + +> FUNCTION_RESULT_BEGIN transaction_id http_error_code content_type expiration + +Where: + + - `transaction_id` is the transaction id that Netdata sent for this function execution + - `http_error` is the http error code Netdata should respond with, 200 is the "ok" response + - `content_type` is the content type of the response + - `expiration` is the absolute timestamp (number, unix epoch) this response expires + +Immediately after this, all text is assumed to be the response content. +The content is text and line oriented. The maximum line length accepted is 15kb. Longer lines will be truncated. +The type of the context itself depends on the plugin and the UI. + +To terminate the message, Netdata seeks a line with just this: + +> FUNCTION_RESULT_END + +This defines the end of the message. `FUNCTION_RESULT_END` should appear in a line alone, without any other text, so it is wise to add `\n` before and after it. + +After this line, Netdata resumes processing collected metrics from the plugin. + ## Data collection data collection is defined as a series of `BEGIN` -> `SET` -> `END` lines @@ -463,7 +539,7 @@ There are a few rules for writing plugins properly: readConfiguration(); if(!verifyWeCanCollectValues()) { - print "DISABLE"; + print("DISABLE"); exit(1); } @@ -475,7 +551,7 @@ There are a few rules for writing plugins properly: var dt_since_last_run = 0; var now = 0; - FOREVER { + while(true) { /* find the current time in milliseconds */ now = currentTimeStampInMilliseconds(); diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 377ec140..79abc707 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -6,118 +6,7 @@ char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL }; struct plugind *pluginsd_root = NULL; -inline int pluginsd_space(char c) { - switch(c) { - case ' ': - case '\t': - case '\r': - case '\n': - case '=': - return 1; - - default: - return 0; - } -} - -inline int config_isspace(char c) -{ - switch (c) { - case ' ': - case '\t': - case '\r': - case '\n': - case ',': - return 1; - - default: - return 0; - } -} - -// split a text into words, respecting quotes -inline int quoted_strings_splitter(char *str, char **words, int max_words, int (*custom_isspace)(char), char *recover_input, char **recover_location, int max_recover) -{ - char *s = str, quote = 0; - int i = 0, rec = 0; - char *recover = recover_input; - - // skip all white space - while (unlikely(custom_isspace(*s))) - s++; - - // check for quote - if (unlikely(*s == '\'' || *s == '"')) { - quote = *s; // remember the quote - s++; // skip the quote - } - - // store the first word - words[i++] = s; - - // while we have something - while (likely(*s)) { - // if it is escape - if (unlikely(*s == '\\' && s[1])) { - s += 2; - continue; - } - - // if it is quote - else if (unlikely(*s == quote)) { - quote = 0; - if (recover && rec < max_recover) { - recover_location[rec++] = s; - *recover++ = *s; - } - *s = ' '; - continue; - } - - // if it is a space - else if (unlikely(quote == 0 && custom_isspace(*s))) { - // terminate the word - if (recover && rec < max_recover) { - if (!rec || (rec && recover_location[rec-1] != s)) { - recover_location[rec++] = s; - *recover++ = *s; - } - } - *s++ = '\0'; - - // skip all white space - while (likely(custom_isspace(*s))) - s++; - - // check for quote - if (unlikely(*s == '\'' || *s == '"')) { - quote = *s; // remember the quote - s++; // skip the quote - } - - // if we reached the end, stop - if (unlikely(!*s)) - break; - - // store the next word - if (likely(i < max_words)) - words[i++] = s; - else - break; - } - - // anything else - else - s++; - } - - // terminate the words - memset(&words[i], 0, (max_words - i) * sizeof (char *)); - - return i; -} - -inline int pluginsd_initialize_plugin_directories() +inline size_t pluginsd_initialize_plugin_directories() { char plugins_dirs[(FILENAME_MAX * 2) + 1]; static char *plugins_dir_list = NULL; @@ -132,12 +21,6 @@ inline int pluginsd_initialize_plugin_directories() return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0); } -inline int pluginsd_split_words(char *str, char **words, int max_words, char *recover_input, char **recover_location, int max_recover) -{ - return quoted_strings_splitter(str, words, max_words, pluginsd_space, recover_input, recover_location, max_recover); -} - - static void pluginsd_worker_thread_cleanup(void *arg) { struct plugind *cd = (struct plugind *)arg; @@ -238,18 +121,19 @@ void *pluginsd_worker_thread(void *arg) size_t count = 0; while (!netdata_exit) { - FILE *fp = mypopen(cd->cmd, &cd->pid); - if (unlikely(!fp)) { + FILE *fp_child_input = NULL; + FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input); + if (unlikely(!fp_child_input || !fp_child_output)) { error("Cannot popen(\"%s\", \"r\").", cd->cmd); break; } info("connected to '%s' running on pid %d", cd->fullfilename, cd->pid); - count = pluginsd_process(localhost, cd, fp, 0); + count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0); error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid, count); killpid(cd->pid); - int worker_ret_code = mypclose(fp, cd->pid); + int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->pid); if (likely(worker_ret_code == 0)) pluginsd_worker_thread_handle_success(cd); diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index e0b8ac57..a8acf038 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -10,23 +10,34 @@ #define PLUGINSD_CMD_MAX (FILENAME_MAX*2) #define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0 -#define PLUGINSD_KEYWORD_CHART "CHART" -#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION" -#define PLUGINSD_KEYWORD_BEGIN "BEGIN" -#define PLUGINSD_KEYWORD_END "END" -#define PLUGINSD_KEYWORD_FLUSH "FLUSH" -#define PLUGINSD_KEYWORD_DISABLE "DISABLE" -#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE" -#define PLUGINSD_KEYWORD_LABEL "LABEL" -#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE" -#define PLUGINSD_KEYWORD_GUID "GUID" -#define PLUGINSD_KEYWORD_CONTEXT "CONTEXT" -#define PLUGINSD_KEYWORD_TOMBSTONE "TOMBSTONE" -#define PLUGINSD_KEYWORD_HOST "HOST" - - -#define PLUGINSD_LINE_MAX 1024 +#define PLUGINSD_KEYWORD_CHART "CHART" +#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END" +#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION" +#define PLUGINSD_KEYWORD_BEGIN "BEGIN" +#define PLUGINSD_KEYWORD_SET "SET" +#define PLUGINSD_KEYWORD_END "END" +#define PLUGINSD_KEYWORD_FLUSH "FLUSH" +#define PLUGINSD_KEYWORD_DISABLE "DISABLE" +#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE" +#define PLUGINSD_KEYWORD_LABEL "LABEL" +#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE" +#define PLUGINSD_KEYWORD_CLABEL "CLABEL" +#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT" +#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION" +#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN" +#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END" + +#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART" +#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN" +#define PLUGINSD_KEYWORD_REPLAY_SET "RSET" +#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE" +#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" +#define PLUGINSD_KEYWORD_REPLAY_END "REND" + +#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds + #define PLUGINSD_LINE_MAX_SSL_READ 512 + #define PLUGINSD_MAX_WORDS 20 #define PLUGINSD_MAX_DIRECTORIES 20 @@ -53,19 +64,40 @@ struct plugind { volatile sig_atomic_t enabled; // if this is enabled or not time_t started_t; - uint32_t version; + uint32_t capabilities; // follows the same principles as streaming capabilities struct plugind *next; }; extern struct plugind *pluginsd_root; -extern size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations); -extern int pluginsd_split_words(char *str, char **words, int max_words, char *recover_string, char **recover_location, int max_recover); +size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations); + +size_t pluginsd_initialize_plugin_directories(); + + + +#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \ + buffer_sprintf(wb \ + , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ + , (transaction) ? (transaction) : "" \ + , (int)(code) \ + , (content_type) ? (content_type) : "" \ + , (long int)(expires) \ + ) + +#define pluginsd_function_result_end_to_buffer(wb) \ + buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") -extern int pluginsd_initialize_plugin_directories(); +#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \ + fprintf(stdout \ + , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ + , (transaction) ? (transaction) : "" \ + , (int)(code) \ + , (content_type) ? (content_type) : "" \ + , (long int)(expires) \ + ) -extern int config_isspace(char c); -extern int pluginsd_space(char c); -int quoted_strings_splitter(char *str, char **words, int max_words, int (*custom_isspace)(char), char *recover_input, char **recover_location, int max_recover); +#define pluginsd_function_result_end_to_stdout() \ + fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") #endif /* NETDATA_PLUGINS_D_H */ diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 88e07fab..5501c12f 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -2,336 +2,223 @@ #include "pluginsd_parser.h" -/* - * This is the action defined for the FLUSH command - */ -PARSER_RC pluginsd_set_action(void *user, RRDSET *st, RRDDIM *rd, long long int value) -{ - UNUSED(user); - - rrddim_set_by_pointer(st, rd, value); - return PARSER_RC_OK; -} +#define LOG_FUNCTIONS false -PARSER_RC pluginsd_flush_action(void *user, RRDSET *st) -{ - UNUSED(user); - UNUSED(st); - return PARSER_RC_OK; -} +static int send_to_plugin(const char *txt, void *data) { + PARSER *parser = data; -PARSER_RC pluginsd_begin_action(void *user, RRDSET *st, usec_t microseconds, int trust_durations) -{ - UNUSED(user); - if (likely(st->counter_done)) { - if (likely(microseconds)) { - if (trust_durations) - rrdset_next_usec_unfiltered(st, microseconds); - else - rrdset_next_usec(st, microseconds); - } else - rrdset_next(st); - } - return PARSER_RC_OK; -} - - -PARSER_RC pluginsd_end_action(void *user, RRDSET *st) -{ - UNUSED(user); - - rrdset_done(st); - return PARSER_RC_OK; -} + if(!txt || !*txt) + return 0; -PARSER_RC pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin, - char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options) -{ - RRDSET *st = NULL; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; +#ifdef ENABLE_HTTPS + struct netdata_ssl *ssl = parser->ssl_output; + if(ssl) { + if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt)); - st = rrdset_create( - host, type, id, name, family, context, title, units, - plugin, module, priority, update_every, - chart_type); + error("PLUGINSD: cannot send command (SSL)"); + return -1; + } +#endif - if (options && *options) { - if (strstr(options, "obsolete")) - rrdset_is_obsolete(st); - else - rrdset_isnot_obsolete(st); + if(parser->fp_output) { + int bytes = fprintf(parser->fp_output, "%s", txt); + if(bytes <= 0) { + error("PLUGINSD: cannot send command (FILE)"); + return -2; + } + fflush(parser->fp_output); + return bytes; + } - if (strstr(options, "detail")) - rrdset_flag_set(st, RRDSET_FLAG_DETAIL); - else - rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); + if(parser->fd != -1) { + size_t bytes = 0; + size_t total = strlen(txt); + ssize_t sent; - if (strstr(options, "hidden")) - rrdset_flag_set(st, RRDSET_FLAG_HIDDEN); - else - rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN); + do { + sent = write(parser->fd, &txt[bytes], total - bytes); + if(sent <= 0) { + error("PLUGINSD: cannot send command (fd)"); + return -3; + } + bytes += sent; + } + while(bytes < total); - if (strstr(options, "store_first")) - rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST); - else - rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); - } else { - rrdset_isnot_obsolete(st); - rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); - rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); + return (int)bytes; } - ((PARSER_USER_OBJECT *)user)->st = st; - return PARSER_RC_OK; + error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); + return -4; } +static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) { + RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; -PARSER_RC pluginsd_disable_action(void *user) -{ - UNUSED(user); + if(unlikely(!host)) + error("PLUGINSD: command %s requires a host, but is not set.", cmd); - info("called DISABLE. Disabling it."); - ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; + return host; } +static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) { + RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; -PARSER_RC pluginsd_variable_action(void *user, RRDHOST *host, RRDSET *st, char *name, int global, NETDATA_DOUBLE value) -{ - UNUSED(user); + if(unlikely(!st)) + error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd); - if (global) { - RRDVAR *rv = rrdvar_custom_host_variable_create(host, name); - if (rv) - rrdvar_custom_host_variable_set(host, rv, value); - else - error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, host->hostname); - } else { - RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(st, name); - if (rs) - rrdsetvar_custom_chart_variable_set(rs, value); - else - error("cannot find/create CHART VARIABLE '%s' on host '%s', chart '%s'", name, host->hostname, st->id); - } - return PARSER_RC_OK; + return st; } - - -PARSER_RC pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options, - RRD_ALGORITHM algorithm_type) -{ - UNUSED(user); - UNUSED(algorithm); - - RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, algorithm_type); - int unhide_dimension = 1; - - rrddim_flag_clear(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS); - if (options && *options) { - if (strstr(options, "obsolete") != NULL) - rrddim_is_obsolete(st, rd); - else - rrddim_isnot_obsolete(st, rd); - - unhide_dimension = !strstr(options, "hidden"); - - if (strstr(options, "noreset") != NULL) - rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS); - if (strstr(options, "nooverflow") != NULL) - rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS); - } else - rrddim_isnot_obsolete(st, rd); - - if (likely(unhide_dimension)) { - rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN); - if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - (void)sql_set_dimension_option(&rd->metric_uuid, NULL); - rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); - } - } else { - rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN); - if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - (void)sql_set_dimension_option(&rd->metric_uuid, "hidden"); - rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN); - } +static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { + if (unlikely(!dimension || !*dimension)) { + error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", + rrdhost_hostname(host), rrdset_id(st), cmd); + return NULL; } - return PARSER_RC_OK; -} - -PARSER_RC pluginsd_label_action(void *user, char *key, char *value, RRDLABEL_SRC source) -{ - - if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels)) - ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create(); - rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, key, value, source); + RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); - return PARSER_RC_OK; -} + if (unlikely(!rda)) + error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", + rrdhost_hostname(host), rrdset_id(st), dimension, cmd); -PARSER_RC pluginsd_clabel_action(void *user, char *key, char *value, RRDLABEL_SRC source) -{ - if(unlikely(!((PARSER_USER_OBJECT *) user)->new_chart_labels)) - ((PARSER_USER_OBJECT *) user)->new_chart_labels = rrdlabels_create(); - - rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_chart_labels, key, value, source); - - return PARSER_RC_OK; + return rda; } -PARSER_RC pluginsd_clabel_commit_action(void *user, RRDHOST *host, DICTIONARY *new_chart_labels) -{ - RRDSET *st = ((PARSER_USER_OBJECT *)user)->st; - if (unlikely(!st)) { - error("requested CLABEL_COMMIT on host '%s', without a BEGIN, ignoring it.", host->hostname); - return PARSER_RC_OK; +static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { + if (unlikely(!chart || !*chart)) { + error("PLUGINSD: 'host:%s' got a %s without a chart id.", + rrdhost_hostname(host), cmd); + return NULL; } - rrdset_update_rrdlabels(st, new_chart_labels); + RRDSET *st = rrdset_find(host, chart); + if (unlikely(!st)) + error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.", + rrdhost_hostname(host), chart, cmd); - return PARSER_RC_OK; + return st; } -PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, DICTIONARY *new_host_labels) -{ - UNUSED(user); - - if(!host->host_labels) - host->host_labels = rrdlabels_create(); - - rrdlabels_migrate_to_these(host->host_labels, new_host_labels); - sql_store_host_labels(host); - - return PARSER_RC_OK; +static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) { + ((PARSER_USER_OBJECT *) user)->enabled = 0; + return PARSER_RC_ERROR; } -PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) { - char *dimension = words[1]; - char *value = words[2]; + char *dimension = get_word(words, num_words, 1); + char *value = get_word(words, num_words, 2); - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); - if (unlikely(!dimension || !*dimension)) { - error("requested a SET on chart '%s' of host '%s', without a dimension. Disabling it.", st->id, host->hostname); - goto disable; - } + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - if (unlikely(!value || !*value)) - value = NULL; + RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); + if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); - if (unlikely(!st)) { - error( - "requested a SET on dimension %s with value %s on host '%s', without a BEGIN. Disabling it.", dimension, - value ? value : "<nothing>", host->hostname); - goto disable; - } + RRDDIM *rd = rrddim_acquired_to_rrddim(rda); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "is setting dimension %s/%s to %s", st->id, dimension, value ? value : "<nothing>"); - - if (value) { - RRDDIM *rd = rrddim_find(st, dimension); - if (unlikely(!rd)) { - error( - "requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.", - dimension, st->name, st->id, st->rrdhost->hostname); - goto disable; - } else { - if (plugins_action->set_action) { - return plugins_action->set_action( - user, st, rd, strtoll(value, NULL, 0)); - } - } - } - return PARSER_RC_OK; + debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", + rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET"); -disable: - ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; + if (value && *value) + rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0)); + + rrddim_acquired_release(rda); + return PARSER_RC_OK; } -PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) { - char *id = words[1]; - char *microseconds_txt = words[2]; + char *id = get_word(words, num_words, 1); + char *microseconds_txt = get_word(words, num_words, 2); - RRDSET *st = NULL; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); - if (unlikely(!id)) { - error("requested a BEGIN without a chart id for host '%s'. Disabling it.", host->hostname); - goto disable; - } + RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - st = rrdset_find(host, id); - if (unlikely(!st)) { - error("requested a BEGIN on chart '%s', which does not exist on host '%s'. Disabling it.", id, host->hostname); - goto disable; - } ((PARSER_USER_OBJECT *)user)->st = st; usec_t microseconds = 0; if (microseconds_txt && *microseconds_txt) microseconds = str2ull(microseconds_txt); - if (plugins_action->begin_action) { - return plugins_action->begin_action(user, st, microseconds, - ((PARSER_USER_OBJECT *)user)->trust_durations); +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + if(st->replay.log_next_data_collection) { + st->replay.log_next_data_collection = false; + + internal_error(true, + "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu", + rrdhost_hostname(host), rrdset_id(st), + st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec, + st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec, + microseconds + ); + } +#endif + + if (likely(st->counter_done)) { + if (likely(microseconds)) { + if (((PARSER_USER_OBJECT *)user)->trust_durations) + rrdset_next_usec_unfiltered(st, microseconds); + else + rrdset_next_usec(st, microseconds); + } + else + rrdset_next(st); } return PARSER_RC_OK; -disable: - ((PARSER_USER_OBJECT *)user)->enabled = 0; - return PARSER_RC_ERROR; } -PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) { UNUSED(words); - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + UNUSED(num_words); - if (unlikely(!st)) { - error("requested an END, without a BEGIN on host '%s'. Disabling it.", host->hostname); - ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; - } + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "requested an END on chart %s", st->id); + debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); ((PARSER_USER_OBJECT *) user)->st = NULL; ((PARSER_USER_OBJECT *) user)->count++; - if (plugins_action->end_action) { - return plugins_action->end_action(user, st); - } + + struct timeval now; + now_realtime_timeval(&now); + rrdset_timed_done(st, now, /* pending_rrdset_next = */ false); + return PARSER_RC_OK; } -PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) { - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; - if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) { - debug(D_PLUGINSD, "Ignoring chart belonging to missing or ignored host."); - return PARSER_RC_OK; - } - - char *type = words[1]; - char *name = words[2]; - char *title = words[3]; - char *units = words[4]; - char *family = words[5]; - char *context = words[6]; - char *chart = words[7]; - char *priority_s = words[8]; - char *update_every_s = words[9]; - char *options = words[10]; - char *plugin = words[11]; - char *module = words[12]; - - int have_action = ((plugins_action->chart_action) != NULL); + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + char *type = get_word(words, num_words, 1); + char *name = get_word(words, num_words, 2); + char *title = get_word(words, num_words, 3); + char *units = get_word(words, num_words, 4); + char *family = get_word(words, num_words, 5); + char *context = get_word(words, num_words, 6); + char *chart = get_word(words, num_words, 7); + char *priority_s = get_word(words, num_words, 8); + char *update_every_s = get_word(words, num_words, 9); + char *options = get_word(words, num_words, 10); + char *plugin = get_word(words, num_words, 11); + char *module = get_word(words, num_words, 12); // parse the id from type char *id = NULL; @@ -342,10 +229,9 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act // make sure we have the required variables if (unlikely((!type || !*type || !id || !*id))) { - if (likely(host)) - error("requested a CHART, without a type.id, on host '%s'. Disabling it.", host->hostname); - else - error("requested a CHART, without a type.id. Disabling it."); + error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.", + rrdhost_hostname(host)); + ((PARSER_USER_OBJECT *) user)->enabled = 0; return PARSER_RC_ERROR; } @@ -396,42 +282,120 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type), priority, update_every); - if (have_action) { - return plugins_action->chart_action( - user, type, id, name, family, context, title, units, - (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename, module, priority, update_every, - chart_type, options); + RRDSET *st = NULL; + + st = rrdset_create( + host, type, id, name, family, context, title, units, + (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename, + module, priority, update_every, + chart_type); + + if (likely(st)) { + if (options && *options) { + if (strstr(options, "obsolete")) + rrdset_is_obsolete(st); + else + rrdset_isnot_obsolete(st); + + if (strstr(options, "detail")) + rrdset_flag_set(st, RRDSET_FLAG_DETAIL); + else + rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); + + if (strstr(options, "hidden")) + rrdset_flag_set(st, RRDSET_FLAG_HIDDEN); + else + rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN); + + if (strstr(options, "store_first")) + rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST); + else + rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); + } else { + rrdset_isnot_obsolete(st); + rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); + rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); + } } + ((PARSER_USER_OBJECT *)user)->st = st; return PARSER_RC_OK; } -PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user) { - char *id = words[1]; - char *name = words[2]; - char *algorithm = words[3]; - char *multiplier_s = words[4]; - char *divisor_s = words[5]; - char *options = words[6]; - - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; - if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) { - debug(D_PLUGINSD, "Ignoring dimension belonging to missing or ignored host."); - return PARSER_RC_OK; + const char *first_entry_txt = get_word(words, num_words, 1); + const char *last_entry_txt = get_word(words, num_words, 2); + const char *world_time_txt = get_word(words, num_words, 3); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + + time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; + time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; + time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec(); + + if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0)) + error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).", + rrdhost_hostname(host), rrdset_id(st), + first_entry_child, last_entry_child, child_world_time); + + bool ok = true; + if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; +#endif + + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdhost_receiver_replicating_charts_plus_one(st->rrdhost); + + PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser; + ok = replicate_chart_request(send_to_plugin, parser, host, st, + first_entry_child, last_entry_child, child_world_time, + 0, 0); + } +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + else { + internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); } +#endif + + return ok ? PARSER_RC_OK : PARSER_RC_ERROR; +} + +PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) +{ + char *id = get_word(words, num_words, 1); + char *name = get_word(words, num_words, 2); + char *algorithm = get_word(words, num_words, 3); + char *multiplier_s = get_word(words, num_words, 4); + char *divisor_s = get_word(words, num_words, 5); + char *options = get_word(words, num_words, 6); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); if (unlikely(!id)) { - error( - "requested a DIMENSION, without an id, host '%s' and chart '%s'. Disabling it.", host->hostname, - st ? st->id : "UNSET"); - goto disable; + error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.", + rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET"); + return PLUGINSD_DISABLE_PLUGIN(user); } if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) { - error("requested a DIMENSION, without a CHART, on host '%s'. Disabling it.", host->hostname); - goto disable; + error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.", + rrdhost_hostname(host)); + return PLUGINSD_DISABLE_PLUGIN(user); } long multiplier = 1; @@ -455,316 +419,906 @@ PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins debug( D_PLUGINSD, "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'", - st->id, id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor, + rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor, options ? options : ""); - if (plugins_action->dimension_action) { - return plugins_action->dimension_action( - user, st, id, name, algorithm, - multiplier, divisor, (options && *options)?options:NULL, rrd_algorithm_id(algorithm)); + RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm)); + int unhide_dimension = 1; + + rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); + if (options && *options) { + if (strstr(options, "obsolete") != NULL) + rrddim_is_obsolete(st, rd); + else + rrddim_isnot_obsolete(st, rd); + + unhide_dimension = !strstr(options, "hidden"); + + if (strstr(options, "noreset") != NULL) + rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); + if (strstr(options, "nooverflow") != NULL) + rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); + } else + rrddim_isnot_obsolete(st, rd); + + if (likely(unhide_dimension)) { + rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN); + if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { + rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); + metaqueue_dimension_update_flags(rd); + } + } + else { + rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN); + if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { + rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN); + metaqueue_dimension_update_flags(rd); + } + } + + return PARSER_RC_OK; +} + +// ---------------------------------------------------------------------------- +// execution of functions + +struct inflight_function { + int code; + int timeout; + BUFFER *destination_wb; + STRING *function; + void (*callback)(BUFFER *wb, int code, void *callback_data); + void *callback_data; + usec_t timeout_ut; + usec_t started_ut; + usec_t sent_ut; +}; + +static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { + struct inflight_function *pf = func; + + PARSER *parser = parser_ptr; + + // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller + pf->code = HTTP_RESP_GATEWAY_TIMEOUT; + + char buffer[2048 + 1]; + snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n", + dictionary_acquired_item_name(item), + pf->timeout, + string2str(pf->function)); + + // send the command to the plugin + int ret = send_to_plugin(buffer, parser); + + pf->sent_ut = now_realtime_usec(); + + if(ret < 0) { + error("FUNCTION: failed to send function to plugin, error %d", ret); + rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); + } + else { + internal_error(LOG_FUNCTIONS, + "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)", + string2str(pf->function), dictionary_acquired_item_name(item), ret, + pf->sent_ut - pf->started_ut); + } +} + +static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) { + struct inflight_function *pf = new_func; + + error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); + pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); + pf->callback(pf->destination_wb, pf->code, pf->callback_data); + string_freez(pf->function); + + return false; +} + +static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) { + struct inflight_function *pf = func; + + internal_error(LOG_FUNCTIONS, + "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)", + string2str(pf->function), dictionary_acquired_item_name(item), + buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + + pf->callback(pf->destination_wb, pf->code, pf->callback_data); + string_freez(pf->function); +} + +void inflight_functions_init(PARSER *parser) { + parser->inflight.functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser); + dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser); + dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser); +} + +static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { + parser->inflight.smaller_timeout = 0; + struct inflight_function *pf; + dfe_start_write(parser->inflight.functions, pf) { + if (pf->timeout_ut < now) { + internal_error(true, + "FUNCTION '%s' removing expired transaction '%s', after %llu usec.", + string2str(pf->function), pf_dfe.name, now - pf->started_ut); + + if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK) + pf->code = rrd_call_function_error(pf->destination_wb, + "Timeout waiting for collector response.", + HTTP_RESP_GATEWAY_TIMEOUT); + + dictionary_del(parser->inflight.functions, pf_dfe.name); + } + + else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = pf->timeout_ut; + } + dfe_done(pf); +} + +// this is the function that is called from +// rrd_call_function_and_wait() and rrd_call_function_async() +static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) { + PARSER *parser = collector_data; + + usec_t now = now_realtime_usec(); + + struct inflight_function tmp = { + .started_ut = now, + .timeout_ut = now + timeout * USEC_PER_SEC, + .destination_wb = destination_wb, + .timeout = timeout, + .function = string_strdupz(function), + .callback = callback, + .callback_data = callback_data, + }; + + uuid_t uuid; + uuid_generate_time(uuid); + + char key[UUID_STR_LEN]; + uuid_unparse_lower(uuid, key); + + dictionary_write_lock(parser->inflight.functions); + + // if there is any error, our dictionary callbacks will call the caller callback to notify + // the caller about the error - no need for error handling here. + dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + + if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = tmp.timeout_ut; + + // garbage collect stale inflight functions + if(parser->inflight.smaller_timeout < now) + inflight_functions_garbage_collect(parser, now); + + dictionary_write_unlock(parser->inflight.functions); + + return HTTP_RESP_OK; +} + +PARSER_RC pluginsd_function(char **words, size_t num_words, void *user) +{ + bool global = false; + size_t i = 1; + if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { + i++; + global = true; + } + + char *name = get_word(words, num_words, i++); + char *timeout_s = get_word(words, num_words, i++); + char *help = get_word(words, num_words, i++); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION); + if(!host) return PARSER_RC_ERROR; + + RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART); + if(!st) global = true; + + if (unlikely(!timeout_s || !name || !help || (!global && !st))) { + error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.", + rrdhost_hostname(host), + st?rrdset_id(st):"(unset)", + global?"yes":"no", + name?name:"(unset)", + timeout_s?timeout_s:"(unset)", + help?help:"(unset)" + ); + return PARSER_RC_ERROR; + } + + int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; + if (timeout_s && *timeout_s) { + timeout = str2i(timeout_s); + if (unlikely(timeout <= 0)) + timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; } + PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser; + rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser); + + return PARSER_RC_OK; +} + +static void pluginsd_function_result_end(struct parser *parser, void *action_data) { + STRING *key = action_data; + if(key) + dictionary_del(parser->inflight.functions, string2str(key)); + string_freez(key); +} + +PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user) +{ + char *key = get_word(words, num_words, 1); + char *status = get_word(words, num_words, 2); + char *format = get_word(words, num_words, 3); + char *expires = get_word(words, num_words, 4); + + if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) { + error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')." + , key ? key : "(unset)" + , status ? status : "(unset)" + , format ? format : "(unset)" + , expires ? expires : "(unset)" + ); + } + + int code = (status && *status) ? str2i(status) : 0; + if (code <= 0) + code = HTTP_RESP_BACKEND_RESPONSE_INVALID; + + time_t expiration = (expires && *expires) ? str2l(expires) : 0; + + PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser; + + struct inflight_function *pf = NULL; + + if(key && *key) + pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key); + + if(!pf) { + error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)"); + } + else { + if(format && *format) + pf->destination_wb->contenttype = functions_format_to_content_type(format); + + pf->code = code; + + pf->destination_wb->expires = expiration; + if(expiration <= now_realtime_sec()) + buffer_no_cacheable(pf->destination_wb); + else + buffer_cacheable(pf->destination_wb); + } + + parser->defer.response = (pf) ? pf->destination_wb : NULL; + parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END; + parser->defer.action = pluginsd_function_result_end; + parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL + parser->flags |= PARSER_DEFER_UNTIL_KEYWORD; + return PARSER_RC_OK; -disable: - ((PARSER_USER_OBJECT *)user)->enabled = 0; - return PARSER_RC_ERROR; } -PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action) +// ---------------------------------------------------------------------------- + +PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) { - char *name = words[1]; - char *value = words[2]; + char *name = get_word(words, num_words, 1); + char *value = get_word(words, num_words, 2); NETDATA_DOUBLE v; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; int global = (st) ? 0 : 1; if (name && *name) { if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) { global = 1; - name = words[2]; - value = words[3]; + name = get_word(words, num_words, 2); + value = get_word(words, num_words, 3); } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) { global = 0; - name = words[2]; - value = words[3]; + name = get_word(words, num_words, 2); + value = get_word(words, num_words, 3); } } if (unlikely(!name || !*name)) { - error("requested a VARIABLE on host '%s', without a variable name. Disabling it.", host->hostname); + error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.", + rrdhost_hostname(host), st ? rrdset_id(st):"UNSET"); + ((PARSER_USER_OBJECT *)user)->enabled = 0; - return PARSER_RC_ERROR; + return PLUGINSD_DISABLE_PLUGIN(user); } if (unlikely(!value || !*value)) value = NULL; if (unlikely(!value)) { - error("cannot set %s VARIABLE '%s' on host '%s' to an empty value", (global) ? "HOST" : "CHART", name, - host->hostname); + error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + (global) ? "HOST" : "CHART", + name); return PARSER_RC_OK; } if (!global && !st) { - error("cannot find/create CHART VARIABLE '%s' on host '%s' without a chart", name, host->hostname); - return PARSER_RC_OK; + error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + name + ); + return PLUGINSD_DISABLE_PLUGIN(user); } char *endptr = NULL; v = (NETDATA_DOUBLE)str2ndd(value, &endptr); if (unlikely(endptr && *endptr)) { if (endptr == value) - error( - "the value '%s' of VARIABLE '%s' on host '%s' cannot be parsed as a number", value, name, - host->hostname); + error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + value, + name); else - error( - "the value '%s' of VARIABLE '%s' on host '%s' has leftovers: '%s'", value, name, host->hostname, - endptr); + error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + value, + name, + endptr); } - if (plugins_action->variable_action) { - return plugins_action->variable_action(user, host, st, name, global, v); + if (global) { + const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name); + if (rva) { + rrdvar_custom_host_variable_set(host, rva, v); + rrdvar_custom_host_variable_release(host, rva); + } + else + error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'", + rrdhost_hostname(host), + name); + } else { + const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name); + if (rsa) { + rrdsetvar_custom_chart_variable_set(st, rsa, v); + rrdsetvar_custom_chart_variable_release(st, rsa); + } + else + error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'", + rrdhost_hostname(host), rrdset_id(st), name); } return PARSER_RC_OK; } -PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { - UNUSED(words); debug(D_PLUGINSD, "requested a FLUSH"); - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; ((PARSER_USER_OBJECT *) user)->st = NULL; - if (plugins_action->flush_action) { - return plugins_action->flush_action(user, st); - } + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; return PARSER_RC_OK; } -PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused) { - UNUSED(user); - UNUSED(words); - - if (plugins_action->disable_action) { - return plugins_action->disable_action(user); - } + info("PLUGINSD: plugin called DISABLE. Disabling it."); + ((PARSER_USER_OBJECT *) user)->enabled = 0; return PARSER_RC_ERROR; } -PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) { - char *store; + const char *name = get_word(words, num_words, 1); + const char *label_source = get_word(words, num_words, 2); + const char *value = get_word(words, num_words, 3); - if (!words[1] || !words[2] || !words[3]) { - error("Ignoring malformed or empty LABEL command."); - return PARSER_RC_OK; + if (!name || !label_source || !value) { + error("PLUGINSD: ignoring malformed or empty LABEL command."); + return PLUGINSD_DISABLE_PLUGIN(user); } - if (!words[4]) - store = words[3]; - else { - store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char)); + + char *store = (char *)value; + bool allocated_store = false; + + if(unlikely(num_words > 4)) { + allocated_store = true; + store = mallocz(PLUGINSD_LINE_MAX + 1); size_t remaining = PLUGINSD_LINE_MAX; char *move = store; - int i = 3; - while (i < PLUGINSD_MAX_WORDS) { - size_t length = strlen(words[i]); - if ((length + 1) >= remaining) - break; - - remaining -= (length + 1); - memcpy(move, words[i], length); - move += length; - *move++ = ' '; + char *word; + for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) { + if(i > 3) { + *move++ = ' '; + *move = '\0'; + remaining--; + } + + size_t length = strlen(word); + if (length > remaining) + length = remaining; - i++; - if (!words[i]) - break; + remaining -= length; + memcpy(move, word, length); + move += length; + *move = '\0'; } } - if (plugins_action->label_action) { - PARSER_RC rc = plugins_action->label_action(user, words[1], store, strtol(words[2], NULL, 10)); - if (store != words[3]) - freez(store); - return rc; - } + if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels)) + ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create(); + + rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, + name, + store, + str2l(label_source)); - if (store != words[3]) + if (allocated_store) freez(store); + + return PARSER_RC_OK; +} + +PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) +{ + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + debug(D_PLUGINSD, "requested to OVERWRITE host labels"); + + if(unlikely(!host->rrdlabels)) + host->rrdlabels = rrdlabels_create(); + + rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels)); + metaqueue_store_host_labels(host->machine_guid); + + rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); + ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; return PARSER_RC_OK; } -PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action) + +PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user) { - if (!words[1] || !words[2] || !words[3]) { + const char *name = get_word(words, num_words, 1); + const char *value = get_word(words, num_words, 2); + const char *label_source = get_word(words, num_words, 3); + + if (!name || !value || !*label_source) { error("Ignoring malformed or empty CHART LABEL command."); - return PARSER_RC_OK; + return PLUGINSD_DISABLE_PLUGIN(user); } - if (plugins_action->clabel_action) { - PARSER_RC rc = plugins_action->clabel_action(user, words[1], words[2], strtol(words[3], NULL, 10)); - return rc; + if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) { + ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels; + rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); } + rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily, + name, value, str2l(label_source)); + return PARSER_RC_OK; } -PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { - UNUSED(words); + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; debug(D_PLUGINSD, "requested to commit chart labels"); - PARSER_RC rc = PARSER_RC_OK; + if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) { + error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", + rrdhost_hostname(host)); + return PLUGINSD_DISABLE_PLUGIN(user); + } - if (plugins_action->clabel_commit_action) - rc = plugins_action->clabel_commit_action(user, host, ((PARSER_USER_OBJECT *)user)->new_chart_labels); + rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); - rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_chart_labels); - ((PARSER_USER_OBJECT *)user)->new_chart_labels = NULL; + rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); + rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - return rc; + ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL; + return PARSER_RC_OK; } -PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user) { - UNUSED(words); + char *id = get_word(words, num_words, 1); + char *start_time_str = get_word(words, num_words, 2); + char *end_time_str = get_word(words, num_words, 3); + char *child_now_str = get_word(words, num_words, 4); - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; - debug(D_PLUGINSD, "requested to OVERWRITE host labels"); + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); - PARSER_RC rc = PARSER_RC_OK; + RRDSET *st; + if (likely(!id || !*id)) + st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); + else + st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if (plugins_action->overwrite_action) - rc = plugins_action->overwrite_action(user, host, ((PARSER_USER_OBJECT *)user)->new_host_labels); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + ((PARSER_USER_OBJECT *) user)->st = st; - rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); - ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; + if(start_time_str && end_time_str) { + time_t start_time = (time_t)str2ul(start_time_str); + time_t end_time = (time_t)str2ul(end_time_str); - return rc; -} + time_t wall_clock_time = 0, tolerance; + bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child; + if(child_now_str) { + wall_clock_time = (time_t)str2ul(child_now_str); + tolerance = st->update_every + 1; + wall_clock_comes_from_child = true; + } -PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action) -{ - char *uuid_str = words[1]; - uuid_t uuid; + if(wall_clock_time <= 0) { + wall_clock_time = now_realtime_sec(); + tolerance = st->update_every + 5; + wall_clock_comes_from_child = false; + } - if (unlikely(!uuid_str)) { - error("requested a GUID, without a uuid."); - return PARSER_RC_ERROR; - } - if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) { - error("requested a GUID, without a valid uuid string."); - return PARSER_RC_ERROR; - } +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error( + (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)), + "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).", + rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before); + + internal_error( + true, + "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time", + st->replay.after, st->replay.before); +#endif + + if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { + if (unlikely(end_time - start_time != st->update_every)) + rrdset_set_update_every(st, end_time - start_time); + + st->last_collected_time.tv_sec = end_time; + st->last_collected_time.tv_usec = 0; + + st->last_updated.tv_sec = end_time; + st->last_updated.tv_usec = 0; + + st->counter++; + st->counter_done++; + + // these are only needed for db mode RAM, SAVE, MAP, ALLOC + st->current_entry++; + if(st->current_entry >= st->entries) + st->current_entry -= st->entries; + + ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; + ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true; + + return PARSER_RC_OK; + } - debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str); - if (plugins_action->guid_action) { - return plugins_action->guid_action(user, &uuid); + error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, + rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, + wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance); } + // the child sends an RBEGIN without any parameters initially + // setting rset_enabled to false, means the RSET should not store any metrics + // to store metrics, the RBEGIN needs to have timestamps + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; return PARSER_RC_OK; } -PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) { - char *uuid_str = words[1]; - uuid_t uuid; + char *dimension = get_word(words, num_words, 1); + char *value_str = get_word(words, num_words, 2); + char *flags_str = get_word(words, num_words, 3); - if (unlikely(!uuid_str)) { - error("requested a CONTEXT, without a uuid."); - return PARSER_RC_ERROR; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + + if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) { + error_limit_static_thread_var(erl, 1, 0); + error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors", + rrdhost_hostname(host), rrdset_id(st)); + + // we have to return OK here + return PARSER_RC_OK; } - if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) { - error("requested a CONTEXT, without a valid uuid string."); - return PARSER_RC_ERROR; + + RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); + if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + + if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) { + error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + rrdhost_hostname(host), + rrdset_id(st), + dimension, + ((PARSER_USER_OBJECT *) user)->replay.start_time, + ((PARSER_USER_OBJECT *) user)->replay.end_time); + return PLUGINSD_DISABLE_PLUGIN(user); + } + + if (unlikely(!value_str || !*value_str)) + value_str = "NAN"; + + if(unlikely(!flags_str)) + flags_str = ""; + + if (likely(value_str)) { + RRDDIM *rd = rrddim_acquired_to_rrddim(rda); + + RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED); + + if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) { + NETDATA_DOUBLE value = strtondd(value_str, NULL); + SN_FLAGS flags = SN_FLAG_NONE; + + char c; + while ((c = *flags_str++)) { + switch (c) { + case 'R': + flags |= SN_FLAG_RESET; + break; + + case 'E': + flags |= SN_EMPTY_SLOT; + value = NAN; + break; + + default: + error("unknown flag '%c'", c); + break; + } + } + + if (!netdata_double_isnumber(value)) { + value = NAN; + flags = SN_EMPTY_SLOT; + } + + rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags); + rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time; + rd->last_collected_time.tv_usec = 0; + rd->collections_counter++; + } + else { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd)); + } } - debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str); - if (plugins_action->context_action) { - return plugins_action->context_action(user, &uuid); + rrddim_acquired_release(rda); + return PARSER_RC_OK; +} + +PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user) +{ + char *dimension = get_word(words, num_words, 1); + char *last_collected_ut_str = get_word(words, num_words, 2); + char *last_collected_value_str = get_word(words, num_words, 3); + char *last_calculated_value_str = get_word(words, num_words, 4); + char *last_stored_value_str = get_word(words, num_words, 5); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDDIM *rd = rrddim_acquired_to_rrddim(rda); + usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec; + usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0; + if(last_collected_ut > dim_last_collected_ut) { + rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC; + rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC; } + rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0; + rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0; + rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0; + rrddim_acquired_release(rda); return PARSER_RC_OK; } -PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user) { - char *uuid_str = words[1]; - uuid_t uuid; + char *last_collected_ut_str = get_word(words, num_words, 1); + char *last_updated_ut_str = get_word(words, num_words, 2); - if (unlikely(!uuid_str)) { - error("requested a TOMBSTONE, without a uuid."); - return PARSER_RC_ERROR; - } - if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) { - error("requested a TOMBSTONE, without a valid uuid string."); - return PARSER_RC_ERROR; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + + usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec; + usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0; + if(last_collected_ut > chart_last_collected_ut) { + st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC; + st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC; } - debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str); - if (plugins_action->tombstone_action) { - return plugins_action->tombstone_action(user, &uuid); + usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec; + usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0; + if(last_updated_ut > chart_last_updated_ut) { + st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC; + st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC; } + st->counter++; + st->counter_done++; + return PARSER_RC_OK; } -PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) { - char *machine_guid = words[1]; - char *hostname = words[2]; - char *registry_hostname = words[3]; - char *update_every_s = words[4]; - char *os = words[5]; - char *timezone = words[6]; - char *tags = words[7]; - - int update_every = 1; - if (likely(update_every_s && *update_every_s)) - update_every = str2i(update_every_s); - if (unlikely(!update_every)) - update_every = 1; + if (num_words < 7) { // accepts 7, but the 7th is optional + error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command"); + return PARSER_RC_ERROR; + } - debug(D_PLUGINSD, "HOST PARSED: guid=%s, hostname=%s, reg_host=%s, update=%d, os=%s, timezone=%s, tags=%s", - machine_guid, hostname, registry_hostname, update_every, os, timezone, tags); + const char *update_every_child_txt = get_word(words, num_words, 1); + const char *first_entry_child_txt = get_word(words, num_words, 2); + const char *last_entry_child_txt = get_word(words, num_words, 3); + const char *start_streaming_txt = get_word(words, num_words, 4); + const char *first_entry_requested_txt = get_word(words, num_words, 5); + const char *last_entry_requested_txt = get_word(words, num_words, 6); + const char *child_world_time_txt = get_word(words, num_words, 7); // optional - if (plugins_action->host_action) { - return plugins_action->host_action( - user, machine_guid, hostname, registry_hostname, update_every, os, timezone, tags); + time_t update_every_child = (time_t)str2ul(update_every_child_txt); + time_t first_entry_child = (time_t)str2ul(first_entry_child_txt); + time_t last_entry_child = (time_t)str2ul(last_entry_child_txt); + + bool start_streaming = (strcmp(start_streaming_txt, "true") == 0); + time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt); + time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt); + + // the optional child world time + time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec(); + + PARSER_USER_OBJECT *user_object = user; + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, + "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu", + rrdhost_hostname(host), rrdset_id(st), + (unsigned long long)first_entry_child, (unsigned long long)last_entry_child, + start_streaming?"true":"false", + (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested, + (unsigned long long)child_world_time + ); +#endif + + ((PARSER_USER_OBJECT *) user)->st = NULL; + ((PARSER_USER_OBJECT *) user)->count++; + + if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) { + time_t now = now_realtime_sec(); + time_t started = st->rrdhost->receiver->replication_first_time_t; + time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; + + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started)); } - return PARSER_RC_OK; + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; + + st->counter++; + st->counter_done++; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; + if(start_streaming) + st->replay.log_next_data_collection = true; +#endif + + if (start_streaming) { + if (st->update_every != update_every_child) + rrdset_set_update_every(st, update_every_child); + + if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK); + rrdhost_receiver_replicating_charts_minus_one(st->rrdhost); + } +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + else + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.", + rrdhost_hostname(host), rrdset_id(st)); +#endif + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0); + + return PARSER_RC_OK; + } + + rrdcontext_updated_retention_rrdset(st); + + bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, + first_entry_child, last_entry_child, child_world_time, + first_entry_requested, last_entry_requested); + return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } static void pluginsd_process_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; + rrd_collector_finished(); parser_destroy(parser); } // New plugins.d parser -inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations) +inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) { int enabled = cd->enabled; - if (!fp || !enabled) { + if (!fp_plugin_input || !fp_plugin_output || !enabled) { cd->enabled = 0; return 0; } - if (unlikely(fileno(fp) == -1)) { - error("file descriptor given is not a valid stream"); + if (unlikely(fileno(fp_plugin_input) == -1)) { + error("input file descriptor given is not a valid stream"); cd->serial_failures++; return 0; } - clearerr(fp); + + if (unlikely(fileno(fp_plugin_output) == -1)) { + error("output file descriptor given is not a valid stream"); + cd->serial_failures++; + return 0; + } + + clearerr(fp_plugin_input); + clearerr(fp_plugin_output); PARSER_USER_OBJECT user = { .enabled = cd->enabled, @@ -773,25 +1327,15 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int .trust_durations = trust_durations }; - PARSER *parser = parser_init(host, &user, fp, PARSER_INPUT_SPLIT); + // fp_plugin_output = our input; fp_plugin_input = our output + PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); + + rrd_collector_started(); // this keeps the parser with its current value // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - parser->plugins_action->begin_action = &pluginsd_begin_action; - parser->plugins_action->flush_action = &pluginsd_flush_action; - parser->plugins_action->end_action = &pluginsd_end_action; - parser->plugins_action->disable_action = &pluginsd_disable_action; - parser->plugins_action->variable_action = &pluginsd_variable_action; - parser->plugins_action->dimension_action = &pluginsd_dimension_action; - parser->plugins_action->label_action = &pluginsd_label_action; - parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; - parser->plugins_action->chart_action = &pluginsd_chart_action; - parser->plugins_action->set_action = &pluginsd_set_action; - parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; - parser->plugins_action->clabel_action = &pluginsd_clabel_action; - user.parser = parser; while (likely(!parser_next(parser))) { diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 924d48b7..e18b43e5 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -5,7 +5,6 @@ #include "parser/parser.h" - typedef struct parser_user_object { PARSER *parser; RRDSET *st; @@ -14,29 +13,27 @@ typedef struct parser_user_object { struct plugind *cd; int trust_durations; DICTIONARY *new_host_labels; - DICTIONARY *new_chart_labels; + DICTIONARY *chart_rrdlabels_linked_temporarily; size_t count; int enabled; uint8_t st_exists; uint8_t host_exists; void *private; // the user can set this for private use -} PARSER_USER_OBJECT; -extern PARSER_RC pluginsd_set_action(void *user, RRDSET *st, RRDDIM *rd, long long int value); -extern PARSER_RC pluginsd_flush_action(void *user, RRDSET *st); -extern PARSER_RC pluginsd_begin_action(void *user, RRDSET *st, usec_t microseconds, int trust_durations); -extern PARSER_RC pluginsd_end_action(void *user, RRDSET *st); -extern PARSER_RC pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, - char *title, char *units, char *plugin, char *module, int priority, - int update_every, RRDSET_TYPE chart_type, char *options); -extern PARSER_RC pluginsd_disable_action(void *user); -extern PARSER_RC pluginsd_variable_action(void *user, RRDHOST *host, RRDSET *st, char *name, int global, NETDATA_DOUBLE value); -extern PARSER_RC pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, - long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type); -extern PARSER_RC pluginsd_label_action(void *user, char *key, char *value, RRDLABEL_SRC source); -extern PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, DICTIONARY *new_host_labels); -extern PARSER_RC pluginsd_clabel_commit_action(void *user, RRDHOST *host, DICTIONARY *new_chart_labels); -extern PARSER_RC pluginsd_clabel_action(void *user, char *key, char *value, RRDLABEL_SRC source); + struct { + time_t start_time; + time_t end_time; + + usec_t start_time_ut; + usec_t end_time_ut; + time_t wall_clock_time; + + bool rset_enabled; + } replay; +} PARSER_USER_OBJECT; +PARSER_RC pluginsd_function(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user); +void inflight_functions_init(PARSER *parser); #endif //NETDATA_PLUGINSD_PARSER_H |