summaryrefslogtreecommitdiffstats
path: root/parser
diff options
context:
space:
mode:
Diffstat (limited to 'parser')
-rw-r--r--parser/Makefile.am9
-rw-r--r--parser/README.md149
-rw-r--r--parser/parser.c320
-rw-r--r--parser/parser.h114
4 files changed, 592 insertions, 0 deletions
diff --git a/parser/Makefile.am b/parser/Makefile.am
new file mode 100644
index 0000000..02fe3a3
--- /dev/null
+++ b/parser/Makefile.am
@@ -0,0 +1,9 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
+
diff --git a/parser/README.md b/parser/README.md
new file mode 100644
index 0000000..50d55e3
--- /dev/null
+++ b/parser/README.md
@@ -0,0 +1,149 @@
+#### Introduction
+
+The parser will be used to process streaming and plugins input as well as metadata
+
+Usage
+
+1. Define a structure that will be used to share user state across calls
+1. Initialize the parser using `parser_init`
+2. Register keywords and associated callback function using `parser_add_keyword`
+3. Register actions on the keywords
+4. Start a loop until EOF
+ 1. Fetch the next line using `parser_next`
+ 2. Process the line using `parser_action`
+ 1. The registered callbacks are executed to parse the input
+ 2. The registered action for the callback is called for processing
+4. Release the parser using `parser_destroy`
+5. Release the user structure
+
+#### Functions
+
+----
+##### parse_init(RRDHOST *host, void *user, void *input, int flags)
+
+Initialize an internal parser with the specified user defined data structure that will be shared across calls.
+
+Input
+- Host
+ - The host this parser will be dealing with. For streaming with SSL enabled for this host
+- user
+ - User defined structure that is passed in all the calls
+- input
+ - Where the parser will get the input from
+- flags
+ - flags to define processing on the input
+
+Output
+- A parser structure
+
+
+
+----
+##### parse_push(PARSER *parser, char *line)
+
+Push a new line for processing
+
+Input
+
+- parser
+ - The parser object as returned by the `parser_init`
+- line
+ - The new line to process
+
+
+Output
+- The line will be injected into the stream and will be the next one to be processed
+
+Returns
+- 0 line added
+- 1 error detected
+
+----
+##### parse_add_keyword(PARSER *parser, char *keyword, keyword_function callback_function)
+
+The function will add callbacks for keywords. The callback function is defined as
+
+`typedef PARSER_RC (*keyword_function)(char **, void *);`
+
+Input
+
+- parser
+ - The parser object as returned by the `parser_init`
+- keyword
+ - The keyword to register
+- keyword_function
+ - The callback that will handle the keyword processing
+ * The callback function should return one of the following
+ * PARSER_RC_OK -- Callback was successful (continue with other callbacks)
+ * PARSER_RC_STOP -- Stop processing callbacks (return OK)
+ * PARSER_RC_ERROR -- Callback failed, exit
+
+Output
+- The corresponding keyword and callback will be registered
+
+Returns
+- 0 maximum callbacks already registered for this keyword
+- > 0 which is the number of callbacks associated with this keyword.
+
+
+----
+##### parser_next(PARSER *parser)
+Return the next item to parse
+
+Input
+- parser
+ - The parser object as returned by the `parser_init`
+
+Output
+- The parser will store internally the next item to parse
+
+Returns
+- 0 Next item fetched successfully
+- 1 No more items to parse
+
+----
+##### parser_action(PARSER *parser, char *input)
+Return the next item to parse
+
+Input
+- parser
+ - The parser object as returned by the `parser_init`
+- input
+ - Process the input specified instead of using the internal buffer
+
+Output
+- The current keyword will be processed by calling all the registered callbacks
+
+Returns
+- 0 Callbacks called successfully
+- 1 Failed
+
+----
+##### parser_destroy(PARSER *parser)
+Cleanup a previously allocated parser
+
+Input
+- parser
+ - The parser object as returned by the `parser_init`
+
+Output
+- The parser is deallocated
+
+Returns
+- none
+
+----
+##### parser_recover_input(PARSER *parser)
+Cleanup a previously allocated parser
+
+Input
+- parser
+ - The parser object as returned by the `parser_init`
+
+Output
+- The parser is deallocated
+
+Returns
+- none
+
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fparser%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
diff --git a/parser/parser.c b/parser/parser.c
new file mode 100644
index 0000000..21d7fb3
--- /dev/null
+++ b/parser/parser.c
@@ -0,0 +1,320 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "parser.h"
+
+static inline int find_keyword(char *str, char *keyword, int max_size, int (*custom_isspace)(char))
+{
+ char *s = str, *keyword_start;
+
+ while (unlikely(custom_isspace(*s))) s++;
+ keyword_start = s;
+
+ while (likely(*s && !custom_isspace(*s)) && max_size > 0) {
+ *keyword++ = *s++;
+ max_size--;
+ }
+ *keyword = '\0';
+ return max_size == 0 ? 0 : (int) (s - keyword_start);
+}
+
+/*
+ * Initialize a parser
+ * user : as defined by the user, will be shared across calls
+ * input : main input stream (auto detect stream -- file, socket, pipe)
+ * buffer : This is the buffer to be used (if null a buffer of size will be allocated)
+ * size : buffer size either passed or will be allocated
+ * If the buffer is auto allocated, it will auto freed when the parser is destroyed
+ *
+ *
+ */
+
+PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags)
+{
+ PARSER *parser;
+
+ parser = callocz(1, sizeof(*parser));
+
+ if (unlikely(!parser))
+ return NULL;
+
+ parser->plugins_action = callocz(1, sizeof(PLUGINSD_ACTION));
+ if (unlikely(!parser->plugins_action)) {
+ freez(parser);
+ return NULL;
+ }
+
+ parser->user = user;
+ parser->input = input;
+ parser->flags = flags;
+ parser->host = host;
+#ifdef ENABLE_HTTPS
+ parser->bytesleft = 0;
+ parser->readfrom = NULL;
+#endif
+
+ if (unlikely(!(flags & PARSER_NO_PARSE_INIT))) {
+ int rc = parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
+ rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
+ rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
+ rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
+ rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
+ rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
+ rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
+ rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
+ rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
+ rc += parser_add_keyword(parser, "SET", pluginsd_set);
+ }
+
+ return parser;
+}
+
+
+/*
+ * Push a new line into the parsing stream
+ *
+ * This line will be the next one to process ie the next fetch will get this one
+ *
+ */
+
+int parser_push(PARSER *parser, char *line)
+{
+ PARSER_DATA *tmp_parser_data;
+
+ if (unlikely(!parser))
+ return 1;
+
+ if (unlikely(!line))
+ return 0;
+
+ tmp_parser_data = callocz(1, sizeof(*tmp_parser_data));
+ tmp_parser_data->line = strdupz(line);
+ tmp_parser_data->next = parser->data;
+ parser->data = tmp_parser_data;
+
+ return 0;
+}
+
+/*
+ * Add a keyword and the corresponding function that will be called
+ * Multiple functions may be added
+ * Input : keyword
+ * : callback function
+ * : flags
+ * Output: > 0 registered function number
+ * : 0 Error
+ */
+
+int parser_add_keyword(PARSER *parser, char *keyword, keyword_function func)
+{
+ PARSER_KEYWORD *tmp_keyword;
+
+ if (strcmp(keyword, "_read") == 0) {
+ parser->read_function = (void *) func;
+ return 0;
+ }
+
+ if (strcmp(keyword, "_eof") == 0) {
+ parser->eof_function = (void *) func;
+ return 0;
+ }
+
+ if (strcmp(keyword, "_unknown") == 0) {
+ parser->unknown_function = (void *) func;
+ return 0;
+ }
+
+ uint32_t keyword_hash = simple_hash(keyword);
+
+ tmp_keyword = parser->keyword;
+
+ while (tmp_keyword) {
+ if (tmp_keyword->keyword_hash == keyword_hash && (!strcmp(tmp_keyword->keyword, keyword))) {
+ if (tmp_keyword->func_no == PARSER_MAX_CALLBACKS)
+ return 0;
+ tmp_keyword->func[tmp_keyword->func_no++] = (void *) func;
+ return tmp_keyword->func_no;
+ }
+ tmp_keyword = tmp_keyword->next;
+ }
+
+ tmp_keyword = callocz(1, sizeof(*tmp_keyword));
+
+ tmp_keyword->keyword = strdupz(keyword);
+ tmp_keyword->keyword_hash = keyword_hash;
+ tmp_keyword->func[tmp_keyword->func_no++] = (void *) func;
+
+ tmp_keyword->next = parser->keyword;
+ parser->keyword = tmp_keyword;
+ return tmp_keyword->func_no;
+}
+
+/*
+ * Cleanup a previously allocated parser
+ */
+
+void parser_destroy(PARSER *parser)
+{
+ if (unlikely(!parser))
+ return;
+
+ PARSER_KEYWORD *tmp_keyword, *tmp_keyword_next;
+ PARSER_DATA *tmp_parser_data, *tmp_parser_data_next;
+
+ // Remove keywords
+ tmp_keyword = parser->keyword;
+ while (tmp_keyword) {
+ tmp_keyword_next = tmp_keyword->next;
+ freez(tmp_keyword->keyword);
+ freez(tmp_keyword);
+ tmp_keyword = tmp_keyword_next;
+ }
+
+ // Remove pushed data if any
+ tmp_parser_data = parser->data;
+ while (tmp_parser_data) {
+ tmp_parser_data_next = tmp_parser_data->next;
+ freez(tmp_parser_data->line);
+ freez(tmp_parser_data);
+ tmp_parser_data = tmp_parser_data_next;
+ }
+
+ freez(parser->plugins_action);
+
+ freez(parser);
+ return;
+}
+
+
+/*
+ * Fetch the next line to process
+ *
+ */
+
+int parser_next(PARSER *parser)
+{
+ char *tmp = NULL;
+
+ if (unlikely(!parser))
+ return 1;
+
+ parser->flags &= ~(PARSER_INPUT_PROCESSED);
+
+ PARSER_DATA *tmp_parser_data = parser->data;
+
+ if (unlikely(tmp_parser_data)) {
+ strncpyz(parser->buffer, tmp_parser_data->line, PLUGINSD_LINE_MAX);
+ parser->data = tmp_parser_data->next;
+ freez(tmp_parser_data->line);
+ freez(tmp_parser_data);
+ return 0;
+ }
+
+ if (unlikely(parser->read_function))
+ tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->input);
+ else
+ tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->input);
+
+ if (unlikely(!tmp)) {
+ if (unlikely(parser->eof_function)) {
+ int rc = parser->eof_function(parser->input);
+ error("read failed: user defined function returned %d", rc);
+ }
+ else {
+ if (feof((FILE *)parser->input))
+ error("read failed: end of file");
+ else if (ferror((FILE *)parser->input))
+ error("read failed: input error");
+ else
+ error("read failed: unknown error");
+ }
+ return 1;
+ }
+ return 0;
+}
+
+
+/*
+* Takes an initialized parser object that has an unprocessed entry (by calling parser_next)
+* and if it contains a valid keyword, it will execute all the callbacks
+*
+*/
+
+inline int parser_action(PARSER *parser, char *input)
+{
+ PARSER_RC rc = PARSER_RC_OK;
+ char *words[PLUGINSD_MAX_WORDS] = { NULL };
+ char command[PLUGINSD_LINE_MAX];
+ keyword_function action_function;
+ keyword_function *action_function_list = NULL;
+
+ if (unlikely(!parser))
+ return 1;
+ parser->recover_location[0] = 0x0;
+
+ // if not direct input check if we have reprocessed this
+ if (unlikely(!input && parser->flags & PARSER_INPUT_PROCESSED))
+ return 0;
+
+ PARSER_KEYWORD *tmp_keyword = parser->keyword;
+ if (unlikely(!tmp_keyword)) {
+ return 1;
+ }
+
+ if (unlikely(!input))
+ input = parser->buffer;
+
+ if (unlikely(!find_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space)))
+ return 0;
+
+ if ((parser->flags & PARSER_INPUT_ORIGINAL) == PARSER_INPUT_ORIGINAL)
+ pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS);
+ else
+ pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+
+ uint32_t command_hash = simple_hash(command);
+
+ while(tmp_keyword) {
+ if (command_hash == tmp_keyword->keyword_hash &&
+ (!strcmp(command, tmp_keyword->keyword))) {
+ action_function_list = &tmp_keyword->func[0];
+ break;
+ }
+ tmp_keyword = tmp_keyword->next;
+ }
+
+ if (unlikely(!action_function_list)) {
+ if (unlikely(parser->unknown_function))
+ rc = parser->unknown_function(words, parser->user, NULL);
+ else
+ rc = PARSER_RC_ERROR;
+#ifdef NETDATA_INTERNAL_CHECKS
+ error("Unknown keyword [%s]", input);
+#endif
+ }
+ else {
+ while ((action_function = *action_function_list) != NULL) {
+ rc = action_function(words, parser->user, parser->plugins_action);
+ if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP))
+ break;
+ action_function_list++;
+ }
+ }
+
+ if (likely(input == parser->buffer))
+ parser->flags |= PARSER_INPUT_PROCESSED;
+
+ return (rc == PARSER_RC_ERROR);
+}
+
+inline int parser_recover_input(PARSER *parser)
+{
+ if (unlikely(!parser))
+ return 1;
+
+ for(int i=0; i < PARSER_MAX_RECOVER_KEYWORDS && parser->recover_location[i]; i++)
+ *(parser->recover_location[i]) = parser->recover_input[i];
+
+ parser->recover_location[0] = 0x0;
+
+ return 0;
+}
diff --git a/parser/parser.h b/parser/parser.h
new file mode 100644
index 0000000..86a837e
--- /dev/null
+++ b/parser/parser.h
@@ -0,0 +1,114 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_INCREMENTAL_PARSER_H
+#define NETDATA_INCREMENTAL_PARSER_H 1
+
+#include "../daemon/common.h"
+
+#define PARSER_MAX_CALLBACKS 20
+#define PARSER_MAX_RECOVER_KEYWORDS 128
+
+// PARSER return codes
+typedef enum parser_rc {
+ PARSER_RC_OK, // Callback was successful, go on
+ PARSER_RC_STOP, // Callback says STOP
+ PARSER_RC_ERROR // Callback failed (abort rest of callbacks)
+} PARSER_RC;
+
+typedef struct pluginsd_action {
+ PARSER_RC (*set_action)(void *user, RRDSET *st, RRDDIM *rd, long long int value);
+ PARSER_RC (*begin_action)(void *user, RRDSET *st, usec_t microseconds, int trust_durations);
+ PARSER_RC (*end_action)(void *user, RRDSET *st);
+ PARSER_RC (*chart_action)
+ (void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin,
+ char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options);
+ PARSER_RC (*dimension_action)
+ (void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options,
+ RRD_ALGORITHM algorithm_type);
+
+ PARSER_RC (*flush_action)(void *user, RRDSET *st);
+ PARSER_RC (*disable_action)(void *user);
+ PARSER_RC (*variable_action)(void *user, RRDHOST *host, RRDSET *st, char *name, int global, calculated_number value);
+ PARSER_RC (*label_action)(void *user, char *key, char *value, LABEL_SOURCE source);
+ PARSER_RC (*overwrite_action)(void *user, RRDHOST *host, struct label *new_labels);
+
+ PARSER_RC (*guid_action)(void *user, uuid_t *uuid);
+ PARSER_RC (*context_action)(void *user, uuid_t *uuid);
+ PARSER_RC (*tombstone_action)(void *user, uuid_t *uuid);
+ PARSER_RC (*host_action)(void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os,
+ char *timezone, char *tags);
+} PLUGINSD_ACTION;
+
+typedef enum parser_input_type {
+ PARSER_INPUT_SPLIT = 1 << 1,
+ PARSER_INPUT_ORIGINAL = 1 << 2,
+ PARSER_INPUT_PROCESSED = 1 << 3,
+ PARSER_NO_PARSE_INIT = 1 << 4,
+ PARSER_NO_ACTION_INIT = 1 << 5,
+} PARSER_INPUT_TYPE;
+
+#define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL)
+
+typedef PARSER_RC (*keyword_function)(char **, void *, PLUGINSD_ACTION *plugins_action);
+
+typedef struct parser_keyword {
+ char *keyword;
+ uint32_t keyword_hash;
+ int func_no;
+ keyword_function func[PARSER_MAX_CALLBACKS+1];
+ struct parser_keyword *next;
+} PARSER_KEYWORD;
+
+typedef struct parser_data {
+ char *line;
+ struct parser_data *next;
+} PARSER_DATA;
+
+typedef struct parser {
+ uint8_t version; // Parser version
+ RRDHOST *host;
+ void *input; // Input source e.g. stream
+ PARSER_DATA *data; // extra input
+ PARSER_KEYWORD *keyword; // List of parse keywords and functions
+ PLUGINSD_ACTION *plugins_action;
+ void *user; // User defined structure to hold extra state between calls
+ uint32_t flags;
+
+ char *(*read_function)(char *buffer, long unsigned int, void *input);
+ int (*eof_function)(void *input);
+ keyword_function unknown_function;
+ char buffer[PLUGINSD_LINE_MAX];
+ char *recover_location[PARSER_MAX_RECOVER_KEYWORDS+1];
+ char recover_input[PARSER_MAX_RECOVER_KEYWORDS];
+#ifdef ENABLE_HTTPS
+ int bytesleft;
+ char tmpbuffer[PLUGINSD_LINE_MAX];
+ char *readfrom;
+#endif
+} PARSER;
+
+PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags);
+int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func);
+int parser_next(PARSER *working_parser);
+int parser_action(PARSER *working_parser, char *input);
+int parser_push(PARSER *working_parser, char *line);
+void parser_destroy(PARSER *working_parser);
+int parser_recover_input(PARSER *working_parser);
+
+extern size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations);
+
+extern PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+extern PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action);
+
+#endif