diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
commit | a836a244a3d2bdd4da1ee2641e3e957850668cea (patch) | |
tree | cb87c75b3677fab7144f868435243f864048a1e6 /parser | |
parent | Adding upstream version 1.38.1. (diff) | |
download | netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip |
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'parser')
-rw-r--r-- | parser/Makefile.am | 9 | ||||
-rw-r--r-- | parser/README.md | 152 | ||||
-rw-r--r-- | parser/parser.c | 391 | ||||
-rw-r--r-- | parser/parser.h | 123 |
4 files changed, 0 insertions, 675 deletions
diff --git a/parser/Makefile.am b/parser/Makefile.am deleted file mode 100644 index 02fe3a314..000000000 --- a/parser/Makefile.am +++ /dev/null @@ -1,9 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in - -dist_noinst_DATA = \ - README.md \ - $(NULL) - diff --git a/parser/README.md b/parser/README.md deleted file mode 100644 index b7951864f..000000000 --- a/parser/README.md +++ /dev/null @@ -1,152 +0,0 @@ -<!-- -title: "Parser" -custom_edit_url: https://github.com/netdata/netdata/blob/master/parser/README.md ---> - - -#### Introduction - -The parser will be used to process streaming and plugins input as well as metadata - -Usage - -1. Define a structure that will be used to share user state across calls -1. Initialize the parser using `parser_init` -2. Register keywords and associated callback function using `parser_add_keyword` -3. Register actions on the keywords -4. Start a loop until EOF - 1. Fetch the next line using `parser_next` - 2. Process the line using `parser_action` - 1. The registered callbacks are executed to parse the input - 2. The registered action for the callback is called for processing -4. Release the parser using `parser_destroy` -5. Release the user structure - -#### Functions - -TODO: - -##### parse_init(RRDHOST *host, void *user, void *input, int flags) - -Initialize an internal parser with the specified user defined data structure that will be shared across calls. - -Input -- Host - - The host this parser will be dealing with. For streaming with SSL enabled for this host -- user - - User defined structure that is passed in all the calls -- input - - Where the parser will get the input from -- flags - - flags to define processing on the input - -Output -- A parser structure - - - -##### parse_push(PARSER *parser, char *line) - -Push a new line for processing - -Input - -- parser - - The parser object as returned by the `parser_init` -- line - - The new line to process - - -Output -- The line will be injected into the stream and will be the next one to be processed - -Returns -- 0 line added -- 1 error detected - - -##### parse_add_keyword(PARSER *parser, char *keyword, keyword_function callback_function) - -The function will add callbacks for keywords. The callback function is defined as - -`typedef PARSER_RC (*keyword_function)(char **, void *);` - -Input - -- parser - - The parser object as returned by the `parser_init` -- keyword - - The keyword to register -- keyword_function - - The callback that will handle the keyword processing - * The callback function should return one of the following - * PARSER_RC_OK - Callback was successful (continue with other callbacks) - * PARSER_RC_STOP - Stop processing callbacks (return OK) - * PARSER_RC_ERROR - Callback failed, exit - -Output -- The corresponding keyword and callback will be registered - -Returns -- 0 maximum callbacks already registered for this keyword -- > 0 which is the number of callbacks associated with this keyword. - - -##### parser_next(PARSER *parser) -Return the next item to parse - -Input -- parser - - The parser object as returned by the `parser_init` - -Output -- The parser will store internally the next item to parse - -Returns -- 0 Next item fetched successfully -- 1 No more items to parse - - -##### parser_action(PARSER *parser, char *input) -Return the next item to parse - -Input -- parser - - The parser object as returned by the `parser_init` -- input - - Process the input specified instead of using the internal buffer - -Output -- The current keyword will be processed by calling all the registered callbacks - -Returns -- 0 Callbacks called successfully -- 1 Failed - - -##### parser_destroy(PARSER *parser) -Cleanup a previously allocated parser - -Input -- parser - - The parser object as returned by the `parser_init` - -Output -- The parser is deallocated - -Returns -- none - - -##### parser_recover_input(PARSER *parser) -Cleanup a previously allocated parser - -Input -- parser - - The parser object as returned by the `parser_init` - -Output -- The parser is deallocated - -Returns -- none diff --git a/parser/parser.c b/parser/parser.c deleted file mode 100644 index c687c7af4..000000000 --- a/parser/parser.c +++ /dev/null @@ -1,391 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "parser.h" -#include "collectors/plugins.d/pluginsd_parser.h" - -inline int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)) -{ - const char *s = str, *keyword_start; - - while (unlikely(custom_isspace(*s))) s++; - keyword_start = s; - - while (likely(*s && !custom_isspace(*s)) && max_size > 1) { - *keyword++ = *s++; - max_size--; - } - *keyword = '\0'; - return max_size == 0 ? 0 : (int) (s - keyword_start); -} - -/* - * Initialize a parser - * user : as defined by the user, will be shared across calls - * input : main input stream (auto detect stream -- file, socket, pipe) - * buffer : This is the buffer to be used (if null a buffer of size will be allocated) - * size : buffer size either passed or will be allocated - * If the buffer is auto allocated, it will auto freed when the parser is destroyed - * - * - */ - -PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) -{ - PARSER *parser; - - parser = callocz(1, sizeof(*parser)); - parser->user = user; - parser->fd = fd; - parser->fp_input = fp_input; - parser->fp_output = fp_output; -#ifdef ENABLE_HTTPS - parser->ssl_output = ssl; -#endif - parser->flags = flags; - parser->host = host; - parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB; - inflight_functions_init(parser); - -#ifdef ENABLE_HTTPS - parser->bytesleft = 0; - parser->readfrom = NULL; -#endif - - if (unlikely(!(flags & PARSER_NO_PARSE_INIT))) { - parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); - parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); - parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end); - parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); - parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); - parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); - parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); - parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); - parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end); - parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit); - parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel); - parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); - parser_add_keyword(parser, PLUGINSD_KEYWORD_SET, pluginsd_set); - - parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function); - parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin); - - parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_rrdset_begin); - parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set); - parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state); - parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state); - parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end); - } - - return parser; -} - - -/* - * Push a new line into the parsing stream - * - * This line will be the next one to process ie the next fetch will get this one - * - */ - -int parser_push(PARSER *parser, char *line) -{ - PARSER_DATA *tmp_parser_data; - - if (unlikely(!parser)) - return 1; - - if (unlikely(!line)) - return 0; - - tmp_parser_data = callocz(1, sizeof(*tmp_parser_data)); - tmp_parser_data->line = strdupz(line); - tmp_parser_data->next = parser->data; - parser->data = tmp_parser_data; - - return 0; -} - -/* - * Add a keyword and the corresponding function that will be called - * Multiple functions may be added - * Input : keyword - * : callback function - * : flags - * Output: > 0 registered function number - * : 0 Error - */ - -int parser_add_keyword(PARSER *parser, char *keyword, keyword_function func) -{ - PARSER_KEYWORD *tmp_keyword; - - if (strcmp(keyword, "_read") == 0) { - parser->read_function = (void *) func; - return 0; - } - - if (strcmp(keyword, "_eof") == 0) { - parser->eof_function = (void *) func; - return 0; - } - - if (strcmp(keyword, "_unknown") == 0) { - parser->unknown_function = (void *) func; - return 0; - } - - uint32_t keyword_hash = simple_hash(keyword); - - tmp_keyword = parser->keyword; - - while (tmp_keyword) { - if (tmp_keyword->keyword_hash == keyword_hash && (!strcmp(tmp_keyword->keyword, keyword))) { - if (tmp_keyword->func_no == PARSER_MAX_CALLBACKS) - return 0; - tmp_keyword->func[tmp_keyword->func_no++] = (void *) func; - return tmp_keyword->func_no; - } - tmp_keyword = tmp_keyword->next; - } - - tmp_keyword = callocz(1, sizeof(*tmp_keyword)); - - tmp_keyword->worker_job_id = parser->worker_job_next_id++; - tmp_keyword->keyword = strdupz(keyword); - tmp_keyword->keyword_hash = keyword_hash; - tmp_keyword->func[tmp_keyword->func_no++] = (void *) func; - - worker_register_job_name(tmp_keyword->worker_job_id, tmp_keyword->keyword); - - tmp_keyword->next = parser->keyword; - parser->keyword = tmp_keyword; - return tmp_keyword->func_no; -} - -/* - * Cleanup a previously allocated parser - */ - -void parser_destroy(PARSER *parser) -{ - if (unlikely(!parser)) - return; - - dictionary_destroy(parser->inflight.functions); - - PARSER_KEYWORD *tmp_keyword, *tmp_keyword_next; - PARSER_DATA *tmp_parser_data, *tmp_parser_data_next; - - // Remove keywords - tmp_keyword = parser->keyword; - while (tmp_keyword) { - tmp_keyword_next = tmp_keyword->next; - freez(tmp_keyword->keyword); - freez(tmp_keyword); - tmp_keyword = tmp_keyword_next; - } - - // Remove pushed data if any - tmp_parser_data = parser->data; - while (tmp_parser_data) { - tmp_parser_data_next = tmp_parser_data->next; - freez(tmp_parser_data->line); - freez(tmp_parser_data); - tmp_parser_data = tmp_parser_data_next; - } - - freez(parser); -} - - -/* - * Fetch the next line to process - * - */ - -int parser_next(PARSER *parser) -{ - char *tmp = NULL; - - if (unlikely(!parser)) - return 1; - - parser->flags &= ~(PARSER_INPUT_PROCESSED); - - PARSER_DATA *tmp_parser_data = parser->data; - - if (unlikely(tmp_parser_data)) { - strncpyz(parser->buffer, tmp_parser_data->line, PLUGINSD_LINE_MAX); - parser->data = tmp_parser_data->next; - freez(tmp_parser_data->line); - freez(tmp_parser_data); - return 0; - } - - if (unlikely(parser->read_function)) - tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->fp_input); - else if(likely(parser->fp_input)) - tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->fp_input); - else - tmp = NULL; - - if (unlikely(!tmp)) { - if (unlikely(parser->eof_function)) { - int rc = parser->eof_function(parser->fp_input); - error("read failed: user defined function returned %d", rc); - } - else { - if (feof((FILE *)parser->fp_input)) - error("read failed: end of file"); - else if (ferror((FILE *)parser->fp_input)) - error("read failed: input error"); - else - error("read failed: unknown error"); - } - return 1; - } - return 0; -} - - -/* -* Takes an initialized parser object that has an unprocessed entry (by calling parser_next) -* and if it contains a valid keyword, it will execute all the callbacks -* -*/ - -inline int parser_action(PARSER *parser, char *input) -{ - parser->line++; - - PARSER_RC rc = PARSER_RC_OK; - char *words[PLUGINSD_MAX_WORDS]; - char command[PLUGINSD_LINE_MAX + 1]; - keyword_function action_function; - keyword_function *action_function_list = NULL; - - if (unlikely(!parser)) { - internal_error(true, "parser is NULL"); - return 1; - } - - parser->recover_location[0] = 0x0; - - // if not direct input check if we have reprocessed this - if (unlikely(!input && parser->flags & PARSER_INPUT_PROCESSED)) - return 0; - - PARSER_KEYWORD *tmp_keyword = parser->keyword; - if (unlikely(!tmp_keyword)) { - internal_error(true, "called without a keyword"); - return 1; - } - - if (unlikely(!input)) - input = parser->buffer; - - if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { - bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space); - - if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { - if(parser->defer.response) { - buffer_strcat(parser->defer.response, input); - if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { - // more than 10MB of data - // a bad plugin that did not send the end_keyword - internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); - return 1; - } - } - return 0; - } - else { - // call the action - parser->defer.action(parser, parser->defer.action_data); - - // empty everything - parser->defer.action = NULL; - parser->defer.action_data = NULL; - parser->defer.end_keyword = NULL; - parser->defer.response = NULL; - parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; - } - return 0; - } - - if (unlikely(!find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space))) - return 0; - - size_t num_words = 0; - if ((parser->flags & PARSER_INPUT_KEEP_ORIGINAL) == PARSER_INPUT_KEEP_ORIGINAL) - num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS); - else - num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); - - uint32_t command_hash = simple_hash(command); - - size_t worker_job_id = WORKER_UTILIZATION_MAX_JOB_TYPES + 1; // set an invalid value by default - while(tmp_keyword) { - if (command_hash == tmp_keyword->keyword_hash && (!strcmp(command, tmp_keyword->keyword))) { - action_function_list = &tmp_keyword->func[0]; - worker_job_id = tmp_keyword->worker_job_id; - break; - } - tmp_keyword = tmp_keyword->next; - } - - if (unlikely(!action_function_list)) { - if (unlikely(parser->unknown_function)) - rc = parser->unknown_function(words, num_words, parser->user); - else - rc = PARSER_RC_ERROR; - } - else { - worker_is_busy(worker_job_id); - while ((action_function = *action_function_list) != NULL) { - rc = action_function(words, num_words, parser->user); - if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP)) - break; - - action_function_list++; - } - worker_is_idle(); - } - - if (likely(input == parser->buffer)) - parser->flags |= PARSER_INPUT_PROCESSED; - -#ifdef NETDATA_INTERNAL_CHECKS - if(rc == PARSER_RC_ERROR) { - BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); - for(size_t i = 0; i < num_words ;i++) { - if(i) buffer_fast_strcat(wb, " ", 1); - - buffer_fast_strcat(wb, "\"", 1); - const char *s = get_word(words, num_words, i); - buffer_strcat(wb, s?s:""); - buffer_fast_strcat(wb, "\"", 1); - } - - internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", - command, parser->line, buffer_tostring(wb)); - - buffer_free(wb); - } -#endif - - return (rc == PARSER_RC_ERROR); -} - -inline int parser_recover_input(PARSER *parser) -{ - if (unlikely(!parser)) - return 1; - - for(int i=0; i < PARSER_MAX_RECOVER_KEYWORDS && parser->recover_location[i]; i++) - *(parser->recover_location[i]) = parser->recover_input[i]; - - parser->recover_location[0] = 0x0; - - return 0; -} diff --git a/parser/parser.h b/parser/parser.h deleted file mode 100644 index ad7488389..000000000 --- a/parser/parser.h +++ /dev/null @@ -1,123 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_INCREMENTAL_PARSER_H -#define NETDATA_INCREMENTAL_PARSER_H 1 - -#include "daemon/common.h" - -#define PARSER_MAX_CALLBACKS 20 -#define PARSER_MAX_RECOVER_KEYWORDS 128 -#define WORKER_PARSER_FIRST_JOB 3 - -// this has to be in-sync with the same at receiver.c -#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) - -// PARSER return codes -typedef enum parser_rc { - PARSER_RC_OK, // Callback was successful, go on - PARSER_RC_STOP, // Callback says STOP - PARSER_RC_ERROR // Callback failed (abort rest of callbacks) -} PARSER_RC; - -typedef enum parser_input_type { - PARSER_INPUT_SPLIT = (1 << 1), - PARSER_INPUT_KEEP_ORIGINAL = (1 << 2), - PARSER_INPUT_PROCESSED = (1 << 3), - PARSER_NO_PARSE_INIT = (1 << 4), - PARSER_NO_ACTION_INIT = (1 << 5), - PARSER_DEFER_UNTIL_KEYWORD = (1 << 6), -} PARSER_INPUT_TYPE; - -#define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL) - -typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data); - -typedef struct parser_keyword { - size_t worker_job_id; - char *keyword; - uint32_t keyword_hash; - int func_no; - keyword_function func[PARSER_MAX_CALLBACKS+1]; - struct parser_keyword *next; -} PARSER_KEYWORD; - -typedef struct parser_data { - char *line; - struct parser_data *next; -} PARSER_DATA; - -typedef struct parser { - size_t worker_job_next_id; - uint8_t version; // Parser version - RRDHOST *host; - int fd; // Socket - FILE *fp_input; // Input source e.g. stream - FILE *fp_output; // Stream to send commands to plugin -#ifdef ENABLE_HTTPS - struct netdata_ssl *ssl_output; -#endif - PARSER_DATA *data; // extra input - PARSER_KEYWORD *keyword; // List of parse keywords and functions - void *user; // User defined structure to hold extra state between calls - uint32_t flags; - size_t line; - - char *(*read_function)(char *buffer, long unsigned int, void *input); - int (*eof_function)(void *input); - keyword_function unknown_function; - char buffer[PLUGINSD_LINE_MAX]; - char *recover_location[PARSER_MAX_RECOVER_KEYWORDS+1]; - char recover_input[PARSER_MAX_RECOVER_KEYWORDS]; -#ifdef ENABLE_HTTPS - int bytesleft; - char tmpbuffer[PLUGINSD_LINE_MAX]; - char *readfrom; -#endif - - struct { - const char *end_keyword; - BUFFER *response; - void (*action)(struct parser *parser, void *action_data); - void *action_data; - } defer; - - struct { - DICTIONARY *functions; - usec_t smaller_timeout; - } inflight; - -} PARSER; - -int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)); - -PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); -int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); -int parser_next(PARSER *working_parser); -int parser_action(PARSER *working_parser, char *input); -int parser_push(PARSER *working_parser, char *line); -void parser_destroy(PARSER *working_parser); -int parser_recover_input(PARSER *working_parser); - -size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations); - -PARSER_RC pluginsd_set(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_end(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_flush(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_disable(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_label(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user); - -PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user); - -#endif |