summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /collectors/plugins.d/pluginsd_parser.h
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.h')
-rw-r--r--collectors/plugins.d/pluginsd_parser.h69
1 files changed, 45 insertions, 24 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index 74767569b..1fce9a89a 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -11,8 +11,11 @@
#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
// this controls the max response size of a function
-#define PLUGINSD_MAX_DEFERRED_SIZE (20 * 1024 * 1024)
+#define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024)
+#define PLUGINSD_MIN_RRDSET_POINTERS_CACHE 1024
+
+#define HOST_LABEL_IS_EPHEMERAL "_is_ephemeral"
// PARSER return codes
typedef enum __attribute__ ((__packed__)) parser_rc {
PARSER_RC_OK, // Callback was successful, go on
@@ -28,6 +31,7 @@ typedef enum __attribute__ ((__packed__)) parser_input_type {
typedef enum __attribute__ ((__packed__)) {
PARSER_INIT_PLUGINSD = (1 << 1),
PARSER_INIT_STREAMING = (1 << 2),
+ PARSER_REP_METADATA = (1 << 3),
} PARSER_REPERTOIRE;
struct parser;
@@ -41,6 +45,7 @@ typedef struct parser_keyword {
} PARSER_KEYWORD;
typedef struct parser_user_object {
+ bool cleanup_slots;
RRDSET *st;
RRDHOST *host;
void *opaque;
@@ -51,6 +56,11 @@ typedef struct parser_user_object {
size_t data_collections_count;
int enabled;
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ FILE *stream_log_fp;
+ PARSER_REPERTOIRE stream_log_repertoire;
+#endif
+
STREAM_CAPABILITIES capabilities; // receiver capabilities
struct {
@@ -88,17 +98,21 @@ typedef struct parser {
PARSER_REPERTOIRE repertoire;
uint32_t flags;
int fd; // Socket
- size_t line;
FILE *fp_input; // Input source e.g. stream
FILE *fp_output; // Stream to send commands to plugin
#ifdef ENABLE_HTTPS
NETDATA_SSL *ssl_output;
#endif
+#ifdef ENABLE_H2O
+ void *h2o_ctx; // if set we use h2o_stream functions to send data
+#endif
PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls
struct buffered_reader reader;
+ struct line_splitter line;
+ PARSER_KEYWORD *keyword;
struct {
const char *end_keyword;
@@ -150,8 +164,17 @@ static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *co
return NULL;
}
+bool parser_reconstruct_node(BUFFER *wb, void *ptr);
+bool parser_reconstruct_instance(BUFFER *wb, void *ptr);
+bool parser_reconstruct_context(BUFFER *wb, void *ptr);
+
static inline int parser_action(PARSER *parser, char *input) {
- parser->line++;
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ static __thread char line[PLUGINSD_LINE_MAX + 1];
+ strncpyz(line, input, sizeof(line) - 1);
+#endif
+
+ parser->line.count++;
if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
char command[100 + 1];
@@ -183,18 +206,25 @@ static inline int parser_action(PARSER *parser, char *input) {
return 0;
}
- static __thread char *words[PLUGINSD_MAX_WORDS];
- size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
- const char *command = get_word(words, num_words, 0);
+ parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(parser->line.words, parser->line.num_words, 0);
- if(unlikely(!command))
+ if(unlikely(!command)) {
+ line_splitter_reset(&parser->line);
return 0;
+ }
PARSER_RC rc;
- PARSER_KEYWORD *t = parser_find_keyword(parser, command);
- if(likely(t)) {
- worker_is_busy(t->worker_job_id);
- rc = parser_execute(parser, t, words, num_words);
+ parser->keyword = parser_find_keyword(parser, command);
+ if(likely(parser->keyword)) {
+ worker_is_busy(parser->keyword->worker_job_id);
+
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire)
+ fprintf(parser->user.stream_log_fp, "%s", line);
+#endif
+
+ rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words);
// rc = (*t->func)(words, num_words, parser);
worker_is_idle();
}
@@ -202,22 +232,13 @@ static inline int parser_action(PARSER *parser, char *input) {
rc = PARSER_RC_ERROR;
if(rc == PARSER_RC_ERROR) {
- BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
- for(size_t i = 0; i < num_words ;i++) {
- if(i) buffer_fast_strcat(wb, " ", 1);
-
- buffer_fast_strcat(wb, "\"", 1);
- const char *s = get_word(words, num_words, i);
- buffer_strcat(wb, s?s:"");
- buffer_fast_strcat(wb, "\"", 1);
- }
-
+ CLEAN_BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
+ line_splitter_reconstruct_line(wb, &parser->line);
netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
- command, parser->line, buffer_tostring(wb));
-
- buffer_free(wb);
+ command, parser->line.count, buffer_tostring(wb));
}
+ line_splitter_reset(&parser->line);
return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP);
}