diff options
Diffstat (limited to '')
-rw-r--r-- | libnetdata/parser/Makefile.am (renamed from parser/Makefile.am) | 0 | ||||
-rw-r--r-- | libnetdata/parser/README.md | 28 | ||||
-rw-r--r-- | libnetdata/parser/parser.c | 225 | ||||
-rw-r--r-- | libnetdata/parser/parser.h (renamed from parser/parser.h) | 76 |
4 files changed, 280 insertions, 49 deletions
diff --git a/parser/Makefile.am b/libnetdata/parser/Makefile.am index 02fe3a314..02fe3a314 100644 --- a/parser/Makefile.am +++ b/libnetdata/parser/Makefile.am diff --git a/libnetdata/parser/README.md b/libnetdata/parser/README.md new file mode 100644 index 000000000..136c23c69 --- /dev/null +++ b/libnetdata/parser/README.md @@ -0,0 +1,28 @@ +<!-- +title: "Parser" +custom_edit_url: https://github.com/netdata/netdata/blob/master/libnetdata/parser/README.md +sidebar_label: "Parser" +learn_status: "Published" +learn_topic_type: "References" +learn_rel_path: "Developers/Database" +--> + +# Parser + +## Introduction + +Generic parser that is used to register keywords and a corresponding function that will be executed when that +keyword is encountered in the command stream (either from plugins or via streaming). + +To use a parser, do the following: + +1. Define a structure that will be used to share user state across calls (user defined `void *user`) +2. Initialize the parser using `parser_init` +3. Register keywords with their associated callback function using `parser_add_keyword` +4. Start a loop for as long as there is input (or until `parser_action` returns an error) + 1. Fetch the next line using `parser_next` (if needed) + 2. Process the line using `parser_action` +5. Release the parser using `parser_destroy` +6. 
Release the user structure + +See examples in receiver.c / pluginsd_parser.c diff --git a/libnetdata/parser/parser.c b/libnetdata/parser/parser.c new file mode 100644 index 000000000..c3eebcd16 --- /dev/null +++ b/libnetdata/parser/parser.c @@ -0,0 +1,225 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "parser.h" +#include "collectors/plugins.d/pluginsd_parser.h" + +static inline int find_first_keyword(const char *src, char *dst, int dst_size, int (*custom_isspace)(char)) { + const char *s = src, *keyword_start; + + while (unlikely(custom_isspace(*s))) s++; + keyword_start = s; + + while (likely(*s && !custom_isspace(*s)) && dst_size > 1) { + *dst++ = *s++; + dst_size--; + } + *dst = '\0'; + return dst_size == 0 ? 0 : (int) (s - keyword_start); +} + +/* + * Initialize a parser + * user : as defined by the user, will be shared across calls + * fp_input : main input stream, read by parser_next + * fp_output : output stream, stored in the parser + * fd : file descriptor, stored in the parser + * flags : PARSER_INPUT_TYPE flags, stored in the parser + * ssl : stored as ssl_output when ENABLE_HTTPS is defined, otherwise unused + * + */ + +PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd, + PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) +{ + PARSER *parser; + + parser = callocz(1, sizeof(*parser)); + parser->user = user; + parser->fd = fd; + parser->fp_input = fp_input; + parser->fp_output = fp_output; +#ifdef ENABLE_HTTPS + parser->ssl_output = ssl; +#endif + parser->flags = flags; + parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB; + + return parser; +} + + +static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) { + uint32_t hash = parser_hash_function(command); + uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE; + PARSER_KEYWORD *t = parser->keywords.hashtable[slot]; + + if(likely(t && strcmp(t->keyword, command) == 0)) + return t; + + return NULL; +} + +/* + * Add a 
keyword and the corresponding function that will be called + * Only one function can be registered per keyword + * Input : parser + * : keyword + * : callback function + * Output: none; invalid parameters or a + * : hashtable slot collision are fatal + */ + +void parser_add_keyword(PARSER *parser, char *keyword, keyword_function func) { + if(unlikely(!parser || !keyword || !*keyword || !func)) + fatal("PARSER: invalid parameters"); + + PARSER_KEYWORD *t = callocz(1, sizeof(*t)); + t->worker_job_id = parser->worker_job_next_id++; + t->keyword = strdupz(keyword); + t->func = func; + + uint32_t hash = parser_hash_function(keyword); + uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE; + + if(unlikely(parser->keywords.hashtable[slot])) + fatal("PARSER: hashtable collision between keyword '%s' and '%s' on slot %u. " + "Change the hashtable size and / or the hashing function. " + "Run the unit test to find the optimal values.", + parser->keywords.hashtable[slot]->keyword, + t->keyword, + slot + ); + + parser->keywords.hashtable[slot] = t; + + worker_register_job_name(t->worker_job_id, t->keyword); +} + +/* + * Cleanup a previously allocated parser + */ + +void parser_destroy(PARSER *parser) +{ + if (unlikely(!parser)) + return; + + dictionary_destroy(parser->inflight.functions); + + // Remove keywords + for(size_t i = 0 ; i < PARSER_KEYWORDS_HASHTABLE_SIZE; i++) { + PARSER_KEYWORD *t = parser->keywords.hashtable[i]; + if (t) { + freez(t->keyword); + freez(t); + } + } + + freez(parser); +} + + +/* + * Fetch the next line to process + * + */ + +int parser_next(PARSER *parser, char *buffer, size_t buffer_size) +{ + char *tmp = fgets(buffer, (int)buffer_size, (FILE *)parser->fp_input); + + if (unlikely(!tmp)) { + if (feof((FILE *)parser->fp_input)) + error("PARSER: read failed: end of file"); + + else if (ferror((FILE *)parser->fp_input)) + error("PARSER: read failed: input error"); + + else + error("PARSER: read failed: unknown error"); + + return 1; + } + + return 0; +} + + +/* +* Takes an initialized parser 
object that has an unprocessed entry (by calling parser_next) +* and if it contains a valid keyword, it will execute all the callbacks +* +*/ + +inline int parser_action(PARSER *parser, char *input) +{ + parser->line++; + + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + char command[PLUGINSD_LINE_MAX + 1]; + bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { + // more than 10MB of data + // a bad plugin that did not send the end_keyword + internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + char *words[PLUGINSD_MAX_WORDS]; + size_t num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS); + const char *command = get_word(words, num_words, 0); + + if(unlikely(!command)) + return 0; + + PARSER_RC rc; + PARSER_KEYWORD *t = parser_find_keyword(parser, command); + if(likely(t)) { + worker_is_busy(t->worker_job_id); + rc = (*t->func)(words, num_words, parser->user); + worker_is_idle(); + } + else + rc = PARSER_RC_ERROR; + +#ifdef NETDATA_INTERNAL_CHECKS + if(rc == PARSER_RC_ERROR) { + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); + for(size_t i = 0; i < num_words ;i++) { + if(i) buffer_fast_strcat(wb, " ", 1); + + buffer_fast_strcat(wb, "\"", 1); + const char *s = get_word(words, num_words, i); + buffer_strcat(wb, s?s:""); + buffer_fast_strcat(wb, "\"", 
1); + } + + internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line, buffer_tostring(wb)); + + buffer_free(wb); + } +#endif + + return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); +} diff --git a/parser/parser.h b/libnetdata/parser/parser.h index ad7488389..9e0d3480d 100644 --- a/parser/parser.h +++ b/libnetdata/parser/parser.h @@ -3,76 +3,56 @@ #ifndef NETDATA_INCREMENTAL_PARSER_H #define NETDATA_INCREMENTAL_PARSER_H 1 -#include "daemon/common.h" +#include "../libnetdata.h" -#define PARSER_MAX_CALLBACKS 20 -#define PARSER_MAX_RECOVER_KEYWORDS 128 #define WORKER_PARSER_FIRST_JOB 3 // this has to be in-sync with the same at receiver.c #define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) +#define PARSER_KEYWORDS_HASHTABLE_SIZE 73 // unittest finds this magic number +//#define parser_hash_function(s) djb2_hash32(s) +//#define parser_hash_function(s) fnv1_hash32(s) +//#define parser_hash_function(s) fnv1a_hash32(s) +//#define parser_hash_function(s) larson_hash32(s) +#define parser_hash_function(s) pluginsd_parser_hash32(s) + // PARSER return codes -typedef enum parser_rc { +typedef enum __attribute__ ((__packed__)) parser_rc { PARSER_RC_OK, // Callback was successful, go on PARSER_RC_STOP, // Callback says STOP PARSER_RC_ERROR // Callback failed (abort rest of callbacks) } PARSER_RC; -typedef enum parser_input_type { +typedef enum __attribute__ ((__packed__)) parser_input_type { PARSER_INPUT_SPLIT = (1 << 1), - PARSER_INPUT_KEEP_ORIGINAL = (1 << 2), - PARSER_INPUT_PROCESSED = (1 << 3), - PARSER_NO_PARSE_INIT = (1 << 4), - PARSER_NO_ACTION_INIT = (1 << 5), - PARSER_DEFER_UNTIL_KEYWORD = (1 << 6), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 2), } PARSER_INPUT_TYPE; -#define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL) - typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data); typedef struct parser_keyword { - size_t 
worker_job_id; - char *keyword; - uint32_t keyword_hash; - int func_no; - keyword_function func[PARSER_MAX_CALLBACKS+1]; - struct parser_keyword *next; + size_t worker_job_id; + char *keyword; + keyword_function func; } PARSER_KEYWORD; -typedef struct parser_data { - char *line; - struct parser_data *next; -} PARSER_DATA; - typedef struct parser { size_t worker_job_next_id; uint8_t version; // Parser version - RRDHOST *host; int fd; // Socket FILE *fp_input; // Input source e.g. stream FILE *fp_output; // Stream to send commands to plugin #ifdef ENABLE_HTTPS struct netdata_ssl *ssl_output; #endif - PARSER_DATA *data; // extra input - PARSER_KEYWORD *keyword; // List of parse keywords and functions - void *user; // User defined structure to hold extra state between calls + void *user; // User defined structure to hold extra state between calls uint32_t flags; size_t line; - char *(*read_function)(char *buffer, long unsigned int, void *input); - int (*eof_function)(void *input); - keyword_function unknown_function; - char buffer[PLUGINSD_LINE_MAX]; - char *recover_location[PARSER_MAX_RECOVER_KEYWORDS+1]; - char recover_input[PARSER_MAX_RECOVER_KEYWORDS]; -#ifdef ENABLE_HTTPS - int bytesleft; - char tmpbuffer[PLUGINSD_LINE_MAX]; - char *readfrom; -#endif + struct { + PARSER_KEYWORD *hashtable[PARSER_KEYWORDS_HASHTABLE_SIZE]; + } keywords; struct { const char *end_keyword; @@ -85,20 +65,13 @@ typedef struct parser { DICTIONARY *functions; usec_t smaller_timeout; } inflight; - } PARSER; -int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)); - -PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); -int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); -int parser_next(PARSER *working_parser); +PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); +void 
parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); +int parser_next(PARSER *working_parser, char *buffer, size_t buffer_size); int parser_action(PARSER *working_parser, char *input); -int parser_push(PARSER *working_parser, char *line); void parser_destroy(PARSER *working_parser); -int parser_recover_input(PARSER *working_parser); - -size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations); PARSER_RC pluginsd_set(char **words, size_t num_words, void *user); PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user); @@ -114,10 +87,15 @@ PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user); PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user); PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user); PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user); PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user); PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user); PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_end_v2(char **words, size_t num_words, void *user); +void pluginsd_cleanup_v2(void *user); + #endif |