summaryrefslogtreecommitdiffstats
path: root/libnetdata/parser
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--libnetdata/parser/Makefile.am (renamed from parser/Makefile.am)0
-rw-r--r--libnetdata/parser/README.md28
-rw-r--r--libnetdata/parser/parser.c225
-rw-r--r--libnetdata/parser/parser.h (renamed from parser/parser.h)76
4 files changed, 280 insertions, 49 deletions
diff --git a/parser/Makefile.am b/libnetdata/parser/Makefile.am
index 02fe3a314..02fe3a314 100644
--- a/parser/Makefile.am
+++ b/libnetdata/parser/Makefile.am
diff --git a/libnetdata/parser/README.md b/libnetdata/parser/README.md
new file mode 100644
index 000000000..136c23c69
--- /dev/null
+++ b/libnetdata/parser/README.md
@@ -0,0 +1,28 @@
+<!--
+title: "Parser"
+custom_edit_url: https://github.com/netdata/netdata/blob/master/parser/README.md
+sidebar_label: "Parser"
+learn_status: "Published"
+learn_topic_type: "References"
+learn_rel_path: "Developers/Database"
+-->
+
+# Parser
+
+## Introduction
+
+Generic parser that is used to register keywords and a corresponding function that will be executed when that
+keyword is encountered in the command stream (either from plugins or via streaming)
+
+To use a parser do the following:
+
+1. Define a structure that will be used to share user state across calls (user defined `void *user`)
+2. Initialize the parser using `parser_init`
+3. Register keywords with their associated callback function using `parser_add_keyword`
+4. Start a loop for as long there is input (or parser_action returns error)
+ 1. Fetch the next line using `parser_next` (if needed)
+ 2. Process the line using `parser_action`
+5. Release the parser using `parser_destroy`
+6. Release the user structure
+
+See examples in receiver.c / pluginsd_parser.c
diff --git a/libnetdata/parser/parser.c b/libnetdata/parser/parser.c
new file mode 100644
index 000000000..c3eebcd16
--- /dev/null
+++ b/libnetdata/parser/parser.c
@@ -0,0 +1,225 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "parser.h"
+#include "collectors/plugins.d/pluginsd_parser.h"
+
+static inline int find_first_keyword(const char *src, char *dst, int dst_size, int (*custom_isspace)(char)) {
+ const char *s = src, *keyword_start;
+
+ while (unlikely(custom_isspace(*s))) s++;
+ keyword_start = s;
+
+ while (likely(*s && !custom_isspace(*s)) && dst_size > 1) {
+ *dst++ = *s++;
+ dst_size--;
+ }
+ *dst = '\0';
+ return dst_size == 0 ? 0 : (int) (s - keyword_start);
+}
+
+/*
+ * Initialize a parser
+ * user : as defined by the user, will be shared across calls
+ * input : main input stream (auto detect stream -- file, socket, pipe)
+ * buffer : This is the buffer to be used (if null a buffer of size will be allocated)
+ * size : buffer size either passed or will be allocated
+ * If the buffer is auto allocated, it will auto freed when the parser is destroyed
+ *
+ *
+ */
+
+PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd,
+ PARSER_INPUT_TYPE flags, void *ssl __maybe_unused)
+{
+ PARSER *parser;
+
+ parser = callocz(1, sizeof(*parser));
+ parser->user = user;
+ parser->fd = fd;
+ parser->fp_input = fp_input;
+ parser->fp_output = fp_output;
+#ifdef ENABLE_HTTPS
+ parser->ssl_output = ssl;
+#endif
+ parser->flags = flags;
+ parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB;
+
+ return parser;
+}
+
+
+static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) {
+ uint32_t hash = parser_hash_function(command);
+ uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE;
+ PARSER_KEYWORD *t = parser->keywords.hashtable[slot];
+
+ if(likely(t && strcmp(t->keyword, command) == 0))
+ return t;
+
+ return NULL;
+}
+
+/*
+ * Add a keyword and the corresponding function that will be called
+ * Multiple functions may be added
+ * Input : keyword
+ * : callback function
+ * : flags
+ * Output: > 0 registered function number
+ * : 0 Error
+ */
+
+void parser_add_keyword(PARSER *parser, char *keyword, keyword_function func) {
+ if(unlikely(!parser || !keyword || !*keyword || !func))
+ fatal("PARSER: invalid parameters");
+
+ PARSER_KEYWORD *t = callocz(1, sizeof(*t));
+ t->worker_job_id = parser->worker_job_next_id++;
+ t->keyword = strdupz(keyword);
+ t->func = func;
+
+ uint32_t hash = parser_hash_function(keyword);
+ uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE;
+
+ if(unlikely(parser->keywords.hashtable[slot]))
+ fatal("PARSER: hashtable collision between keyword '%s' and '%s' on slot %u. "
+ "Change the hashtable size and / or the hashing function. "
+ "Run the unit test to find the optimal values.",
+ parser->keywords.hashtable[slot]->keyword,
+ t->keyword,
+ slot
+ );
+
+ parser->keywords.hashtable[slot] = t;
+
+ worker_register_job_name(t->worker_job_id, t->keyword);
+}
+
+/*
+ * Cleanup a previously allocated parser
+ */
+
+void parser_destroy(PARSER *parser)
+{
+ if (unlikely(!parser))
+ return;
+
+ dictionary_destroy(parser->inflight.functions);
+
+ // Remove keywords
+ for(size_t i = 0 ; i < PARSER_KEYWORDS_HASHTABLE_SIZE; i++) {
+ PARSER_KEYWORD *t = parser->keywords.hashtable[i];
+ if (t) {
+ freez(t->keyword);
+ freez(t);
+ }
+ }
+
+ freez(parser);
+}
+
+
+/*
+ * Fetch the next line to process
+ *
+ */
+
+int parser_next(PARSER *parser, char *buffer, size_t buffer_size)
+{
+ char *tmp = fgets(buffer, (int)buffer_size, (FILE *)parser->fp_input);
+
+ if (unlikely(!tmp)) {
+ if (feof((FILE *)parser->fp_input))
+ error("PARSER: read failed: end of file");
+
+ else if (ferror((FILE *)parser->fp_input))
+ error("PARSER: read failed: input error");
+
+ else
+ error("PARSER: read failed: unknown error");
+
+ return 1;
+ }
+
+ return 0;
+}
+
+
+/*
+* Takes an initialized parser object that has an unprocessed entry (by calling parser_next)
+* and if it contains a valid keyword, it will execute all the callbacks
+*
+*/
+
+inline int parser_action(PARSER *parser, char *input)
+{
+ parser->line++;
+
+ if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
+ char command[PLUGINSD_LINE_MAX + 1];
+ bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space);
+
+ if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) {
+ if(parser->defer.response) {
+ buffer_strcat(parser->defer.response, input);
+ if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) {
+ // more than 10MB of data
+ // a bad plugin that did not send the end_keyword
+ internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
+ return 1;
+ }
+ }
+ return 0;
+ }
+ else {
+ // call the action
+ parser->defer.action(parser, parser->defer.action_data);
+
+ // empty everything
+ parser->defer.action = NULL;
+ parser->defer.action_data = NULL;
+ parser->defer.end_keyword = NULL;
+ parser->defer.response = NULL;
+ parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD;
+ }
+ return 0;
+ }
+
+ char *words[PLUGINSD_MAX_WORDS];
+ size_t num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(words, num_words, 0);
+
+ if(unlikely(!command))
+ return 0;
+
+ PARSER_RC rc;
+ PARSER_KEYWORD *t = parser_find_keyword(parser, command);
+ if(likely(t)) {
+ worker_is_busy(t->worker_job_id);
+ rc = (*t->func)(words, num_words, parser->user);
+ worker_is_idle();
+ }
+ else
+ rc = PARSER_RC_ERROR;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(rc == PARSER_RC_ERROR) {
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
+ for(size_t i = 0; i < num_words ;i++) {
+ if(i) buffer_fast_strcat(wb, " ", 1);
+
+ buffer_fast_strcat(wb, "\"", 1);
+ const char *s = get_word(words, num_words, i);
+ buffer_strcat(wb, s?s:"");
+ buffer_fast_strcat(wb, "\"", 1);
+ }
+
+ internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
+ command, parser->line, buffer_tostring(wb));
+
+ buffer_free(wb);
+ }
+#endif
+
+ return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP);
+}
diff --git a/parser/parser.h b/libnetdata/parser/parser.h
index ad7488389..9e0d3480d 100644
--- a/parser/parser.h
+++ b/libnetdata/parser/parser.h
@@ -3,76 +3,56 @@
#ifndef NETDATA_INCREMENTAL_PARSER_H
#define NETDATA_INCREMENTAL_PARSER_H 1
-#include "daemon/common.h"
+#include "../libnetdata.h"
-#define PARSER_MAX_CALLBACKS 20
-#define PARSER_MAX_RECOVER_KEYWORDS 128
#define WORKER_PARSER_FIRST_JOB 3
// this has to be in-sync with the same at receiver.c
#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
+#define PARSER_KEYWORDS_HASHTABLE_SIZE 73 // unittest finds this magic number
+//#define parser_hash_function(s) djb2_hash32(s)
+//#define parser_hash_function(s) fnv1_hash32(s)
+//#define parser_hash_function(s) fnv1a_hash32(s)
+//#define parser_hash_function(s) larson_hash32(s)
+#define parser_hash_function(s) pluginsd_parser_hash32(s)
+
// PARSER return codes
-typedef enum parser_rc {
+typedef enum __attribute__ ((__packed__)) parser_rc {
PARSER_RC_OK, // Callback was successful, go on
PARSER_RC_STOP, // Callback says STOP
PARSER_RC_ERROR // Callback failed (abort rest of callbacks)
} PARSER_RC;
-typedef enum parser_input_type {
+typedef enum __attribute__ ((__packed__)) parser_input_type {
PARSER_INPUT_SPLIT = (1 << 1),
- PARSER_INPUT_KEEP_ORIGINAL = (1 << 2),
- PARSER_INPUT_PROCESSED = (1 << 3),
- PARSER_NO_PARSE_INIT = (1 << 4),
- PARSER_NO_ACTION_INIT = (1 << 5),
- PARSER_DEFER_UNTIL_KEYWORD = (1 << 6),
+ PARSER_DEFER_UNTIL_KEYWORD = (1 << 2),
} PARSER_INPUT_TYPE;
-#define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL)
-
typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data);
typedef struct parser_keyword {
- size_t worker_job_id;
- char *keyword;
- uint32_t keyword_hash;
- int func_no;
- keyword_function func[PARSER_MAX_CALLBACKS+1];
- struct parser_keyword *next;
+ size_t worker_job_id;
+ char *keyword;
+ keyword_function func;
} PARSER_KEYWORD;
-typedef struct parser_data {
- char *line;
- struct parser_data *next;
-} PARSER_DATA;
-
typedef struct parser {
size_t worker_job_next_id;
uint8_t version; // Parser version
- RRDHOST *host;
int fd; // Socket
FILE *fp_input; // Input source e.g. stream
FILE *fp_output; // Stream to send commands to plugin
#ifdef ENABLE_HTTPS
struct netdata_ssl *ssl_output;
#endif
- PARSER_DATA *data; // extra input
- PARSER_KEYWORD *keyword; // List of parse keywords and functions
- void *user; // User defined structure to hold extra state between calls
+ void *user; // User defined structure to hold extra state between calls
uint32_t flags;
size_t line;
- char *(*read_function)(char *buffer, long unsigned int, void *input);
- int (*eof_function)(void *input);
- keyword_function unknown_function;
- char buffer[PLUGINSD_LINE_MAX];
- char *recover_location[PARSER_MAX_RECOVER_KEYWORDS+1];
- char recover_input[PARSER_MAX_RECOVER_KEYWORDS];
-#ifdef ENABLE_HTTPS
- int bytesleft;
- char tmpbuffer[PLUGINSD_LINE_MAX];
- char *readfrom;
-#endif
+ struct {
+ PARSER_KEYWORD *hashtable[PARSER_KEYWORDS_HASHTABLE_SIZE];
+ } keywords;
struct {
const char *end_keyword;
@@ -85,20 +65,13 @@ typedef struct parser {
DICTIONARY *functions;
usec_t smaller_timeout;
} inflight;
-
} PARSER;
-int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char));
-
-PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
-int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func);
-int parser_next(PARSER *working_parser);
+PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
+void parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func);
+int parser_next(PARSER *working_parser, char *buffer, size_t buffer_size);
int parser_action(PARSER *working_parser, char *input);
-int parser_push(PARSER *working_parser, char *line);
void parser_destroy(PARSER *working_parser);
-int parser_recover_input(PARSER *working_parser);
-
-size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations);
PARSER_RC pluginsd_set(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user);
@@ -114,10 +87,15 @@ PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user);
-PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_end_v2(char **words, size_t num_words, void *user);
+void pluginsd_cleanup_v2(void *user);
+
#endif