diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
commit | a836a244a3d2bdd4da1ee2641e3e957850668cea (patch) | |
tree | cb87c75b3677fab7144f868435243f864048a1e6 /libnetdata/parser | |
parent | Adding upstream version 1.38.1. (diff) | |
download | netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip |
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'libnetdata/parser')
-rw-r--r-- | libnetdata/parser/Makefile.am | 9 | ||||
-rw-r--r-- | libnetdata/parser/README.md | 28 | ||||
-rw-r--r-- | libnetdata/parser/parser.c | 225 | ||||
-rw-r--r-- | libnetdata/parser/parser.h | 101 |
4 files changed, 363 insertions, 0 deletions
diff --git a/libnetdata/parser/Makefile.am b/libnetdata/parser/Makefile.am new file mode 100644 index 00000000..02fe3a31 --- /dev/null +++ b/libnetdata/parser/Makefile.am @@ -0,0 +1,9 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) + diff --git a/libnetdata/parser/README.md b/libnetdata/parser/README.md new file mode 100644 index 00000000..136c23c6 --- /dev/null +++ b/libnetdata/parser/README.md @@ -0,0 +1,28 @@ +<!-- +title: "Parser" +custom_edit_url: https://github.com/netdata/netdata/blob/master/parser/README.md +sidebar_label: "Parser" +learn_status: "Published" +learn_topic_type: "References" +learn_rel_path: "Developers/Database" +--> + +# Parser + +## Introduction + +Generic parser that is used to register keywords and a corresponding function that will be executed when that +keyword is encountered in the command stream (either from plugins or via streaming) + +To use a parser do the following: + +1. Define a structure that will be used to share user state across calls (user defined `void *user`) +2. Initialize the parser using `parser_init` +3. Register keywords with their associated callback function using `parser_add_keyword` +4. Start a loop for as long there is input (or parser_action returns error) + 1. Fetch the next line using `parser_next` (if needed) + 2. Process the line using `parser_action` +5. Release the parser using `parser_destroy` +6. Release the user structure + +See examples in receiver.c / pluginsd_parser.c diff --git a/libnetdata/parser/parser.c b/libnetdata/parser/parser.c new file mode 100644 index 00000000..c3eebcd1 --- /dev/null +++ b/libnetdata/parser/parser.c @@ -0,0 +1,225 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "parser.h" +#include "collectors/plugins.d/pluginsd_parser.h" + +static inline int find_first_keyword(const char *src, char *dst, int dst_size, int (*custom_isspace)(char)) { + const char *s = src, *keyword_start; + + while (unlikely(custom_isspace(*s))) s++; + keyword_start = s; + + while (likely(*s && !custom_isspace(*s)) && dst_size > 1) { + *dst++ = *s++; + dst_size--; + } + *dst = '\0'; + return dst_size == 0 ? 0 : (int) (s - keyword_start); +} + +/* + * Initialize a parser + * user : as defined by the user, will be shared across calls + * input : main input stream (auto detect stream -- file, socket, pipe) + * buffer : This is the buffer to be used (if null a buffer of size will be allocated) + * size : buffer size either passed or will be allocated + * If the buffer is auto allocated, it will auto freed when the parser is destroyed + * + * + */ + +PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd, + PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) +{ + PARSER *parser; + + parser = callocz(1, sizeof(*parser)); + parser->user = user; + parser->fd = fd; + parser->fp_input = fp_input; + parser->fp_output = fp_output; +#ifdef ENABLE_HTTPS + parser->ssl_output = ssl; +#endif + parser->flags = flags; + parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB; + + return parser; +} + + +static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) { + uint32_t hash = parser_hash_function(command); + uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE; + PARSER_KEYWORD *t = parser->keywords.hashtable[slot]; + + if(likely(t && strcmp(t->keyword, command) == 0)) + return t; + + return NULL; +} + +/* + * Add a keyword and the corresponding function that will be called + * Multiple functions may be added + * Input : keyword + * : callback function + * : flags + * Output: > 0 registered function number + * : 0 Error + */ + +void parser_add_keyword(PARSER *parser, char *keyword, keyword_function func) { + if(unlikely(!parser || !keyword || !*keyword || !func)) + fatal("PARSER: invalid parameters"); + + PARSER_KEYWORD *t = callocz(1, sizeof(*t)); + t->worker_job_id = parser->worker_job_next_id++; + t->keyword = strdupz(keyword); + t->func = func; + + uint32_t hash = parser_hash_function(keyword); + uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE; + + if(unlikely(parser->keywords.hashtable[slot])) + fatal("PARSER: hashtable collision between keyword '%s' and '%s' on slot %u. " + "Change the hashtable size and / or the hashing function. " + "Run the unit test to find the optimal values.", + parser->keywords.hashtable[slot]->keyword, + t->keyword, + slot + ); + + parser->keywords.hashtable[slot] = t; + + worker_register_job_name(t->worker_job_id, t->keyword); +} + +/* + * Cleanup a previously allocated parser + */ + +void parser_destroy(PARSER *parser) +{ + if (unlikely(!parser)) + return; + + dictionary_destroy(parser->inflight.functions); + + // Remove keywords + for(size_t i = 0 ; i < PARSER_KEYWORDS_HASHTABLE_SIZE; i++) { + PARSER_KEYWORD *t = parser->keywords.hashtable[i]; + if (t) { + freez(t->keyword); + freez(t); + } + } + + freez(parser); +} + + +/* + * Fetch the next line to process + * + */ + +int parser_next(PARSER *parser, char *buffer, size_t buffer_size) +{ + char *tmp = fgets(buffer, (int)buffer_size, (FILE *)parser->fp_input); + + if (unlikely(!tmp)) { + if (feof((FILE *)parser->fp_input)) + error("PARSER: read failed: end of file"); + + else if (ferror((FILE *)parser->fp_input)) + error("PARSER: read failed: input error"); + + else + error("PARSER: read failed: unknown error"); + + return 1; + } + + return 0; +} + + +/* +* Takes an initialized parser object that has an unprocessed entry (by calling parser_next) +* and if it contains a valid keyword, it will execute all the callbacks +* +*/ + +inline int parser_action(PARSER *parser, char *input) +{ + parser->line++; + + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + char command[PLUGINSD_LINE_MAX + 1]; + bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { + // more than 10MB of data + // a bad plugin that did not send the end_keyword + internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + char *words[PLUGINSD_MAX_WORDS]; + size_t num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS); + const char *command = get_word(words, num_words, 0); + + if(unlikely(!command)) + return 0; + + PARSER_RC rc; + PARSER_KEYWORD *t = parser_find_keyword(parser, command); + if(likely(t)) { + worker_is_busy(t->worker_job_id); + rc = (*t->func)(words, num_words, parser->user); + worker_is_idle(); + } + else + rc = PARSER_RC_ERROR; + +#ifdef NETDATA_INTERNAL_CHECKS + if(rc == PARSER_RC_ERROR) { + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); + for(size_t i = 0; i < num_words ;i++) { + if(i) buffer_fast_strcat(wb, " ", 1); + + buffer_fast_strcat(wb, "\"", 1); + const char *s = get_word(words, num_words, i); + buffer_strcat(wb, s?s:""); + buffer_fast_strcat(wb, "\"", 1); + } + + internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line, buffer_tostring(wb)); + + buffer_free(wb); + } +#endif + + return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); +} diff --git a/libnetdata/parser/parser.h b/libnetdata/parser/parser.h new file mode 100644 index 00000000..9e0d3480 --- /dev/null +++ b/libnetdata/parser/parser.h @@ -0,0 +1,101 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_INCREMENTAL_PARSER_H +#define NETDATA_INCREMENTAL_PARSER_H 1 + +#include "../libnetdata.h" + +#define WORKER_PARSER_FIRST_JOB 3 + +// this has to be in-sync with the same at receiver.c +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +#define PARSER_KEYWORDS_HASHTABLE_SIZE 73 // unittest finds this magic number +//#define parser_hash_function(s) djb2_hash32(s) +//#define parser_hash_function(s) fnv1_hash32(s) +//#define parser_hash_function(s) fnv1a_hash32(s) +//#define parser_hash_function(s) larson_hash32(s) +#define parser_hash_function(s) pluginsd_parser_hash32(s) + +// PARSER return codes +typedef enum __attribute__ ((__packed__)) parser_rc { + PARSER_RC_OK, // Callback was successful, go on + PARSER_RC_STOP, // Callback says STOP + PARSER_RC_ERROR // Callback failed (abort rest of callbacks) +} PARSER_RC; + +typedef enum __attribute__ ((__packed__)) parser_input_type { + PARSER_INPUT_SPLIT = (1 << 1), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 2), +} PARSER_INPUT_TYPE; + +typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data); + +typedef struct parser_keyword { + size_t worker_job_id; + char *keyword; + keyword_function func; +} PARSER_KEYWORD; + +typedef struct parser { + size_t worker_job_next_id; + uint8_t version; // Parser version + int fd; // Socket + FILE *fp_input; // Input source e.g. stream + FILE *fp_output; // Stream to send commands to plugin +#ifdef ENABLE_HTTPS + struct netdata_ssl *ssl_output; +#endif + void *user; // User defined structure to hold extra state between calls + uint32_t flags; + size_t line; + + struct { + PARSER_KEYWORD *hashtable[PARSER_KEYWORDS_HASHTABLE_SIZE]; + } keywords; + + struct { + const char *end_keyword; + BUFFER *response; + void (*action)(struct parser *parser, void *action_data); + void *action_data; + } defer; + + struct { + DICTIONARY *functions; + usec_t smaller_timeout; + } inflight; +} PARSER; + +PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); +void parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); +int parser_next(PARSER *working_parser, char *buffer, size_t buffer_size); +int parser_action(PARSER *working_parser, char *input); +void parser_destroy(PARSER *working_parser); + +PARSER_RC pluginsd_set(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_flush(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_disable(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_label(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user); + +PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user); + +PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_end_v2(char **words, size_t num_words, void *user); +void pluginsd_cleanup_v2(void *user); + +#endif |