diff options
Diffstat (limited to 'parser')
-rw-r--r-- | parser/parser.c | 160 | ||||
-rw-r--r-- | parser/parser.h | 109 |
2 files changed, 172 insertions, 97 deletions
diff --git a/parser/parser.c b/parser/parser.c index c37d1e2c4..5b4c528de 100644 --- a/parser/parser.c +++ b/parser/parser.c @@ -1,10 +1,11 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "parser.h" +#include "collectors/plugins.d/pluginsd_parser.h" -static inline int find_keyword(char *str, char *keyword, int max_size, int (*custom_isspace)(char)) +inline int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)) { - char *s = str, *keyword_start; + const char *s = str, *keyword_start; while (unlikely(custom_isspace(*s))) s++; keyword_start = s; @@ -28,16 +29,22 @@ static inline int find_keyword(char *str, char *keyword, int max_size, int (*cus * */ -PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags) +PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) { PARSER *parser; parser = callocz(1, sizeof(*parser)); - parser->plugins_action = callocz(1, sizeof(PLUGINSD_ACTION)); parser->user = user; - parser->input = input; + parser->fd = fd; + parser->fp_input = fp_input; + parser->fp_output = fp_output; +#ifdef ENABLE_HTTPS + parser->ssl_output = ssl; +#endif parser->flags = flags; parser->host = host; + parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB; + inflight_functions_init(parser); #ifdef ENABLE_HTTPS parser->bytesleft = 0; @@ -45,18 +52,28 @@ PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE fl #endif if (unlikely(!(flags & PARSER_NO_PARSE_INIT))) { - parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); - parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); - parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); - parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); - parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); - parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); - parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); - parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end); - parser_add_keyword(parser, "CLABEL_COMMIT", pluginsd_clabel_commit); - parser_add_keyword(parser, "CLABEL", pluginsd_clabel); - parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); - parser_add_keyword(parser, "SET", pluginsd_set); + parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end); + parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); + parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); + parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); + parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); + parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); + parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel); + parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); + parser_add_keyword(parser, PLUGINSD_KEYWORD_SET, pluginsd_set); + + parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function); + parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin); + + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_rrdset_begin); + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set); + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state); + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state); + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end); } return parser; @@ -133,7 +150,7 @@ int parser_add_keyword(PARSER *parser, char *keyword, keyword_function func) tmp_keyword = callocz(1, sizeof(*tmp_keyword)); - tmp_keyword->worker_job_id = parser->worker_job_ids++; + tmp_keyword->worker_job_id = parser->worker_job_next_id++; tmp_keyword->keyword = strdupz(keyword); tmp_keyword->keyword_hash = keyword_hash; tmp_keyword->func[tmp_keyword->func_no++] = (void *) func; @@ -154,6 +171,8 @@ void parser_destroy(PARSER *parser) if (unlikely(!parser)) return; + dictionary_destroy(parser->inflight.functions); + PARSER_KEYWORD *tmp_keyword, *tmp_keyword_next; PARSER_DATA *tmp_parser_data, *tmp_parser_data_next; @@ -175,7 +194,6 @@ void parser_destroy(PARSER *parser) tmp_parser_data = tmp_parser_data_next; } - freez(parser->plugins_action); freez(parser); } @@ -205,19 +223,21 @@ int parser_next(PARSER *parser) } if (unlikely(parser->read_function)) - tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->input); + tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->fp_input); + else if(likely(parser->fp_input)) + tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->fp_input); else - tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->input); + tmp = NULL; if (unlikely(!tmp)) { if (unlikely(parser->eof_function)) { - int rc = parser->eof_function(parser->input); + int rc = parser->eof_function(parser->fp_input); error("read failed: user defined function returned %d", rc); } else { - if (feof((FILE *)parser->input)) + if (feof((FILE *)parser->fp_input)) error("read failed: end of file"); - else if (ferror((FILE *)parser->input)) + else if (ferror((FILE *)parser->fp_input)) error("read failed: input error"); else error("read failed: unknown error"); @@ -236,64 +256,97 @@ int parser_next(PARSER *parser) inline int parser_action(PARSER *parser, char *input) { - PARSER_RC rc = PARSER_RC_OK; - char *words[PLUGINSD_MAX_WORDS] = { NULL }; + parser->line++; + + PARSER_RC rc = PARSER_RC_OK; + char *words[PLUGINSD_MAX_WORDS]; char command[PLUGINSD_LINE_MAX + 1]; keyword_function action_function; keyword_function *action_function_list = NULL; - if (unlikely(!parser)) + if (unlikely(!parser)) { + internal_error(true, "parser is NULL"); return 1; + } + parser->recover_location[0] = 0x0; // if not direct input check if we have reprocessed this if (unlikely(!input && parser->flags & PARSER_INPUT_PROCESSED)) return 0; - PARSER_KEYWORD *tmp_keyword = parser->keyword; + PARSER_KEYWORD *tmp_keyword = parser->keyword; if (unlikely(!tmp_keyword)) { + internal_error(true, "called without a keyword"); return 1; } if (unlikely(!input)) input = parser->buffer; - if (unlikely(!find_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space))) + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { + // more than 10MB of data + // a bad plugin that did not send the end_keyword + internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + if (unlikely(!find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space))) return 0; - if ((parser->flags & PARSER_INPUT_ORIGINAL) == PARSER_INPUT_ORIGINAL) - pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS); + size_t num_words = 0; + if ((parser->flags & PARSER_INPUT_KEEP_ORIGINAL) == PARSER_INPUT_KEEP_ORIGINAL) + num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS); else - pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); uint32_t command_hash = simple_hash(command); - size_t worker_job_id = 0; + size_t worker_job_id = WORKER_UTILIZATION_MAX_JOB_TYPES + 1; // set an invalid value by default while(tmp_keyword) { - if (command_hash == tmp_keyword->keyword_hash && - (!strcmp(command, tmp_keyword->keyword))) { - action_function_list = &tmp_keyword->func[0]; - worker_job_id = tmp_keyword->worker_job_id; - break; + if (command_hash == tmp_keyword->keyword_hash && (!strcmp(command, tmp_keyword->keyword))) { + action_function_list = &tmp_keyword->func[0]; + worker_job_id = tmp_keyword->worker_job_id; + break; } tmp_keyword = tmp_keyword->next; } if (unlikely(!action_function_list)) { if (unlikely(parser->unknown_function)) - rc = parser->unknown_function(words, parser->user, NULL); + rc = parser->unknown_function(words, num_words, parser->user); else rc = PARSER_RC_ERROR; -#ifdef NETDATA_INTERNAL_CHECKS - error("Unknown keyword [%s]", input); -#endif } else { worker_is_busy(worker_job_id); while ((action_function = *action_function_list) != NULL) { - rc = action_function(words, parser->user, parser->plugins_action); + rc = action_function(words, num_words, parser->user); if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP)) - break; + break; + action_function_list++; } worker_is_idle(); @@ -302,6 +355,25 @@ inline int parser_action(PARSER *parser, char *input) if (likely(input == parser->buffer)) parser->flags |= PARSER_INPUT_PROCESSED; +#ifdef NETDATA_INTERNAL_CHECKS + if(rc == PARSER_RC_ERROR) { + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX); + for(size_t i = 0; i < num_words ;i++) { + if(i) buffer_fast_strcat(wb, " ", 1); + + buffer_fast_strcat(wb, "\"", 1); + const char *s = get_word(words, num_words, i); + buffer_strcat(wb, s?s:""); + buffer_fast_strcat(wb, "\"", 1); + } + + internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line, buffer_tostring(wb)); + + buffer_free(wb); + } +#endif + return (rc == PARSER_RC_ERROR); } diff --git a/parser/parser.h b/parser/parser.h index 1887318fc..ad7488389 100644 --- a/parser/parser.h +++ b/parser/parser.h @@ -7,6 +7,10 @@ #define PARSER_MAX_CALLBACKS 20 #define PARSER_MAX_RECOVER_KEYWORDS 128 +#define WORKER_PARSER_FIRST_JOB 3 + +// this has to be in-sync with the same at receiver.c +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) // PARSER return codes typedef enum parser_rc { @@ -15,43 +19,18 @@ typedef enum parser_rc { PARSER_RC_ERROR // Callback failed (abort rest of callbacks) } PARSER_RC; -typedef struct pluginsd_action { - PARSER_RC (*set_action)(void *user, RRDSET *st, RRDDIM *rd, long long int value); - PARSER_RC (*begin_action)(void *user, RRDSET *st, usec_t microseconds, int trust_durations); - PARSER_RC (*end_action)(void *user, RRDSET *st); - PARSER_RC (*chart_action) - (void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin, - char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options); - PARSER_RC (*dimension_action) - (void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options, - RRD_ALGORITHM algorithm_type); - - PARSER_RC (*flush_action)(void *user, RRDSET *st); - PARSER_RC (*disable_action)(void *user); - PARSER_RC (*variable_action)(void *user, RRDHOST *host, RRDSET *st, char *name, int global, NETDATA_DOUBLE value); - PARSER_RC (*label_action)(void *user, char *key, char *value, RRDLABEL_SRC source); - PARSER_RC (*overwrite_action)(void *user, RRDHOST *host, DICTIONARY *new_labels); - PARSER_RC (*clabel_action)(void *user, char *key, char *value, RRDLABEL_SRC source); - PARSER_RC (*clabel_commit_action)(void *user, RRDHOST *host, DICTIONARY *new_labels); - - PARSER_RC (*guid_action)(void *user, uuid_t *uuid); - PARSER_RC (*context_action)(void *user, uuid_t *uuid); - PARSER_RC (*tombstone_action)(void *user, uuid_t *uuid); - PARSER_RC (*host_action)(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, - char *timezone, char *tags); -} PLUGINSD_ACTION; - typedef enum parser_input_type { - PARSER_INPUT_SPLIT = 1 << 1, - PARSER_INPUT_ORIGINAL = 1 << 2, - PARSER_INPUT_PROCESSED = 1 << 3, - PARSER_NO_PARSE_INIT = 1 << 4, - PARSER_NO_ACTION_INIT = 1 << 5, + PARSER_INPUT_SPLIT = (1 << 1), + PARSER_INPUT_KEEP_ORIGINAL = (1 << 2), + PARSER_INPUT_PROCESSED = (1 << 3), + PARSER_NO_PARSE_INIT = (1 << 4), + PARSER_NO_ACTION_INIT = (1 << 5), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 6), } PARSER_INPUT_TYPE; #define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL) -typedef PARSER_RC (*keyword_function)(char **, void *, PLUGINSD_ACTION *plugins_action); +typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data); typedef struct parser_keyword { size_t worker_job_id; @@ -68,15 +47,20 @@ typedef struct parser_data { } PARSER_DATA; typedef struct parser { - size_t worker_job_ids; + size_t worker_job_next_id; uint8_t version; // Parser version RRDHOST *host; - void *input; // Input source e.g. stream + int fd; // Socket + FILE *fp_input; // Input source e.g. stream + FILE *fp_output; // Stream to send commands to plugin +#ifdef ENABLE_HTTPS + struct netdata_ssl *ssl_output; +#endif PARSER_DATA *data; // extra input PARSER_KEYWORD *keyword; // List of parse keywords and functions - PLUGINSD_ACTION *plugins_action; void *user; // User defined structure to hold extra state between calls uint32_t flags; + size_t line; char *(*read_function)(char *buffer, long unsigned int, void *input); int (*eof_function)(void *input); @@ -89,9 +73,24 @@ typedef struct parser { char tmpbuffer[PLUGINSD_LINE_MAX]; char *readfrom; #endif + + struct { + const char *end_keyword; + BUFFER *response; + void (*action)(struct parser *parser, void *action_data); + void *action_data; + } defer; + + struct { + DICTIONARY *functions; + usec_t smaller_timeout; + } inflight; + } PARSER; -PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags); +int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)); + +PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); int parser_next(PARSER *working_parser); int parser_action(PARSER *working_parser, char *input); @@ -99,22 +98,26 @@ int parser_push(PARSER *working_parser, char *line); void parser_destroy(PARSER *working_parser); int parser_recover_input(PARSER *working_parser); -extern size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations); - -extern PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action); -extern PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action); +size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations); + +PARSER_RC pluginsd_set(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_flush(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_disable(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_label(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user); + +PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user); #endif |