From ab1bb5b7f1c3c3a7b240ab7fc8661459ecd7decb Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 20 Jul 2023 06:49:55 +0200 Subject: Adding upstream version 1.41.0. Signed-off-by: Daniel Baumann --- collectors/plugins.d/pluginsd_parser.h | 169 ++++++++++++++++++++++++++++++++- 1 file changed, 164 insertions(+), 5 deletions(-) (limited to 'collectors/plugins.d/pluginsd_parser.h') diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 1fdc23a0e..5e1ea1242 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -5,13 +5,39 @@ #include "daemon/common.h" +#define WORKER_PARSER_FIRST_JOB 3 + +// this has to be in-sync with the same at receiver.c +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +// PARSER return codes +typedef enum __attribute__ ((__packed__)) parser_rc { + PARSER_RC_OK, // Callback was successful, go on + PARSER_RC_STOP, // Callback says STOP + PARSER_RC_ERROR // Callback failed (abort rest of callbacks) +} PARSER_RC; + +typedef enum __attribute__ ((__packed__)) parser_input_type { + PARSER_INPUT_SPLIT = (1 << 1), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 2), +} PARSER_INPUT_TYPE; + typedef enum __attribute__ ((__packed__)) { PARSER_INIT_PLUGINSD = (1 << 1), PARSER_INIT_STREAMING = (1 << 2), -} PLUGINSD_KEYWORDS; +} PARSER_REPERTOIRE; + +struct parser; +typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, struct parser *parser); + +typedef struct parser_keyword { + char *keyword; + size_t id; + PARSER_REPERTOIRE repertoire; + size_t worker_job_id; +} PARSER_KEYWORD; typedef struct parser_user_object { - PARSER *parser; RRDSET *st; RRDHOST *host; void *opaque; @@ -54,9 +80,142 @@ typedef struct parser_user_object { } v2; } PARSER_USER_OBJECT; -PARSER_RC pluginsd_function(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user); +typedef struct parser { + uint8_t version; // Parser version + PARSER_REPERTOIRE repertoire; + uint32_t flags; + int fd; // Socket + size_t line; + FILE *fp_input; // Input source e.g. stream + FILE *fp_output; // Stream to send commands to plugin + +#ifdef ENABLE_HTTPS + NETDATA_SSL *ssl_output; +#endif + + PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls + + struct buffered_reader reader; + + struct { + const char *end_keyword; + BUFFER *response; + void (*action)(struct parser *parser, void *action_data); + void *action_data; + } defer; + + struct { + DICTIONARY *functions; + usec_t smaller_timeout; + } inflight; + + struct { + SPINLOCK spinlock; + } writer; + +} PARSER; + +PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); +void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire); +void parser_destroy(PARSER *working_parser); +void pluginsd_cleanup_v2(PARSER *parser); void inflight_functions_init(PARSER *parser); -void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types); +void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire); +PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words); + +static inline int find_first_keyword(const char *src, char *dst, int dst_size, bool *isspace_map) { + const char *s = src, *keyword_start; + + while (unlikely(isspace_map[(uint8_t)*s])) s++; + keyword_start = s; + + while (likely(*s && !isspace_map[(uint8_t)*s]) && dst_size > 1) { + *dst++ = *s++; + dst_size--; + } + *dst = '\0'; + return dst_size == 0 ? 0 : (int) (s - keyword_start); +} + +PARSER_KEYWORD *gperf_lookup_keyword(register const char *str, register size_t len); + +static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) { + PARSER_KEYWORD *t = gperf_lookup_keyword(command, strlen(command)); + if(t && (t->repertoire & parser->repertoire)) + return t; + + return NULL; +} + +static inline int parser_action(PARSER *parser, char *input) { + parser->line++; + + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + char command[PLUGINSD_LINE_MAX + 1]; + bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, isspace_map_pluginsd); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { + // more than 10MB of data + // a bad plugin that did not send the end_keyword + internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + char *words[PLUGINSD_MAX_WORDS]; + size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); + const char *command = get_word(words, num_words, 0); + + if(unlikely(!command)) + return 0; + + PARSER_RC rc; + PARSER_KEYWORD *t = parser_find_keyword(parser, command); + if(likely(t)) { + worker_is_busy(t->worker_job_id); + rc = parser_execute(parser, t, words, num_words); + // rc = (*t->func)(words, num_words, parser); + worker_is_idle(); + } + else + rc = PARSER_RC_ERROR; + + if(rc == PARSER_RC_ERROR) { + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); + for(size_t i = 0; i < num_words ;i++) { + if(i) buffer_fast_strcat(wb, " ", 1); + + buffer_fast_strcat(wb, "\"", 1); + const char *s = get_word(words, num_words, i); + buffer_strcat(wb, s?s:""); + buffer_fast_strcat(wb, "\"", 1); + } + + netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line, buffer_tostring(wb)); + + buffer_free(wb); + } + + return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); +} #endif //NETDATA_PLUGINSD_PARSER_H -- cgit v1.2.3