diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /collectors/plugins.d/pluginsd_parser.h | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.h')
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 69 |
1 files changed, 45 insertions, 24 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 74767569b..1fce9a89a 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -11,8 +11,11 @@ #define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) // this controls the max response size of a function -#define PLUGINSD_MAX_DEFERRED_SIZE (20 * 1024 * 1024) +#define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024) +#define PLUGINSD_MIN_RRDSET_POINTERS_CACHE 1024 + +#define HOST_LABEL_IS_EPHEMERAL "_is_ephemeral" // PARSER return codes typedef enum __attribute__ ((__packed__)) parser_rc { PARSER_RC_OK, // Callback was successful, go on @@ -28,6 +31,7 @@ typedef enum __attribute__ ((__packed__)) parser_input_type { typedef enum __attribute__ ((__packed__)) { PARSER_INIT_PLUGINSD = (1 << 1), PARSER_INIT_STREAMING = (1 << 2), + PARSER_REP_METADATA = (1 << 3), } PARSER_REPERTOIRE; struct parser; @@ -41,6 +45,7 @@ typedef struct parser_keyword { } PARSER_KEYWORD; typedef struct parser_user_object { + bool cleanup_slots; RRDSET *st; RRDHOST *host; void *opaque; @@ -51,6 +56,11 @@ typedef struct parser_user_object { size_t data_collections_count; int enabled; +#ifdef NETDATA_LOG_STREAM_RECEIVE + FILE *stream_log_fp; + PARSER_REPERTOIRE stream_log_repertoire; +#endif + STREAM_CAPABILITIES capabilities; // receiver capabilities struct { @@ -88,17 +98,21 @@ typedef struct parser { PARSER_REPERTOIRE repertoire; uint32_t flags; int fd; // Socket - size_t line; FILE *fp_input; // Input source e.g. stream FILE *fp_output; // Stream to send commands to plugin #ifdef ENABLE_HTTPS NETDATA_SSL *ssl_output; #endif +#ifdef ENABLE_H2O + void *h2o_ctx; // if set we use h2o_stream functions to send data +#endif PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls struct buffered_reader reader; + struct line_splitter line; + PARSER_KEYWORD *keyword; struct { const char *end_keyword; @@ -150,8 +164,17 @@ static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *co return NULL; } +bool parser_reconstruct_node(BUFFER *wb, void *ptr); +bool parser_reconstruct_instance(BUFFER *wb, void *ptr); +bool parser_reconstruct_context(BUFFER *wb, void *ptr); + static inline int parser_action(PARSER *parser, char *input) { - parser->line++; +#ifdef NETDATA_LOG_STREAM_RECEIVE + static __thread char line[PLUGINSD_LINE_MAX + 1]; + strncpyz(line, input, sizeof(line) - 1); +#endif + + parser->line.count++; if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { char command[100 + 1]; @@ -183,18 +206,25 @@ static inline int parser_action(PARSER *parser, char *input) { return 0; } - static __thread char *words[PLUGINSD_MAX_WORDS]; - size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); - const char *command = get_word(words, num_words, 0); + parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS); + const char *command = get_word(parser->line.words, parser->line.num_words, 0); - if(unlikely(!command)) + if(unlikely(!command)) { + line_splitter_reset(&parser->line); return 0; + } PARSER_RC rc; - PARSER_KEYWORD *t = parser_find_keyword(parser, command); - if(likely(t)) { - worker_is_busy(t->worker_job_id); - rc = parser_execute(parser, t, words, num_words); + parser->keyword = parser_find_keyword(parser, command); + if(likely(parser->keyword)) { + worker_is_busy(parser->keyword->worker_job_id); + +#ifdef NETDATA_LOG_STREAM_RECEIVE + if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire) + fprintf(parser->user.stream_log_fp, "%s", line); +#endif + + rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words); // rc = (*t->func)(words, num_words, parser); worker_is_idle(); } @@ -202,22 +232,13 @@ static inline int parser_action(PARSER *parser, char *input) { rc = PARSER_RC_ERROR; if(rc == PARSER_RC_ERROR) { - BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); - for(size_t i = 0; i < num_words ;i++) { - if(i) buffer_fast_strcat(wb, " ", 1); - - buffer_fast_strcat(wb, "\"", 1); - const char *s = get_word(words, num_words, i); - buffer_strcat(wb, s?s:""); - buffer_fast_strcat(wb, "\"", 1); - } - + CLEAN_BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); + line_splitter_reconstruct_line(wb, &parser->line); netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", - command, parser->line, buffer_tostring(wb)); - - buffer_free(wb); + command, parser->line.count, buffer_tostring(wb)); } + line_splitter_reset(&parser->line); return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); } |