diff options
Diffstat (limited to 'src/plugins.d/pluginsd_parser.h')
-rw-r--r-- | src/plugins.d/pluginsd_parser.h | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/src/plugins.d/pluginsd_parser.h b/src/plugins.d/pluginsd_parser.h new file mode 100644 index 000000000..983da7d13 --- /dev/null +++ b/src/plugins.d/pluginsd_parser.h @@ -0,0 +1,245 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_PLUGINSD_PARSER_H +#define NETDATA_PLUGINSD_PARSER_H + +#include "daemon/common.h" + +#define WORKER_PARSER_FIRST_JOB 3 + +// this has to be in-sync with the same at receiver.c +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +// this controls the max response size of a function +#define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024) + +#define PLUGINSD_MIN_RRDSET_POINTERS_CACHE 1024 + +#define HOST_LABEL_IS_EPHEMERAL "_is_ephemeral" +// PARSER return codes +typedef enum __attribute__ ((__packed__)) parser_rc { + PARSER_RC_OK, // Callback was successful, go on + PARSER_RC_STOP, // Callback says STOP + PARSER_RC_ERROR // Callback failed (abort rest of callbacks) +} PARSER_RC; + +typedef enum __attribute__ ((__packed__)) parser_input_type { + PARSER_INPUT_SPLIT = (1 << 1), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 2), +} PARSER_INPUT_TYPE; + +typedef enum __attribute__ ((__packed__)) { + PARSER_INIT_PLUGINSD = (1 << 1), + PARSER_INIT_STREAMING = (1 << 2), + PARSER_REP_METADATA = (1 << 3), +} PARSER_REPERTOIRE; + +struct parser; +typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, struct parser *parser); + +typedef struct parser_keyword { + char *keyword; + size_t id; + PARSER_REPERTOIRE repertoire; + size_t worker_job_id; +} PARSER_KEYWORD; + +typedef struct parser_user_object { + bool cleanup_slots; + RRDSET *st; + RRDHOST *host; + void *opaque; + struct plugind *cd; + int trust_durations; + RRDLABELS *new_host_labels; + RRDLABELS *chart_rrdlabels_linked_temporarily; + size_t data_collections_count; + int enabled; + +#ifdef NETDATA_LOG_STREAM_RECEIVE + FILE *stream_log_fp; + PARSER_REPERTOIRE stream_log_repertoire; +#endif + + STREAM_CAPABILITIES capabilities; // receiver capabilities + + struct { + bool parsing_host; + nd_uuid_t machine_guid; + char machine_guid_str[UUID_STR_LEN]; + STRING *hostname; + RRDLABELS *rrdlabels; + } host_define; + + struct parser_user_object_replay { + time_t start_time; + time_t end_time; + + usec_t start_time_ut; + usec_t end_time_ut; + + time_t wall_clock_time; + + bool rset_enabled; + } replay; + + struct parser_user_object_v2 { + bool locked_data_collection; + RRDSET_STREAM_BUFFER stream_buffer; // sender capabilities in this + time_t update_every; + time_t end_time; + time_t wall_clock_time; + bool ml_locked; + } v2; +} PARSER_USER_OBJECT; + +typedef void (*parser_deferred_action_t)(struct parser *parser, void *action_data); + +struct parser { + uint8_t version; // Parser version + PARSER_REPERTOIRE repertoire; + uint32_t flags; + int fd_input; + int fd_output; + + NETDATA_SSL *ssl_output; + +#ifdef ENABLE_H2O + void *h2o_ctx; // if set we use h2o_stream functions to send data +#endif + + PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls + + struct buffered_reader reader; + struct line_splitter line; + const PARSER_KEYWORD *keyword; + + struct { + const char *end_keyword; + BUFFER *response; + parser_deferred_action_t action; + void *action_data; + } defer; + + struct { + DICTIONARY *functions; + usec_t smaller_monotonic_timeout_ut; + } inflight; + + struct { + SPINLOCK spinlock; + } writer; +}; + +typedef struct parser PARSER; + +PARSER *parser_init(struct parser_user_object *user, int fd_input, int fd_output, PARSER_INPUT_TYPE flags, void *ssl); +void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire); +void parser_destroy(PARSER *working_parser); +void pluginsd_cleanup_v2(PARSER *parser); +void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire); +PARSER_RC parser_execute(PARSER *parser, const PARSER_KEYWORD *keyword, char **words, size_t num_words); + +static inline int find_first_keyword(const char *src, char *dst, int dst_size, bool *isspace_map) { + const char *s = src, *keyword_start; + + while (unlikely(isspace_map[(uint8_t)*s])) s++; + keyword_start = s; + + while (likely(*s && !isspace_map[(uint8_t)*s]) && dst_size > 1) { + *dst++ = *s++; + dst_size--; + } + *dst = '\0'; + return dst_size == 0 ? 0 : (int) (s - keyword_start); +} + +const PARSER_KEYWORD *gperf_lookup_keyword(register const char *str, register size_t len); + +static inline const PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) { + const PARSER_KEYWORD *t = gperf_lookup_keyword(command, strlen(command)); + if(t && (t->repertoire & parser->repertoire)) + return t; + + return NULL; +} + +bool parser_reconstruct_node(BUFFER *wb, void *ptr); +bool parser_reconstruct_instance(BUFFER *wb, void *ptr); +bool parser_reconstruct_context(BUFFER *wb, void *ptr); + +static inline int parser_action(PARSER *parser, char *input) { +#ifdef NETDATA_LOG_STREAM_RECEIVE + static __thread char line[PLUGINSD_LINE_MAX + 1]; + strncpyz(line, input, sizeof(line) - 1); +#endif + + parser->line.count++; + + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + char command[100 + 1]; + bool has_keyword = find_first_keyword(input, command, 100, isspace_map_pluginsd); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > PLUGINSD_MAX_DEFERRED_SIZE) { + // more than PLUGINSD_MAX_DEFERRED_SIZE of data, + // or a bad plugin that did not send the end_keyword + nd_log(NDLS_DAEMON, NDLP_ERR, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS); + const char *command = get_word(parser->line.words, parser->line.num_words, 0); + + if(unlikely(!command)) { + line_splitter_reset(&parser->line); + return 0; + } + + PARSER_RC rc; + parser->keyword = parser_find_keyword(parser, command); + if(likely(parser->keyword)) { + worker_is_busy(parser->keyword->worker_job_id); + +#ifdef NETDATA_LOG_STREAM_RECEIVE + if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire) + fprintf(parser->user.stream_log_fp, "%s", line); +#endif + + rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words); + // rc = (*t->func)(words, num_words, parser); + worker_is_idle(); + } + else + rc = PARSER_RC_ERROR; + + if(rc == PARSER_RC_ERROR) { + CLEAN_BUFFER *wb = buffer_create(1024, NULL); + line_splitter_reconstruct_line(wb, &parser->line); + netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line.count, buffer_tostring(wb)); + } + + line_splitter_reset(&parser->line); + return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); +} + +#endif //NETDATA_PLUGINSD_PARSER_H |