summaryrefslogtreecommitdiffstats
path: root/parser
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--parser/parser.c160
-rw-r--r--parser/parser.h109
2 files changed, 172 insertions, 97 deletions
diff --git a/parser/parser.c b/parser/parser.c
index c37d1e2c..5b4c528d 100644
--- a/parser/parser.c
+++ b/parser/parser.c
@@ -1,10 +1,11 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "parser.h"
+#include "collectors/plugins.d/pluginsd_parser.h"
-static inline int find_keyword(char *str, char *keyword, int max_size, int (*custom_isspace)(char))
+inline int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char))
{
- char *s = str, *keyword_start;
+ const char *s = str, *keyword_start;
while (unlikely(custom_isspace(*s))) s++;
keyword_start = s;
@@ -28,16 +29,22 @@ static inline int find_keyword(char *str, char *keyword, int max_size, int (*cus
*
*/
-PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags)
+PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl __maybe_unused)
{
PARSER *parser;
parser = callocz(1, sizeof(*parser));
- parser->plugins_action = callocz(1, sizeof(PLUGINSD_ACTION));
parser->user = user;
- parser->input = input;
+ parser->fd = fd;
+ parser->fp_input = fp_input;
+ parser->fp_output = fp_output;
+#ifdef ENABLE_HTTPS
+ parser->ssl_output = ssl;
+#endif
parser->flags = flags;
parser->host = host;
+ parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB;
+ inflight_functions_init(parser);
#ifdef ENABLE_HTTPS
parser->bytesleft = 0;
@@ -45,18 +52,28 @@ PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE fl
#endif
if (unlikely(!(flags & PARSER_NO_PARSE_INIT))) {
- parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
- parser_add_keyword(parser, "CLABEL_COMMIT", pluginsd_clabel_commit);
- parser_add_keyword(parser, "CLABEL", pluginsd_clabel);
- parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
- parser_add_keyword(parser, "SET", pluginsd_set);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_SET, pluginsd_set);
+
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin);
+
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_rrdset_begin);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state);
+ parser_add_keyword(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end);
}
return parser;
@@ -133,7 +150,7 @@ int parser_add_keyword(PARSER *parser, char *keyword, keyword_function func)
tmp_keyword = callocz(1, sizeof(*tmp_keyword));
- tmp_keyword->worker_job_id = parser->worker_job_ids++;
+ tmp_keyword->worker_job_id = parser->worker_job_next_id++;
tmp_keyword->keyword = strdupz(keyword);
tmp_keyword->keyword_hash = keyword_hash;
tmp_keyword->func[tmp_keyword->func_no++] = (void *) func;
@@ -154,6 +171,8 @@ void parser_destroy(PARSER *parser)
if (unlikely(!parser))
return;
+ dictionary_destroy(parser->inflight.functions);
+
PARSER_KEYWORD *tmp_keyword, *tmp_keyword_next;
PARSER_DATA *tmp_parser_data, *tmp_parser_data_next;
@@ -175,7 +194,6 @@ void parser_destroy(PARSER *parser)
tmp_parser_data = tmp_parser_data_next;
}
- freez(parser->plugins_action);
freez(parser);
}
@@ -205,19 +223,21 @@ int parser_next(PARSER *parser)
}
if (unlikely(parser->read_function))
- tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->input);
+ tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->fp_input);
+ else if(likely(parser->fp_input))
+ tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->fp_input);
else
- tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->input);
+ tmp = NULL;
if (unlikely(!tmp)) {
if (unlikely(parser->eof_function)) {
- int rc = parser->eof_function(parser->input);
+ int rc = parser->eof_function(parser->fp_input);
error("read failed: user defined function returned %d", rc);
}
else {
- if (feof((FILE *)parser->input))
+ if (feof((FILE *)parser->fp_input))
error("read failed: end of file");
- else if (ferror((FILE *)parser->input))
+ else if (ferror((FILE *)parser->fp_input))
error("read failed: input error");
else
error("read failed: unknown error");
@@ -236,64 +256,97 @@ int parser_next(PARSER *parser)
inline int parser_action(PARSER *parser, char *input)
{
- PARSER_RC rc = PARSER_RC_OK;
- char *words[PLUGINSD_MAX_WORDS] = { NULL };
+ parser->line++;
+
+ PARSER_RC rc = PARSER_RC_OK;
+ char *words[PLUGINSD_MAX_WORDS];
char command[PLUGINSD_LINE_MAX + 1];
keyword_function action_function;
keyword_function *action_function_list = NULL;
- if (unlikely(!parser))
+ if (unlikely(!parser)) {
+ internal_error(true, "parser is NULL");
return 1;
+ }
+
parser->recover_location[0] = 0x0;
// if not direct input check if we have reprocessed this
if (unlikely(!input && parser->flags & PARSER_INPUT_PROCESSED))
return 0;
- PARSER_KEYWORD *tmp_keyword = parser->keyword;
+ PARSER_KEYWORD *tmp_keyword = parser->keyword;
if (unlikely(!tmp_keyword)) {
+ internal_error(true, "called without a keyword");
return 1;
}
if (unlikely(!input))
input = parser->buffer;
- if (unlikely(!find_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space)))
+ if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
+ bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space);
+
+ if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) {
+ if(parser->defer.response) {
+ buffer_strcat(parser->defer.response, input);
+ if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) {
+ // more than 10MB of data
+ // a bad plugin that did not send the end_keyword
+ internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
+ return 1;
+ }
+ }
+ return 0;
+ }
+ else {
+ // call the action
+ parser->defer.action(parser, parser->defer.action_data);
+
+ // empty everything
+ parser->defer.action = NULL;
+ parser->defer.action_data = NULL;
+ parser->defer.end_keyword = NULL;
+ parser->defer.response = NULL;
+ parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD;
+ }
+ return 0;
+ }
+
+ if (unlikely(!find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space)))
return 0;
- if ((parser->flags & PARSER_INPUT_ORIGINAL) == PARSER_INPUT_ORIGINAL)
- pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS);
+ size_t num_words = 0;
+ if ((parser->flags & PARSER_INPUT_KEEP_ORIGINAL) == PARSER_INPUT_KEEP_ORIGINAL)
+ num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS);
else
- pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+ num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
uint32_t command_hash = simple_hash(command);
- size_t worker_job_id = 0;
+ size_t worker_job_id = WORKER_UTILIZATION_MAX_JOB_TYPES + 1; // set an invalid value by default
while(tmp_keyword) {
- if (command_hash == tmp_keyword->keyword_hash &&
- (!strcmp(command, tmp_keyword->keyword))) {
- action_function_list = &tmp_keyword->func[0];
- worker_job_id = tmp_keyword->worker_job_id;
- break;
+ if (command_hash == tmp_keyword->keyword_hash && (!strcmp(command, tmp_keyword->keyword))) {
+ action_function_list = &tmp_keyword->func[0];
+ worker_job_id = tmp_keyword->worker_job_id;
+ break;
}
tmp_keyword = tmp_keyword->next;
}
if (unlikely(!action_function_list)) {
if (unlikely(parser->unknown_function))
- rc = parser->unknown_function(words, parser->user, NULL);
+ rc = parser->unknown_function(words, num_words, parser->user);
else
rc = PARSER_RC_ERROR;
-#ifdef NETDATA_INTERNAL_CHECKS
- error("Unknown keyword [%s]", input);
-#endif
}
else {
worker_is_busy(worker_job_id);
while ((action_function = *action_function_list) != NULL) {
- rc = action_function(words, parser->user, parser->plugins_action);
+ rc = action_function(words, num_words, parser->user);
if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP))
- break;
+ break;
+
action_function_list++;
}
worker_is_idle();
@@ -302,6 +355,25 @@ inline int parser_action(PARSER *parser, char *input)
if (likely(input == parser->buffer))
parser->flags |= PARSER_INPUT_PROCESSED;
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(rc == PARSER_RC_ERROR) {
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX);
+ for(size_t i = 0; i < num_words ;i++) {
+ if(i) buffer_fast_strcat(wb, " ", 1);
+
+ buffer_fast_strcat(wb, "\"", 1);
+ const char *s = get_word(words, num_words, i);
+ buffer_strcat(wb, s?s:"");
+ buffer_fast_strcat(wb, "\"", 1);
+ }
+
+ internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
+ command, parser->line, buffer_tostring(wb));
+
+ buffer_free(wb);
+ }
+#endif
+
return (rc == PARSER_RC_ERROR);
}
diff --git a/parser/parser.h b/parser/parser.h
index 1887318f..ad748838 100644
--- a/parser/parser.h
+++ b/parser/parser.h
@@ -7,6 +7,10 @@
#define PARSER_MAX_CALLBACKS 20
#define PARSER_MAX_RECOVER_KEYWORDS 128
+#define WORKER_PARSER_FIRST_JOB 3
+
+// this has to be in-sync with the same at receiver.c
+#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
// PARSER return codes
typedef enum parser_rc {
@@ -15,43 +19,18 @@ typedef enum parser_rc {
PARSER_RC_ERROR // Callback failed (abort rest of callbacks)
} PARSER_RC;
-typedef struct pluginsd_action {
- PARSER_RC (*set_action)(void *user, RRDSET *st, RRDDIM *rd, long long int value);
- PARSER_RC (*begin_action)(void *user, RRDSET *st, usec_t microseconds, int trust_durations);
- PARSER_RC (*end_action)(void *user, RRDSET *st);
- PARSER_RC (*chart_action)
- (void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin,
- char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options);
- PARSER_RC (*dimension_action)
- (void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options,
- RRD_ALGORITHM algorithm_type);
-
- PARSER_RC (*flush_action)(void *user, RRDSET *st);
- PARSER_RC (*disable_action)(void *user);
- PARSER_RC (*variable_action)(void *user, RRDHOST *host, RRDSET *st, char *name, int global, NETDATA_DOUBLE value);
- PARSER_RC (*label_action)(void *user, char *key, char *value, RRDLABEL_SRC source);
- PARSER_RC (*overwrite_action)(void *user, RRDHOST *host, DICTIONARY *new_labels);
- PARSER_RC (*clabel_action)(void *user, char *key, char *value, RRDLABEL_SRC source);
- PARSER_RC (*clabel_commit_action)(void *user, RRDHOST *host, DICTIONARY *new_labels);
-
- PARSER_RC (*guid_action)(void *user, uuid_t *uuid);
- PARSER_RC (*context_action)(void *user, uuid_t *uuid);
- PARSER_RC (*tombstone_action)(void *user, uuid_t *uuid);
- PARSER_RC (*host_action)(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os,
- char *timezone, char *tags);
-} PLUGINSD_ACTION;
-
typedef enum parser_input_type {
- PARSER_INPUT_SPLIT = 1 << 1,
- PARSER_INPUT_ORIGINAL = 1 << 2,
- PARSER_INPUT_PROCESSED = 1 << 3,
- PARSER_NO_PARSE_INIT = 1 << 4,
- PARSER_NO_ACTION_INIT = 1 << 5,
+ PARSER_INPUT_SPLIT = (1 << 1),
+ PARSER_INPUT_KEEP_ORIGINAL = (1 << 2),
+ PARSER_INPUT_PROCESSED = (1 << 3),
+ PARSER_NO_PARSE_INIT = (1 << 4),
+ PARSER_NO_ACTION_INIT = (1 << 5),
+ PARSER_DEFER_UNTIL_KEYWORD = (1 << 6),
} PARSER_INPUT_TYPE;
#define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL)
-typedef PARSER_RC (*keyword_function)(char **, void *, PLUGINSD_ACTION *plugins_action);
+typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data);
typedef struct parser_keyword {
size_t worker_job_id;
@@ -68,15 +47,20 @@ typedef struct parser_data {
} PARSER_DATA;
typedef struct parser {
- size_t worker_job_ids;
+ size_t worker_job_next_id;
uint8_t version; // Parser version
RRDHOST *host;
- void *input; // Input source e.g. stream
+ int fd; // Socket
+ FILE *fp_input; // Input source e.g. stream
+ FILE *fp_output; // Stream to send commands to plugin
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl *ssl_output;
+#endif
PARSER_DATA *data; // extra input
PARSER_KEYWORD *keyword; // List of parse keywords and functions
- PLUGINSD_ACTION *plugins_action;
void *user; // User defined structure to hold extra state between calls
uint32_t flags;
+ size_t line;
char *(*read_function)(char *buffer, long unsigned int, void *input);
int (*eof_function)(void *input);
@@ -89,9 +73,24 @@ typedef struct parser {
char tmpbuffer[PLUGINSD_LINE_MAX];
char *readfrom;
#endif
+
+ struct {
+ const char *end_keyword;
+ BUFFER *response;
+ void (*action)(struct parser *parser, void *action_data);
+ void *action_data;
+ } defer;
+
+ struct {
+ DICTIONARY *functions;
+ usec_t smaller_timeout;
+ } inflight;
+
} PARSER;
-PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags);
+int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char));
+
+PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func);
int parser_next(PARSER *working_parser);
int parser_action(PARSER *working_parser, char *input);
@@ -99,22 +98,26 @@ int parser_push(PARSER *working_parser, char *line);
void parser_destroy(PARSER *working_parser);
int parser_recover_input(PARSER *working_parser);
-extern size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations);
-
-extern PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action);
-extern PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations);
+
+PARSER_RC pluginsd_set(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_end(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_flush(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_disable(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_label(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user);
+
+PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user);
#endif