From 8020f71afd34d7696d7933659df2d763ab05542f Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 16:31:17 +0200 Subject: Adding upstream version 1.37.1. Signed-off-by: Daniel Baumann --- parser/Makefile.am | 9 ++ parser/README.md | 147 ++++++++++++++++++++ parser/parser.c | 391 +++++++++++++++++++++++++++++++++++++++++++++++++++++ parser/parser.h | 123 +++++++++++++++++ 4 files changed, 670 insertions(+) create mode 100644 parser/Makefile.am create mode 100644 parser/README.md create mode 100644 parser/parser.c create mode 100644 parser/parser.h (limited to 'parser') diff --git a/parser/Makefile.am b/parser/Makefile.am new file mode 100644 index 0000000..02fe3a3 --- /dev/null +++ b/parser/Makefile.am @@ -0,0 +1,9 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) + diff --git a/parser/README.md b/parser/README.md new file mode 100644 index 0000000..c01972d --- /dev/null +++ b/parser/README.md @@ -0,0 +1,147 @@ +#### Introduction + +The parser will be used to process streaming and plugins input as well as metadata + +Usage + +1. Define a structure that will be used to share user state across calls +1. Initialize the parser using `parser_init` +2. Register keywords and associated callback function using `parser_add_keyword` +3. Register actions on the keywords +4. Start a loop until EOF + 1. Fetch the next line using `parser_next` + 2. Process the line using `parser_action` + 1. The registered callbacks are executed to parse the input + 2. The registered action for the callback is called for processing +4. Release the parser using `parser_destroy` +5. Release the user structure + +#### Functions + +---- +##### parse_init(RRDHOST *host, void *user, void *input, int flags) + +Initialize an internal parser with the specified user defined data structure that will be shared across calls. + +Input +- Host + - The host this parser will be dealing with. For streaming with SSL enabled for this host +- user + - User defined structure that is passed in all the calls +- input + - Where the parser will get the input from +- flags + - flags to define processing on the input + +Output +- A parser structure + + + +---- +##### parse_push(PARSER *parser, char *line) + +Push a new line for processing + +Input + +- parser + - The parser object as returned by the `parser_init` +- line + - The new line to process + + +Output +- The line will be injected into the stream and will be the next one to be processed + +Returns +- 0 line added +- 1 error detected + +---- +##### parse_add_keyword(PARSER *parser, char *keyword, keyword_function callback_function) + +The function will add callbacks for keywords. The callback function is defined as + +`typedef PARSER_RC (*keyword_function)(char **, void *);` + +Input + +- parser + - The parser object as returned by the `parser_init` +- keyword + - The keyword to register +- keyword_function + - The callback that will handle the keyword processing + * The callback function should return one of the following + * PARSER_RC_OK -- Callback was successful (continue with other callbacks) + * PARSER_RC_STOP -- Stop processing callbacks (return OK) + * PARSER_RC_ERROR -- Callback failed, exit + +Output +- The corresponding keyword and callback will be registered + +Returns +- 0 maximum callbacks already registered for this keyword +- > 0 which is the number of callbacks associated with this keyword. + + +---- +##### parser_next(PARSER *parser) +Return the next item to parse + +Input +- parser + - The parser object as returned by the `parser_init` + +Output +- The parser will store internally the next item to parse + +Returns +- 0 Next item fetched successfully +- 1 No more items to parse + +---- +##### parser_action(PARSER *parser, char *input) +Return the next item to parse + +Input +- parser + - The parser object as returned by the `parser_init` +- input + - Process the input specified instead of using the internal buffer + +Output +- The current keyword will be processed by calling all the registered callbacks + +Returns +- 0 Callbacks called successfully +- 1 Failed + +---- +##### parser_destroy(PARSER *parser) +Cleanup a previously allocated parser + +Input +- parser + - The parser object as returned by the `parser_init` + +Output +- The parser is deallocated + +Returns +- none + +---- +##### parser_recover_input(PARSER *parser) +Cleanup a previously allocated parser + +Input +- parser + - The parser object as returned by the `parser_init` + +Output +- The parser is deallocated + +Returns +- none diff --git a/parser/parser.c b/parser/parser.c new file mode 100644 index 0000000..5b4c528 --- /dev/null +++ b/parser/parser.c @@ -0,0 +1,391 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "parser.h" +#include "collectors/plugins.d/pluginsd_parser.h" + +inline int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)) +{ + const char *s = str, *keyword_start; + + while (unlikely(custom_isspace(*s))) s++; + keyword_start = s; + + while (likely(*s && !custom_isspace(*s)) && max_size > 1) { + *keyword++ = *s++; + max_size--; + } + *keyword = '\0'; + return max_size == 0 ? 0 : (int) (s - keyword_start); +} + +/* + * Initialize a parser + * user : as defined by the user, will be shared across calls + * input : main input stream (auto detect stream -- file, socket, pipe) + * buffer : This is the buffer to be used (if null a buffer of size will be allocated) + * size : buffer size either passed or will be allocated + * If the buffer is auto allocated, it will auto freed when the parser is destroyed + * + * + */ + +PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) +{ + PARSER *parser; + + parser = callocz(1, sizeof(*parser)); + parser->user = user; + parser->fd = fd; + parser->fp_input = fp_input; + parser->fp_output = fp_output; +#ifdef ENABLE_HTTPS + parser->ssl_output = ssl; +#endif + parser->flags = flags; + parser->host = host; + parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB; + inflight_functions_init(parser); + +#ifdef ENABLE_HTTPS + parser->bytesleft = 0; + parser->readfrom = NULL; +#endif + + if (unlikely(!(flags & PARSER_NO_PARSE_INIT))) { + parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end); + parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); + parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); + parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); + parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); + parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); + parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel); + parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); + parser_add_keyword(parser, PLUGINSD_KEYWORD_SET, pluginsd_set); + + parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function); + parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin); + + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_rrdset_begin); + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set); + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state); + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state); + parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end); + } + + return parser; +} + + +/* + * Push a new line into the parsing stream + * + * This line will be the next one to process ie the next fetch will get this one + * + */ + +int parser_push(PARSER *parser, char *line) +{ + PARSER_DATA *tmp_parser_data; + + if (unlikely(!parser)) + return 1; + + if (unlikely(!line)) + return 0; + + tmp_parser_data = callocz(1, sizeof(*tmp_parser_data)); + tmp_parser_data->line = strdupz(line); + tmp_parser_data->next = parser->data; + parser->data = tmp_parser_data; + + return 0; +} + +/* + * Add a keyword and the corresponding function that will be called + * Multiple functions may be added + * Input : keyword + * : callback function + * : flags + * Output: > 0 registered function number + * : 0 Error + */ + +int parser_add_keyword(PARSER *parser, char *keyword, keyword_function func) +{ + PARSER_KEYWORD *tmp_keyword; + + if (strcmp(keyword, "_read") == 0) { + parser->read_function = (void *) func; + return 0; + } + + if (strcmp(keyword, "_eof") == 0) { + parser->eof_function = (void *) func; + return 0; + } + + if (strcmp(keyword, "_unknown") == 0) { + parser->unknown_function = (void *) func; + return 0; + } + + uint32_t keyword_hash = simple_hash(keyword); + + tmp_keyword = parser->keyword; + + while (tmp_keyword) { + if (tmp_keyword->keyword_hash == keyword_hash && (!strcmp(tmp_keyword->keyword, keyword))) { + if (tmp_keyword->func_no == PARSER_MAX_CALLBACKS) + return 0; + tmp_keyword->func[tmp_keyword->func_no++] = (void *) func; + return tmp_keyword->func_no; + } + tmp_keyword = tmp_keyword->next; + } + + tmp_keyword = callocz(1, sizeof(*tmp_keyword)); + + tmp_keyword->worker_job_id = parser->worker_job_next_id++; + tmp_keyword->keyword = strdupz(keyword); + tmp_keyword->keyword_hash = keyword_hash; + tmp_keyword->func[tmp_keyword->func_no++] = (void *) func; + + worker_register_job_name(tmp_keyword->worker_job_id, tmp_keyword->keyword); + + tmp_keyword->next = parser->keyword; + parser->keyword = tmp_keyword; + return tmp_keyword->func_no; +} + +/* + * Cleanup a previously allocated parser + */ + +void parser_destroy(PARSER *parser) +{ + if (unlikely(!parser)) + return; + + dictionary_destroy(parser->inflight.functions); + + PARSER_KEYWORD *tmp_keyword, *tmp_keyword_next; + PARSER_DATA *tmp_parser_data, *tmp_parser_data_next; + + // Remove keywords + tmp_keyword = parser->keyword; + while (tmp_keyword) { + tmp_keyword_next = tmp_keyword->next; + freez(tmp_keyword->keyword); + freez(tmp_keyword); + tmp_keyword = tmp_keyword_next; + } + + // Remove pushed data if any + tmp_parser_data = parser->data; + while (tmp_parser_data) { + tmp_parser_data_next = tmp_parser_data->next; + freez(tmp_parser_data->line); + freez(tmp_parser_data); + tmp_parser_data = tmp_parser_data_next; + } + + freez(parser); +} + + +/* + * Fetch the next line to process + * + */ + +int parser_next(PARSER *parser) +{ + char *tmp = NULL; + + if (unlikely(!parser)) + return 1; + + parser->flags &= ~(PARSER_INPUT_PROCESSED); + + PARSER_DATA *tmp_parser_data = parser->data; + + if (unlikely(tmp_parser_data)) { + strncpyz(parser->buffer, tmp_parser_data->line, PLUGINSD_LINE_MAX); + parser->data = tmp_parser_data->next; + freez(tmp_parser_data->line); + freez(tmp_parser_data); + return 0; + } + + if (unlikely(parser->read_function)) + tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->fp_input); + else if(likely(parser->fp_input)) + tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->fp_input); + else + tmp = NULL; + + if (unlikely(!tmp)) { + if (unlikely(parser->eof_function)) { + int rc = parser->eof_function(parser->fp_input); + error("read failed: user defined function returned %d", rc); + } + else { + if (feof((FILE *)parser->fp_input)) + error("read failed: end of file"); + else if (ferror((FILE *)parser->fp_input)) + error("read failed: input error"); + else + error("read failed: unknown error"); + } + return 1; + } + return 0; +} + + +/* +* Takes an initialized parser object that has an unprocessed entry (by calling parser_next) +* and if it contains a valid keyword, it will execute all the callbacks +* +*/ + +inline int parser_action(PARSER *parser, char *input) +{ + parser->line++; + + PARSER_RC rc = PARSER_RC_OK; + char *words[PLUGINSD_MAX_WORDS]; + char command[PLUGINSD_LINE_MAX + 1]; + keyword_function action_function; + keyword_function *action_function_list = NULL; + + if (unlikely(!parser)) { + internal_error(true, "parser is NULL"); + return 1; + } + + parser->recover_location[0] = 0x0; + + // if not direct input check if we have reprocessed this + if (unlikely(!input && parser->flags & PARSER_INPUT_PROCESSED)) + return 0; + + PARSER_KEYWORD *tmp_keyword = parser->keyword; + if (unlikely(!tmp_keyword)) { + internal_error(true, "called without a keyword"); + return 1; + } + + if (unlikely(!input)) + input = parser->buffer; + + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { + // more than 10MB of data + // a bad plugin that did not send the end_keyword + internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + if (unlikely(!find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space))) + return 0; + + size_t num_words = 0; + if ((parser->flags & PARSER_INPUT_KEEP_ORIGINAL) == PARSER_INPUT_KEEP_ORIGINAL) + num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS); + else + num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + + uint32_t command_hash = simple_hash(command); + + size_t worker_job_id = WORKER_UTILIZATION_MAX_JOB_TYPES + 1; // set an invalid value by default + while(tmp_keyword) { + if (command_hash == tmp_keyword->keyword_hash && (!strcmp(command, tmp_keyword->keyword))) { + action_function_list = &tmp_keyword->func[0]; + worker_job_id = tmp_keyword->worker_job_id; + break; + } + tmp_keyword = tmp_keyword->next; + } + + if (unlikely(!action_function_list)) { + if (unlikely(parser->unknown_function)) + rc = parser->unknown_function(words, num_words, parser->user); + else + rc = PARSER_RC_ERROR; + } + else { + worker_is_busy(worker_job_id); + while ((action_function = *action_function_list) != NULL) { + rc = action_function(words, num_words, parser->user); + if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP)) + break; + + action_function_list++; + } + worker_is_idle(); + } + + if (likely(input == parser->buffer)) + parser->flags |= PARSER_INPUT_PROCESSED; + +#ifdef NETDATA_INTERNAL_CHECKS + if(rc == PARSER_RC_ERROR) { + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX); + for(size_t i = 0; i < num_words ;i++) { + if(i) buffer_fast_strcat(wb, " ", 1); + + buffer_fast_strcat(wb, "\"", 1); + const char *s = get_word(words, num_words, i); + buffer_strcat(wb, s?s:""); + buffer_fast_strcat(wb, "\"", 1); + } + + internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line, buffer_tostring(wb)); + + buffer_free(wb); + } +#endif + + return (rc == PARSER_RC_ERROR); +} + +inline int parser_recover_input(PARSER *parser) +{ + if (unlikely(!parser)) + return 1; + + for(int i=0; i < PARSER_MAX_RECOVER_KEYWORDS && parser->recover_location[i]; i++) + *(parser->recover_location[i]) = parser->recover_input[i]; + + parser->recover_location[0] = 0x0; + + return 0; +} diff --git a/parser/parser.h b/parser/parser.h new file mode 100644 index 0000000..ad74883 --- /dev/null +++ b/parser/parser.h @@ -0,0 +1,123 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_INCREMENTAL_PARSER_H +#define NETDATA_INCREMENTAL_PARSER_H 1 + +#include "daemon/common.h" + +#define PARSER_MAX_CALLBACKS 20 +#define PARSER_MAX_RECOVER_KEYWORDS 128 +#define WORKER_PARSER_FIRST_JOB 3 + +// this has to be in-sync with the same at receiver.c +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +// PARSER return codes +typedef enum parser_rc { + PARSER_RC_OK, // Callback was successful, go on + PARSER_RC_STOP, // Callback says STOP + PARSER_RC_ERROR // Callback failed (abort rest of callbacks) +} PARSER_RC; + +typedef enum parser_input_type { + PARSER_INPUT_SPLIT = (1 << 1), + PARSER_INPUT_KEEP_ORIGINAL = (1 << 2), + PARSER_INPUT_PROCESSED = (1 << 3), + PARSER_NO_PARSE_INIT = (1 << 4), + PARSER_NO_ACTION_INIT = (1 << 5), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 6), +} PARSER_INPUT_TYPE; + +#define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL) + +typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data); + +typedef struct parser_keyword { + size_t worker_job_id; + char *keyword; + uint32_t keyword_hash; + int func_no; + keyword_function func[PARSER_MAX_CALLBACKS+1]; + struct parser_keyword *next; +} PARSER_KEYWORD; + +typedef struct parser_data { + char *line; + struct parser_data *next; +} PARSER_DATA; + +typedef struct parser { + size_t worker_job_next_id; + uint8_t version; // Parser version + RRDHOST *host; + int fd; // Socket + FILE *fp_input; // Input source e.g. stream + FILE *fp_output; // Stream to send commands to plugin +#ifdef ENABLE_HTTPS + struct netdata_ssl *ssl_output; +#endif + PARSER_DATA *data; // extra input + PARSER_KEYWORD *keyword; // List of parse keywords and functions + void *user; // User defined structure to hold extra state between calls + uint32_t flags; + size_t line; + + char *(*read_function)(char *buffer, long unsigned int, void *input); + int (*eof_function)(void *input); + keyword_function unknown_function; + char buffer[PLUGINSD_LINE_MAX]; + char *recover_location[PARSER_MAX_RECOVER_KEYWORDS+1]; + char recover_input[PARSER_MAX_RECOVER_KEYWORDS]; +#ifdef ENABLE_HTTPS + int bytesleft; + char tmpbuffer[PLUGINSD_LINE_MAX]; + char *readfrom; +#endif + + struct { + const char *end_keyword; + BUFFER *response; + void (*action)(struct parser *parser, void *action_data); + void *action_data; + } defer; + + struct { + DICTIONARY *functions; + usec_t smaller_timeout; + } inflight; + +} PARSER; + +int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)); + +PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); +int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); +int parser_next(PARSER *working_parser); +int parser_action(PARSER *working_parser, char *input); +int parser_push(PARSER *working_parser, char *line); +void parser_destroy(PARSER *working_parser); +int parser_recover_input(PARSER *working_parser); + +size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations); + +PARSER_RC pluginsd_set(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_flush(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_disable(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_label(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user); + +PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user); + +#endif -- cgit v1.2.3