summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/Makefile.am1
-rw-r--r--collectors/plugins.d/gperf-config.txt52
-rw-r--r--collectors/plugins.d/gperf-hashtable.h163
-rw-r--r--collectors/plugins.d/local_listeners.c366
-rw-r--r--collectors/plugins.d/plugins_d.c66
-rw-r--r--collectors/plugins.d/pluginsd_parser.c1220
-rw-r--r--collectors/plugins.d/pluginsd_parser.h169
7 files changed, 1453 insertions, 584 deletions
diff --git a/collectors/plugins.d/Makefile.am b/collectors/plugins.d/Makefile.am
index 59250a997..67fed309d 100644
--- a/collectors/plugins.d/Makefile.am
+++ b/collectors/plugins.d/Makefile.am
@@ -7,5 +7,6 @@ SUBDIRS = \
$(NULL)
dist_noinst_DATA = \
+ gperf-config.txt \
README.md \
$(NULL)
diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt
new file mode 100644
index 000000000..43be129e5
--- /dev/null
+++ b/collectors/plugins.d/gperf-config.txt
@@ -0,0 +1,52 @@
+%struct-type
+%omit-struct-type
+%define hash-function-name gperf_keyword_hash_function
+%define lookup-function-name gperf_lookup_keyword
+%define word-array-name gperf_keywords
+%define constants-prefix GPERF_PARSER_
+%define slot-name keyword
+%global-table
+%null-strings
+PARSER_KEYWORD;
+%%
+#
+# Plugins Only Keywords
+#
+FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1
+DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2
+EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3
+HOST, 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4
+HOST_DEFINE, 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5
+HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6
+HOST_LABEL, 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7
+#
+# Common keywords
+#
+BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8
+CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9
+CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10
+CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11
+DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12
+END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13
+FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14
+FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15
+LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16
+OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17
+SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18
+VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19
+#
+# Streaming only keywords
+#
+CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20
+BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21
+SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22
+END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23
+#
+# Streaming Replication keywords
+#
+CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24
+RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25
+RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26
+REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27
+RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28
+RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29
diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h
new file mode 100644
index 000000000..b9e58975e
--- /dev/null
+++ b/collectors/plugins.d/gperf-hashtable.h
@@ -0,0 +1,163 @@
+/* ANSI-C code produced by gperf version 3.1 */
+/* Command-line: gperf --multiple-iterations=1000 --output-file=gperf-hashtable.h gperf-config.txt */
+/* Computed positions: -k'1-2' */
+
+#if !((' ' == 32) && ('!' == 33) && ('"' == 34) && ('#' == 35) \
+ && ('%' == 37) && ('&' == 38) && ('\'' == 39) && ('(' == 40) \
+ && (')' == 41) && ('*' == 42) && ('+' == 43) && (',' == 44) \
+ && ('-' == 45) && ('.' == 46) && ('/' == 47) && ('0' == 48) \
+ && ('1' == 49) && ('2' == 50) && ('3' == 51) && ('4' == 52) \
+ && ('5' == 53) && ('6' == 54) && ('7' == 55) && ('8' == 56) \
+ && ('9' == 57) && (':' == 58) && (';' == 59) && ('<' == 60) \
+ && ('=' == 61) && ('>' == 62) && ('?' == 63) && ('A' == 65) \
+ && ('B' == 66) && ('C' == 67) && ('D' == 68) && ('E' == 69) \
+ && ('F' == 70) && ('G' == 71) && ('H' == 72) && ('I' == 73) \
+ && ('J' == 74) && ('K' == 75) && ('L' == 76) && ('M' == 77) \
+ && ('N' == 78) && ('O' == 79) && ('P' == 80) && ('Q' == 81) \
+ && ('R' == 82) && ('S' == 83) && ('T' == 84) && ('U' == 85) \
+ && ('V' == 86) && ('W' == 87) && ('X' == 88) && ('Y' == 89) \
+ && ('Z' == 90) && ('[' == 91) && ('\\' == 92) && (']' == 93) \
+ && ('^' == 94) && ('_' == 95) && ('a' == 97) && ('b' == 98) \
+ && ('c' == 99) && ('d' == 100) && ('e' == 101) && ('f' == 102) \
+ && ('g' == 103) && ('h' == 104) && ('i' == 105) && ('j' == 106) \
+ && ('k' == 107) && ('l' == 108) && ('m' == 109) && ('n' == 110) \
+ && ('o' == 111) && ('p' == 112) && ('q' == 113) && ('r' == 114) \
+ && ('s' == 115) && ('t' == 116) && ('u' == 117) && ('v' == 118) \
+ && ('w' == 119) && ('x' == 120) && ('y' == 121) && ('z' == 122) \
+ && ('{' == 123) && ('|' == 124) && ('}' == 125) && ('~' == 126))
+/* The character set is not based on ISO-646. */
+#error "gperf generated tables don't work with this execution character set. Please report a bug to <bug-gperf@gnu.org>."
+#endif
+
+
+#define GPERF_PARSER_TOTAL_KEYWORDS 29
+#define GPERF_PARSER_MIN_WORD_LENGTH 3
+#define GPERF_PARSER_MAX_WORD_LENGTH 21
+#define GPERF_PARSER_MIN_HASH_VALUE 4
+#define GPERF_PARSER_MAX_HASH_VALUE 36
+/* maximum key range = 33, duplicates = 0 */
+
+#ifdef __GNUC__
+__inline
+#else
+#ifdef __cplusplus
+inline
+#endif
+#endif
+static unsigned int
+gperf_keyword_hash_function (register const char *str, register size_t len)
+{
+ static unsigned char asso_values[] =
+ {
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 15, 10, 1, 1, 9,
+ 4, 37, 0, 20, 37, 37, 9, 37, 14, 0,
+ 37, 37, 1, 0, 37, 7, 13, 37, 18, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37
+ };
+ return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]];
+}
+
+static PARSER_KEYWORD gperf_keywords[] =
+ {
+ {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0},
+#line 18 "gperf-config.txt"
+ {"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4},
+#line 51 "gperf-config.txt"
+ {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28},
+#line 26 "gperf-config.txt"
+ {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9},
+ {(char*)0},
+#line 52 "gperf-config.txt"
+ {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29},
+#line 49 "gperf-config.txt"
+ {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26},
+#line 21 "gperf-config.txt"
+ {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7},
+#line 19 "gperf-config.txt"
+ {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5},
+#line 35 "gperf-config.txt"
+ {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18},
+#line 42 "gperf-config.txt"
+ {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22},
+#line 50 "gperf-config.txt"
+ {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27},
+#line 20 "gperf-config.txt"
+ {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6},
+#line 27 "gperf-config.txt"
+ {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10},
+#line 48 "gperf-config.txt"
+ {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25},
+#line 15 "gperf-config.txt"
+ {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1},
+#line 31 "gperf-config.txt"
+ {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14},
+#line 40 "gperf-config.txt"
+ {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20},
+#line 47 "gperf-config.txt"
+ {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24},
+#line 34 "gperf-config.txt"
+ {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17},
+#line 28 "gperf-config.txt"
+ {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11},
+#line 25 "gperf-config.txt"
+ {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8},
+#line 41 "gperf-config.txt"
+ {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21},
+#line 30 "gperf-config.txt"
+ {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13},
+#line 43 "gperf-config.txt"
+ {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23},
+#line 16 "gperf-config.txt"
+ {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2},
+#line 33 "gperf-config.txt"
+ {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16},
+#line 29 "gperf-config.txt"
+ {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12},
+#line 17 "gperf-config.txt"
+ {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3},
+#line 32 "gperf-config.txt"
+ {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15},
+ {(char*)0}, {(char*)0}, {(char*)0},
+#line 36 "gperf-config.txt"
+ {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19}
+ };
+
+PARSER_KEYWORD *
+gperf_lookup_keyword (register const char *str, register size_t len)
+{
+ if (len <= GPERF_PARSER_MAX_WORD_LENGTH && len >= GPERF_PARSER_MIN_WORD_LENGTH)
+ {
+ register unsigned int key = gperf_keyword_hash_function (str, len);
+
+ if (key <= GPERF_PARSER_MAX_HASH_VALUE)
+ {
+ register const char *s = gperf_keywords[key].keyword;
+
+ if (s && *str == *s && !strcmp (str + 1, s + 1))
+ return &gperf_keywords[key];
+ }
+ }
+ return 0;
+}
diff --git a/collectors/plugins.d/local_listeners.c b/collectors/plugins.d/local_listeners.c
new file mode 100644
index 000000000..a39de7974
--- /dev/null
+++ b/collectors/plugins.d/local_listeners.c
@@ -0,0 +1,366 @@
+#include "libnetdata/libnetdata.h"
+#include "libnetdata/required_dummies.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <dirent.h>
+#include <string.h>
+#include <unistd.h>
+#include <ctype.h>
+#include <arpa/inet.h>
+
+typedef enum {
+ PROC_NET_PROTOCOL_TCP,
+ PROC_NET_PROTOCOL_TCP6,
+ PROC_NET_PROTOCOL_UDP,
+ PROC_NET_PROTOCOL_UDP6,
+} PROC_NET_PROTOCOLS;
+
+#define MAX_ERROR_LOGS 10
+
+static size_t pid_fds_processed = 0;
+static size_t pid_fds_failed = 0;
+static size_t errors_encountered = 0;
+
+static inline const char *protocol_name(PROC_NET_PROTOCOLS protocol) {
+ switch(protocol) {
+ default:
+ case PROC_NET_PROTOCOL_TCP:
+ return "TCP";
+
+ case PROC_NET_PROTOCOL_UDP:
+ return "UDP";
+
+ case PROC_NET_PROTOCOL_TCP6:
+ return "TCP6";
+
+ case PROC_NET_PROTOCOL_UDP6:
+ return "UDP6";
+ }
+}
+
+static inline int read_cmdline(pid_t pid, char* buffer, size_t bufferSize) {
+ char path[FILENAME_MAX + 1];
+ snprintfz(path, FILENAME_MAX, "%s/proc/%d/cmdline", netdata_configured_host_prefix, pid);
+
+ FILE* file = fopen(path, "r");
+ if (!file) {
+ if(++errors_encountered < MAX_ERROR_LOGS)
+ collector_error("LOCAL-LISTENERS: error opening file: %s\n", path);
+
+ return -1;
+ }
+
+ size_t bytesRead = fread(buffer, 1, bufferSize - 1, file);
+ buffer[bytesRead] = '\0'; // Ensure null-terminated
+
+ // Replace null characters in cmdline with spaces
+ for (size_t i = 0; i < bytesRead; i++) {
+ if (buffer[i] == '\0') {
+ buffer[i] = ' ';
+ }
+ }
+
+ fclose(file);
+ return 0;
+}
+
+static inline void fix_cmdline(char* str) {
+ if (str == NULL)
+ return;
+
+ char *s = str;
+
+ do {
+ if(*s == '|' || iscntrl(*s))
+ *s = '_';
+
+ } while(*++s);
+
+
+ while(s > str && *(s-1) == ' ')
+ *--s = '\0';
+}
+
+// ----------------------------------------------------------------------------
+
+#define HASH_TABLE_SIZE 100000
+
+typedef struct Node {
+ unsigned int inode; // key
+
+ // values
+ unsigned int port;
+ char local_address[INET6_ADDRSTRLEN];
+ PROC_NET_PROTOCOLS protocol;
+ bool processed;
+
+ // linking
+ struct Node *prev, *next;
+} Node;
+
+typedef struct HashTable {
+ Node *table[HASH_TABLE_SIZE];
+} HashTable;
+
+static HashTable *hashTable_key_inode_port_value = NULL;
+
+static inline void generate_output(const char *protocol, const char *address, unsigned int port, const char *cmdline) {
+ printf("%s|%s|%u|%s\n", protocol, address, port, cmdline);
+}
+
+HashTable* createHashTable() {
+ HashTable *hashTable = (HashTable*)mallocz(sizeof(HashTable));
+ memset(hashTable, 0, sizeof(HashTable));
+ return hashTable;
+}
+
+static inline unsigned int hashFunction(unsigned int inode) {
+ return inode % HASH_TABLE_SIZE;
+}
+
+static inline void insertHashTable(HashTable *hashTable, unsigned int inode, unsigned int port, PROC_NET_PROTOCOLS protocol, char *local_address) {
+ unsigned int index = hashFunction(inode);
+ Node *newNode = (Node*)mallocz(sizeof(Node));
+ newNode->inode = inode;
+ newNode->port = port;
+ newNode->protocol = protocol;
+ strncpyz(newNode->local_address, local_address, INET6_ADDRSTRLEN - 1);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(hashTable->table[index], newNode, prev, next);
+}
+
+static inline bool lookupHashTable_and_execute(HashTable *hashTable, unsigned int inode, pid_t pid) {
+ unsigned int index = hashFunction(inode);
+ for(Node *node = hashTable->table[index], *next = NULL ; node ; node = next) {
+ next = node->next;
+
+ if(node->inode == inode && node->port) {
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(hashTable->table[index], node, prev, next);
+ char cmdline[8192] = "";
+ read_cmdline(pid, cmdline, sizeof(cmdline));
+ fix_cmdline(cmdline);
+ generate_output(protocol_name(node->protocol), node->local_address, node->port, cmdline);
+ freez(node);
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void freeHashTable(HashTable *hashTable) {
+ for (unsigned int i = 0; i < HASH_TABLE_SIZE; i++) {
+ while(hashTable->table[i]) {
+ Node *tmp = hashTable->table[i];
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(hashTable->table[i], tmp, prev, next);
+ generate_output(protocol_name(tmp->protocol), tmp->local_address, tmp->port, "");
+ freez(tmp);
+ }
+ }
+ freez(hashTable);
+}
+
+// ----------------------------------------------------------------------------
+
+static inline void found_this_socket_inode(pid_t pid, unsigned int inode) {
+ lookupHashTable_and_execute(hashTable_key_inode_port_value, inode, pid);
+}
+
+bool find_all_sockets_in_proc(const char *proc_filename) {
+ DIR *proc_dir, *fd_dir;
+ struct dirent *proc_entry, *fd_entry;
+ char path_buffer[FILENAME_MAX + 1];
+
+ proc_dir = opendir(proc_filename);
+ if (proc_dir == NULL) {
+ if(++errors_encountered < MAX_ERROR_LOGS)
+ collector_error("LOCAL-LISTENERS: cannot opendir() '%s'", proc_filename);
+
+ pid_fds_failed++;
+ return false;
+ }
+
+ while ((proc_entry = readdir(proc_dir)) != NULL) {
+ // Check if directory entry is a PID by seeing if the name is made up of digits only
+ int is_pid = 1;
+ for (char *c = proc_entry->d_name; *c != '\0'; c++) {
+ if (*c < '0' || *c > '9') {
+ is_pid = 0;
+ break;
+ }
+ }
+
+ if (!is_pid)
+ continue;
+
+ // Build the path to the fd directory of the process
+ snprintfz(path_buffer, FILENAME_MAX, "%s/%s/fd/", proc_filename, proc_entry->d_name);
+
+ fd_dir = opendir(path_buffer);
+ if (fd_dir == NULL) {
+ if(++errors_encountered < MAX_ERROR_LOGS)
+ collector_error("LOCAL-LISTENERS: cannot opendir() '%s'", path_buffer);
+
+ pid_fds_failed++;
+ continue;
+ }
+
+ while ((fd_entry = readdir(fd_dir)) != NULL) {
+ if(!strcmp(fd_entry->d_name, ".") || !strcmp(fd_entry->d_name, ".."))
+ continue;
+
+ char link_path[FILENAME_MAX + 1];
+ char link_target[FILENAME_MAX + 1];
+ int inode;
+
+ // Build the path to the file descriptor link
+ snprintfz(link_path, FILENAME_MAX, "%s/%s", path_buffer, fd_entry->d_name);
+
+ ssize_t len = readlink(link_path, link_target, sizeof(link_target) - 1);
+ if (len == -1) {
+ if(++errors_encountered < MAX_ERROR_LOGS)
+ collector_error("LOCAL-LISTENERS: cannot read link '%s'", link_path);
+
+ pid_fds_failed++;
+ continue;
+ }
+ link_target[len] = '\0';
+
+ pid_fds_processed++;
+
+ // If the link target indicates a socket, print its inode number
+ if (sscanf(link_target, "socket:[%d]", &inode) == 1)
+ found_this_socket_inode((pid_t)strtoul(proc_entry->d_name, NULL, 10), inode);
+ }
+
+ closedir(fd_dir);
+ }
+
+ closedir(proc_dir);
+ return true;
+}
+
+// ----------------------------------------------------------------------------
+
+static inline void add_port_and_inode(PROC_NET_PROTOCOLS protocol, unsigned int port, unsigned int inode, char *local_address) {
+ insertHashTable(hashTable_key_inode_port_value, inode, port, protocol, local_address);
+}
+
+static inline void print_ipv6_address(const char *ipv6_str, char *dst) {
+ unsigned k;
+ char buf[9];
+ struct sockaddr_in6 sa;
+
+ // Initialize sockaddr_in6
+ memset(&sa, 0, sizeof(struct sockaddr_in6));
+ sa.sin6_family = AF_INET6;
+ sa.sin6_port = htons(0); // replace 0 with your port number
+
+ // Convert hex string to byte array
+ for (k = 0; k < 4; ++k)
+ {
+ memset(buf, 0, 9);
+ memcpy(buf, ipv6_str + (k * 8), 8);
+ sa.sin6_addr.s6_addr32[k] = strtoul(buf, NULL, 16);
+ }
+
+ // Convert to human-readable format
+ if (inet_ntop(AF_INET6, &(sa.sin6_addr), dst, INET6_ADDRSTRLEN) == NULL)
+ *dst = '\0';
+}
+
+static inline void print_ipv4_address(uint32_t address, char *dst) {
+ uint8_t octets[4];
+ octets[0] = address & 0xFF;
+ octets[1] = (address >> 8) & 0xFF;
+ octets[2] = (address >> 16) & 0xFF;
+ octets[3] = (address >> 24) & 0xFF;
+ sprintf(dst, "%u.%u.%u.%u", octets[0], octets[1], octets[2], octets[3]);
+}
+
+bool read_proc_net_x(const char *filename, PROC_NET_PROTOCOLS protocol) {
+ FILE *fp;
+ char *line = NULL;
+ size_t len = 0;
+ ssize_t read;
+ char address[INET6_ADDRSTRLEN];
+
+ ssize_t min_line_length = (protocol == PROC_NET_PROTOCOL_TCP || protocol == PROC_NET_PROTOCOL_UDP) ? 105 : 155;
+
+ fp = fopen(filename, "r");
+ if (fp == NULL)
+ return false;
+
+ // Read line by line
+ while ((read = getline(&line, &len, fp)) != -1) {
+ if(read < min_line_length) continue;
+
+ char local_address6[33], rem_address6[33];
+ unsigned int local_address, local_port, state, rem_address, rem_port, inode;
+
+ switch(protocol) {
+ case PROC_NET_PROTOCOL_TCP:
+ if(line[34] != '0' || line[35] != 'A')
+ continue;
+ // fall-through
+
+ case PROC_NET_PROTOCOL_UDP:
+ if (sscanf(line, "%*d: %X:%X %X:%X %X %*X:%*X %*X:%*X %*X %*d %*d %u",
+ &local_address, &local_port, &rem_address, &rem_port, &state, &inode) != 6)
+ continue;
+
+ print_ipv4_address(local_address, address);
+ break;
+
+ case PROC_NET_PROTOCOL_TCP6:
+ if(line[82] != '0' || line[83] != 'A')
+ continue;
+ // fall-through
+
+ case PROC_NET_PROTOCOL_UDP6:
+ if(sscanf(line, "%*d: %32[0-9A-Fa-f]:%X %32[0-9A-Fa-f]:%X %X %*X:%*X %*X:%*X %*X %*d %*d %u",
+ local_address6, &local_port, rem_address6, &rem_port, &state, &inode) != 6)
+ continue;
+
+ print_ipv6_address(local_address6, address);
+ break;
+ }
+
+ add_port_and_inode(protocol, local_port, inode, address);
+ }
+
+ fclose(fp);
+ if (line)
+ free(line);
+
+ return true;
+}
+
+// ----------------------------------------------------------------------------
+
+int main(int argc __maybe_unused, char **argv __maybe_unused) {
+ char path[FILENAME_MAX + 1];
+ hashTable_key_inode_port_value = createHashTable();
+
+ netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
+ if(!netdata_configured_host_prefix) netdata_configured_host_prefix = "";
+
+ snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp", netdata_configured_host_prefix);
+ read_proc_net_x(path, PROC_NET_PROTOCOL_TCP);
+
+ snprintfz(path, FILENAME_MAX, "%s/proc/net/udp", netdata_configured_host_prefix);
+ read_proc_net_x(path, PROC_NET_PROTOCOL_UDP);
+
+ snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp6", netdata_configured_host_prefix);
+ read_proc_net_x(path, PROC_NET_PROTOCOL_TCP6);
+
+ snprintfz(path, FILENAME_MAX, "%s/proc/net/udp6", netdata_configured_host_prefix);
+ read_proc_net_x(path, PROC_NET_PROTOCOL_UDP6);
+
+ snprintfz(path, FILENAME_MAX, "%s/proc", netdata_configured_host_prefix);
+ find_all_sockets_in_proc(path);
+
+ freeHashTable(hashTable_key_inode_port_value);
+ return 0;
+}
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index da5226a5c..6a235b4e6 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -3,7 +3,7 @@
#include "plugins_d.h"
#include "pluginsd_parser.h"
-char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL };
+char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { [0] = PLUGINS_DIR, };
struct plugind *pluginsd_root = NULL;
inline size_t pluginsd_initialize_plugin_directories()
@@ -18,32 +18,32 @@ inline size_t pluginsd_initialize_plugin_directories()
}
// Parse it and store it to plugin directories
- return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace);
+ return quoted_strings_splitter_config(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES);
}
static inline void plugin_set_disabled(struct plugind *cd) {
- netdata_spinlock_lock(&cd->unsafe.spinlock);
+ spinlock_lock(&cd->unsafe.spinlock);
cd->unsafe.enabled = false;
- netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ spinlock_unlock(&cd->unsafe.spinlock);
}
bool plugin_is_enabled(struct plugind *cd) {
- netdata_spinlock_lock(&cd->unsafe.spinlock);
+ spinlock_lock(&cd->unsafe.spinlock);
bool ret = cd->unsafe.enabled;
- netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ spinlock_unlock(&cd->unsafe.spinlock);
return ret;
}
static inline void plugin_set_running(struct plugind *cd) {
- netdata_spinlock_lock(&cd->unsafe.spinlock);
+ spinlock_lock(&cd->unsafe.spinlock);
cd->unsafe.running = true;
- netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ spinlock_unlock(&cd->unsafe.spinlock);
}
static inline bool plugin_is_running(struct plugind *cd) {
- netdata_spinlock_lock(&cd->unsafe.spinlock);
+ spinlock_lock(&cd->unsafe.spinlock);
bool ret = cd->unsafe.running;
- netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ spinlock_unlock(&cd->unsafe.spinlock);
return ret;
}
@@ -53,7 +53,7 @@ static void pluginsd_worker_thread_cleanup(void *arg)
worker_unregister();
- netdata_spinlock_lock(&cd->unsafe.spinlock);
+ spinlock_lock(&cd->unsafe.spinlock);
cd->unsafe.running = false;
cd->unsafe.thread = 0;
@@ -61,15 +61,15 @@ static void pluginsd_worker_thread_cleanup(void *arg)
pid_t pid = cd->unsafe.pid;
cd->unsafe.pid = 0;
- netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ spinlock_unlock(&cd->unsafe.spinlock);
if (pid) {
siginfo_t info;
- info("PLUGINSD: 'host:%s', killing data collection child process with pid %d",
+ netdata_log_info("PLUGINSD: 'host:%s', killing data collection child process with pid %d",
rrdhost_hostname(cd->host), pid);
if (killpid(pid) != -1) {
- info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...",
+ netdata_log_info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...",
rrdhost_hostname(cd->host), pid);
netdata_waitid(P_PID, (id_t)pid, &info, WEXITED);
@@ -85,7 +85,7 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) {
}
if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) {
- info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
+ netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid,
plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
@@ -94,7 +94,7 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) {
}
if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
- error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, "
+ netdata_log_error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, "
"although it reports success (exits with 0)."
"We have tried to collect something %zu times - unsuccessfully. Disabling it.",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, cd->serial_failures);
@@ -105,21 +105,21 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) {
static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) {
if (worker_ret_code == -1) {
- info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.",
+ netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
plugin_set_disabled(cd);
return;
}
if (!cd->successful_collections) {
- error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.",
+ netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code);
plugin_set_disabled(cd);
return;
}
if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) {
- error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
+ netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections,
plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
sleep((unsigned int)(cd->update_every * 10));
@@ -127,7 +127,7 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r
}
if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
- error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
+ netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
"We tried to restart it %zu times, but it failed to generate data. Disabling it.",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code,
cd->successful_collections, cd->serial_failures);
@@ -153,16 +153,16 @@ static void *pluginsd_worker_thread(void *arg) {
FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
if (unlikely(!fp_child_input || !fp_child_output)) {
- error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd);
+ netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd);
break;
}
- info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
+ netdata_log_info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0);
- info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
+ netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
killpid(cd->unsafe.pid);
@@ -186,21 +186,21 @@ static void *pluginsd_worker_thread(void *arg) {
static void pluginsd_main_cleanup(void *data) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- info("PLUGINSD: cleaning up...");
+ netdata_log_info("PLUGINSD: cleaning up...");
struct plugind *cd;
for (cd = pluginsd_root; cd; cd = cd->next) {
- netdata_spinlock_lock(&cd->unsafe.spinlock);
+ spinlock_lock(&cd->unsafe.spinlock);
if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) {
- info("PLUGINSD: 'host:%s', stopping plugin thread: %s",
+ netdata_log_info("PLUGINSD: 'host:%s', stopping plugin thread: %s",
rrdhost_hostname(cd->host), cd->id);
netdata_thread_cancel(cd->unsafe.thread);
}
- netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ spinlock_unlock(&cd->unsafe.spinlock);
}
- info("PLUGINSD: cleanup completed.");
+ netdata_log_info("PLUGINSD: cleanup completed.");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
worker_unregister();
@@ -235,7 +235,7 @@ void *pluginsd_main(void *ptr)
if (unlikely(!dir)) {
if (directory_errors[idx] != errno) {
directory_errors[idx] = errno;
- error("cannot open plugins directory '%s'", directory_name);
+ netdata_log_error("cannot open plugins directory '%s'", directory_name);
}
continue;
}
@@ -245,7 +245,7 @@ void *pluginsd_main(void *ptr)
if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
- debug(D_PLUGINSD, "examining file '%s'", file->d_name);
+ netdata_log_debug(D_PLUGINSD, "examining file '%s'", file->d_name);
if (unlikely(strcmp(file->d_name, ".") == 0 || strcmp(file->d_name, "..") == 0))
continue;
@@ -254,7 +254,7 @@ void *pluginsd_main(void *ptr)
if (unlikely(len <= (int)PLUGINSD_FILE_SUFFIX_LEN))
continue;
if (unlikely(strcmp(PLUGINSD_FILE_SUFFIX, &file->d_name[len - (int)PLUGINSD_FILE_SUFFIX_LEN]) != 0)) {
- debug(D_PLUGINSD, "file '%s' does not end in '%s'", file->d_name, PLUGINSD_FILE_SUFFIX);
+ netdata_log_debug(D_PLUGINSD, "file '%s' does not end in '%s'", file->d_name, PLUGINSD_FILE_SUFFIX);
continue;
}
@@ -263,7 +263,7 @@ void *pluginsd_main(void *ptr)
int enabled = config_get_boolean(CONFIG_SECTION_PLUGINS, pluginname, automatic_run);
if (unlikely(!enabled)) {
- debug(D_PLUGINSD, "plugin '%s' is not enabled", file->d_name);
+ netdata_log_debug(D_PLUGINSD, "plugin '%s' is not enabled", file->d_name);
continue;
}
@@ -274,7 +274,7 @@ void *pluginsd_main(void *ptr)
break;
if (likely(cd && plugin_is_running(cd))) {
- debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename);
+ netdata_log_debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename);
continue;
}
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 097e5ea60..cda17710c 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,90 +4,103 @@
#define LOG_FUNCTIONS false
-static int send_to_plugin(const char *txt, void *data) {
+static ssize_t send_to_plugin(const char *txt, void *data) {
PARSER *parser = data;
if(!txt || !*txt)
return 0;
+ errno = 0;
+ spinlock_lock(&parser->writer.spinlock);
+ ssize_t bytes = -1;
+
#ifdef ENABLE_HTTPS
NETDATA_SSL *ssl = parser->ssl_output;
if(ssl) {
+
if(SSL_connection(ssl))
- return (int)netdata_ssl_write(ssl, (void *)txt, strlen(txt));
+ bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt));
- error("PLUGINSD: cannot send command (SSL)");
- return -1;
+ else
+ netdata_log_error("PLUGINSD: cannot send command (SSL)");
+
+ spinlock_unlock(&parser->writer.spinlock);
+ return bytes;
}
#endif
if(parser->fp_output) {
- int bytes = fprintf(parser->fp_output, "%s", txt);
+
+ bytes = fprintf(parser->fp_output, "%s", txt);
if(bytes <= 0) {
- error("PLUGINSD: cannot send command (FILE)");
- return -2;
+ netdata_log_error("PLUGINSD: cannot send command (FILE)");
+ bytes = -2;
}
- fflush(parser->fp_output);
+ else
+ fflush(parser->fp_output);
+
+ spinlock_unlock(&parser->writer.spinlock);
return bytes;
}
if(parser->fd != -1) {
- size_t bytes = 0;
- size_t total = strlen(txt);
+ bytes = 0;
+ ssize_t total = (ssize_t)strlen(txt);
ssize_t sent;
do {
sent = write(parser->fd, &txt[bytes], total - bytes);
if(sent <= 0) {
- error("PLUGINSD: cannot send command (fd)");
+ netdata_log_error("PLUGINSD: cannot send command (fd)");
+ spinlock_unlock(&parser->writer.spinlock);
return -3;
}
bytes += sent;
}
while(bytes < total);
+ spinlock_unlock(&parser->writer.spinlock);
return (int)bytes;
}
- error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
+ spinlock_unlock(&parser->writer.spinlock);
+ netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
return -4;
}
-static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) {
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+static inline RRDHOST *pluginsd_require_host_from_parent(PARSER *parser, const char *cmd) {
+ RRDHOST *host = parser->user.host;
if(unlikely(!host))
- error("PLUGINSD: command %s requires a host, but is not set.", cmd);
+ netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
return host;
}
-static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) {
- RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+static inline RRDSET *pluginsd_require_chart_from_parent(PARSER *parser, const char *cmd, const char *parent_cmd) {
+ RRDSET *st = parser->user.st;
if(unlikely(!st))
- error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
+ netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
return st;
}
-static inline RRDSET *pluginsd_get_chart_from_parent(void *user) {
- return ((PARSER_USER_OBJECT *) user)->st;
+static inline RRDSET *pluginsd_get_chart_from_parent(PARSER *parser) {
+ return parser->user.st;
}
-static inline void pluginsd_lock_rrdset_data_collection(void *user) {
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
- if(u->st && !u->v2.locked_data_collection) {
- netdata_spinlock_lock(&u->st->data_collection_lock);
- u->v2.locked_data_collection = true;
+static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
+ if(parser->user.st && !parser->user.v2.locked_data_collection) {
+ spinlock_lock(&parser->user.st->data_collection_lock);
+ parser->user.v2.locked_data_collection = true;
}
}
-static inline bool pluginsd_unlock_rrdset_data_collection(void *user) {
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
- if(u->st && u->v2.locked_data_collection) {
- netdata_spinlock_unlock(&u->st->data_collection_lock);
- u->v2.locked_data_collection = false;
+static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
+ if(parser->user.st && parser->user.v2.locked_data_collection) {
+ spinlock_unlock(&parser->user.st->data_collection_lock);
+ parser->user.v2.locked_data_collection = false;
return true;
}
@@ -108,29 +121,29 @@ void pluginsd_rrdset_cleanup(RRDSET *st) {
st->pluginsd.pos = 0;
}
-static inline void pluginsd_unlock_previous_chart(void *user, const char *keyword, bool stale) {
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
-
- if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) {
+static inline void pluginsd_unlock_previous_chart(PARSER *parser, const char *keyword, bool stale) {
+ if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
if(stale)
- error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
- rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
+ rrdhost_hostname(parser->user.st->rrdhost),
+ rrdset_id(parser->user.st),
+ keyword);
}
- if(unlikely(u->v2.ml_locked)) {
- ml_chart_update_end(u->st);
- u->v2.ml_locked = false;
+ if(unlikely(parser->user.v2.ml_locked)) {
+ ml_chart_update_end(parser->user.st);
+ parser->user.v2.ml_locked = false;
if(stale)
- error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
- rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
+ rrdhost_hostname(parser->user.st->rrdhost),
+ rrdset_id(parser->user.st),
+ keyword);
}
}
-static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) {
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
-
- pluginsd_unlock_previous_chart(user, keyword, true);
+static inline void pluginsd_set_chart_from_parent(PARSER *parser, RRDSET *st, const char *keyword) {
+ pluginsd_unlock_previous_chart(parser, keyword, true);
if(st) {
size_t dims = dictionary_entries(st->rrddim_root_index);
@@ -145,13 +158,13 @@ static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const
st->pluginsd.pos = 0;
}
- u->st = st;
+ parser->user.st = st;
}
static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
if (unlikely(!dimension || !*dimension)) {
- error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
- rrdhost_hostname(host), rrdset_id(st), cmd);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
return NULL;
}
@@ -172,8 +185,8 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons
rda = rrddim_find_and_acquire(st, dimension);
if (unlikely(!rda)) {
- error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
- rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
+ rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
return NULL;
}
@@ -186,21 +199,21 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons
static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
if (unlikely(!chart || !*chart)) {
- error("PLUGINSD: 'host:%s' got a %s without a chart id.",
- rrdhost_hostname(host), cmd);
+ netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
+ rrdhost_hostname(host), cmd);
return NULL;
}
RRDSET *st = rrdset_find(host, chart);
if (unlikely(!st))
- error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
- rrdhost_hostname(host), chart, cmd);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
+ rrdhost_hostname(host), chart, cmd);
return st;
}
-static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) {
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
+static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
+ parser->user.enabled = 0;
if(keyword && msg) {
error_limit_static_global_var(erl, 1, 0);
@@ -210,22 +223,21 @@ static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword,
return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
char *dimension = get_word(words, num_words, 1);
char *value = get_word(words, num_words, 2);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
+ netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
if (value && *value)
@@ -234,18 +246,17 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
char *id = get_word(words, num_words, 1);
char *microseconds_txt = get_word(words, num_words, 2);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN);
+ pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN);
usec_t microseconds = 0;
if (microseconds_txt && *microseconds_txt) {
@@ -270,7 +281,7 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
if (likely(st->counter_done)) {
if (likely(microseconds)) {
- if (((PARSER_USER_OBJECT *)user)->trust_durations)
+ if (parser->user.trust_durations)
rrdset_next_usec_unfiltered(st, microseconds);
else
rrdset_next_usec(st, microseconds);
@@ -281,22 +292,21 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
UNUSED(words);
UNUSED(num_words);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
+ netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END);
- ((PARSER_USER_OBJECT *) user)->data_collections_count++;
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_END);
+ parser->user.data_collections_count++;
struct timeval now;
now_realtime_timeval(&now);
@@ -305,15 +315,13 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-static void pluginsd_host_define_cleanup(void *user) {
- PARSER_USER_OBJECT *u = user;
+static void pluginsd_host_define_cleanup(PARSER *parser) {
+ string_freez(parser->user.host_define.hostname);
+ dictionary_destroy(parser->user.host_define.rrdlabels);
- string_freez(u->host_define.hostname);
- dictionary_destroy(u->host_define.rrdlabels);
-
- u->host_define.hostname = NULL;
- u->host_define.rrdlabels = NULL;
- u->host_define.parsing_host = false;
+ parser->user.host_define.hostname = NULL;
+ parser->user.host_define.rrdlabels = NULL;
+ parser->user.host_define.parsing_host = false;
}
static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
@@ -325,61 +333,56 @@ static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid
return true;
}
-static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) {
- PARSER_USER_OBJECT *u = user;
-
+static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
char *guid = get_word(words, num_words, 1);
char *hostname = get_word(words, num_words, 2);
if(unlikely(!guid || !*guid || !hostname || !*hostname))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
- if(unlikely(u->host_define.parsing_host))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE,
+ if(unlikely(parser->user.host_define.parsing_host))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
"another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
- if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
+ if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
- u->host_define.hostname = string_strdupz(hostname);
- u->host_define.rrdlabels = rrdlabels_create();
- u->host_define.parsing_host = true;
+ parser->user.host_define.hostname = string_strdupz(hostname);
+ parser->user.host_define.rrdlabels = rrdlabels_create();
+ parser->user.host_define.parsing_host = true;
return PARSER_RC_OK;
}
-static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) {
- PARSER_USER_OBJECT *u = user;
-
+static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) {
char *name = get_word(words, num_words, 1);
char *value = get_word(words, num_words, 2);
if(!name || !*name || !value)
- return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
- if(!u->host_define.parsing_host || !dict)
- return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+ if(!parser->user.host_define.parsing_host || !dict)
+ return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG);
return PARSER_RC_OK;
}
-static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) {
- PARSER_USER_OBJECT *u = user;
- return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL);
+static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
+ return pluginsd_host_dictionary(words, num_words, parser,
+ parser->user.host_define.rrdlabels,
+ PLUGINSD_KEYWORD_HOST_LABEL);
}
-static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
- PARSER_USER_OBJECT *u = user;
-
- if(!u->host_define.parsing_host)
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ if(!parser->user.host_define.parsing_host)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
RRDHOST *host = rrdhost_find_or_create(
- string2str(u->host_define.hostname),
- string2str(u->host_define.hostname),
- u->host_define.machine_guid_str,
+ string2str(parser->user.host_define.hostname),
+ string2str(parser->user.host_define.hostname),
+ parser->user.host_define.machine_guid_str,
"Netdata Virtual Host 1.0",
netdata_configured_timezone,
netdata_configured_abbrev_timezone,
@@ -398,22 +401,24 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu
default_rrdpush_enable_replication,
default_rrdpush_seconds_to_replicate,
default_rrdpush_replication_step,
- rrdhost_labels_to_system_info(u->host_define.rrdlabels),
+ rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
false
);
+ rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
+
if(host->rrdlabels) {
- rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels);
+ rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
}
else {
- host->rrdlabels = u->host_define.rrdlabels;
- u->host_define.rrdlabels = NULL;
+ host->rrdlabels = parser->user.host_define.rrdlabels;
+ parser->user.host_define.rrdlabels = NULL;
}
- pluginsd_host_define_cleanup(user);
+ pluginsd_host_define_cleanup(parser);
- u->host = host;
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END);
+ parser->user.host = host;
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END);
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
rrdcontext_host_child_connected(host);
@@ -422,34 +427,31 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu
return PARSER_RC_OK;
}
-static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) {
- PARSER_USER_OBJECT *u = user;
-
+static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
char *guid = get_word(words, num_words, 1);
if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
- u->host = localhost;
+ parser->user.host = localhost;
return PARSER_RC_OK;
}
uuid_t uuid;
char uuid_str[UUID_STR_LEN];
if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
RRDHOST *host = rrdhost_find_by_guid(uuid_str);
if(unlikely(!host))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
- u->host = host;
+ parser->user.host = host;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
-{
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
char *type = get_word(words, num_words, 1);
char *name = get_word(words, num_words, 2);
@@ -473,7 +475,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
// make sure we have the required variables
if (unlikely((!type || !*type || !id || !*id)))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
// parse the name, and make sure it does not include 'type.'
if (unlikely(name && *name)) {
@@ -494,11 +496,11 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
if (likely(priority_s && *priority_s))
priority = str2i(priority_s);
- int update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
+ int update_every = parser->user.cd->update_every;
if (likely(update_every_s && *update_every_s))
update_every = str2i(update_every_s);
if (unlikely(!update_every))
- update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
+ update_every = parser->user.cd->update_every;
RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
if (unlikely(chart))
@@ -515,7 +517,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
if (unlikely(!units))
units = "unknown";
- debug(
+ netdata_log_debug(
D_PLUGINSD,
"creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
@@ -525,14 +527,16 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
st = rrdset_create(
host, type, id, name, family, context, title, units,
- (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename,
+ (plugin && *plugin) ? plugin : parser->user.cd->filename,
module, priority, update_every,
chart_type);
if (likely(st)) {
if (options && *options) {
- if (strstr(options, "obsolete"))
+ if (strstr(options, "obsolete")) {
+ pluginsd_rrdset_cleanup(st);
rrdset_is_obsolete(st);
+ }
else
rrdset_isnot_obsolete(st);
@@ -556,22 +560,21 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
}
- pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART);
+ pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_CHART);
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
const char *first_entry_txt = get_word(words, num_words, 1);
const char *last_entry_txt = get_word(words, num_words, 2);
const char *wall_clock_time_txt = get_word(words, num_words, 3);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
@@ -590,7 +593,6 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
- PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
ok = replicate_chart_request(send_to_plugin, parser, host, st,
first_entry_child, last_entry_child, child_wall_clock_time,
0, 0);
@@ -605,8 +607,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
char *id = get_word(words, num_words, 1);
char *name = get_word(words, num_words, 2);
char *algorithm = get_word(words, num_words, 3);
@@ -614,14 +615,14 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
char *divisor_s = get_word(words, num_words, 5);
char *options = get_word(words, num_words, 6);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if (unlikely(!id))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
long multiplier = 1;
if (multiplier_s && *multiplier_s) {
@@ -641,7 +642,7 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
algorithm = "absolute";
if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(
+ netdata_log_debug(
D_PLUGINSD,
"creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
@@ -720,7 +721,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
pf->sent_ut = now_realtime_usec();
if(ret < 0) {
- error("FUNCTION: failed to send function to plugin, error %d", ret);
+ netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret);
rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
}
else {
@@ -734,7 +735,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
struct inflight_function *pf = new_func;
- error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
+ netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
pf->callback(pf->destination_wb, pf->code, pf->callback_data);
string_freez(pf->function);
@@ -825,8 +826,7 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou
return HTTP_RESP_OK;
}
-PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
bool global = false;
size_t i = 1;
if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
@@ -838,21 +838,21 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
char *timeout_s = get_word(words, num_words, i++);
char *help = get_word(words, num_words, i++);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION);
if(!host) return PARSER_RC_ERROR;
- RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
+ RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
if(!st) global = true;
if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
- error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
- rrdhost_hostname(host),
- st?rrdset_id(st):"(unset)",
- global?"yes":"no",
- name?name:"(unset)",
- timeout_s?timeout_s:"(unset)",
- help?help:"(unset)"
- );
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
+ rrdhost_hostname(host),
+ st?rrdset_id(st):"(unset)",
+ global?"yes":"no",
+ name?name:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ help?help:"(unset)"
+ );
return PARSER_RC_ERROR;
}
@@ -863,7 +863,6 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
}
- PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
return PARSER_RC_OK;
@@ -876,15 +875,14 @@ static void pluginsd_function_result_end(struct parser *parser, void *action_dat
string_freez(key);
}
-PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
char *key = get_word(words, num_words, 1);
char *status = get_word(words, num_words, 2);
char *format = get_word(words, num_words, 3);
char *expires = get_word(words, num_words, 4);
if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
- error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
+ netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
, key ? key : "(unset)"
, status ? status : "(unset)"
, format ? format : "(unset)"
@@ -898,15 +896,13 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u
time_t expiration = (expires && *expires) ? str2l(expires) : 0;
- PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
-
struct inflight_function *pf = NULL;
if(key && *key)
pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
if(!pf) {
- error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
+ netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
}
else {
if(format && *format)
@@ -932,16 +928,15 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u
// ----------------------------------------------------------------------------
-PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
char *name = get_word(words, num_words, 1);
char *value = get_word(words, num_words, 2);
NETDATA_DOUBLE v;
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_VARIABLE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_get_chart_from_parent(user);
+ RRDSET *st = pluginsd_get_chart_from_parent(parser);
int global = (st) ? 0 : 1;
@@ -958,39 +953,39 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
}
if (unlikely(!name || !*name))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
if (unlikely(!value || !*value))
value = NULL;
if (unlikely(!value)) {
- error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- (global) ? "HOST" : "CHART",
- name);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ (global) ? "HOST" : "CHART",
+ name);
return PARSER_RC_OK;
}
if (!global && !st)
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
char *endptr = NULL;
v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
if (unlikely(endptr && *endptr)) {
if (endptr == value)
- error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- value,
- name);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name);
else
- error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- value,
- name,
- endptr);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name,
+ endptr);
}
if (global) {
@@ -1000,9 +995,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
rrdvar_custom_host_variable_release(host, rva);
}
else
- error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
- rrdhost_hostname(host),
- name);
+ netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
+ rrdhost_hostname(host),
+ name);
} else {
const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
if (rsa) {
@@ -1010,39 +1005,36 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
rrdsetvar_custom_chart_variable_release(st, rsa);
}
else
- error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
- rrdhost_hostname(host), rrdset_id(st), name);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
+ rrdhost_hostname(host), rrdset_id(st), name);
}
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
-{
- debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH);
- ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_FLUSH);
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
-{
- info("PLUGINSD: plugin called DISABLE. Disabling it.");
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
+static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
+ parser->user.enabled = 0;
return PARSER_RC_STOP;
}
-PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
const char *name = get_word(words, num_words, 1);
const char *label_source = get_word(words, num_words, 2);
const char *value = get_word(words, num_words, 3);
if (!name || !label_source || !value)
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
char *store = (char *)value;
bool allocated_store = false;
@@ -1071,13 +1063,10 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
}
}
- if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
- ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
+ if(unlikely(!(parser->user.new_host_labels)))
+ parser->user.new_host_labels = rrdlabels_create();
- rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels,
- name,
- store,
- str2l(label_source));
+ rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
if (allocated_store)
freez(store);
@@ -1085,90 +1074,84 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
-{
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_OVERWRITE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- debug(D_PLUGINSD, "requested to OVERWRITE host labels");
+ netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
if(unlikely(!host->rrdlabels))
host->rrdlabels = rrdlabels_create();
- rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
+ rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
- rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
- ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
+ rrdlabels_destroy(parser->user.new_host_labels);
+ parser->user.new_host_labels = NULL;
return PARSER_RC_OK;
}
-
-PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
const char *name = get_word(words, num_words, 1);
const char *value = get_word(words, num_words, 2);
const char *label_source = get_word(words, num_words, 3);
if (!name || !value || !*label_source) {
- error("Ignoring malformed or empty CHART LABEL command.");
- return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
}
- if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
- RRDSET *st = pluginsd_get_chart_from_parent(user);
- ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels;
- rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
+ if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
+ RRDSET *st = pluginsd_get_chart_from_parent(parser);
+ parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
+ rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
}
- rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily,
- name, value, str2l(label_source));
+ rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
-{
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- debug(D_PLUGINSD, "requested to commit chart labels");
+ netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
- if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
- error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.",
- rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!parser->user.chart_rrdlabels_linked_temporarily) {
+ netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
}
- rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
+ rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL;
+ parser->user.chart_rrdlabels_linked_temporarily = NULL;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) {
+static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
char *id = get_word(words, num_words, 1);
char *start_time_str = get_word(words, num_words, 2);
char *end_time_str = get_word(words, num_words, 3);
char *child_now_str = get_word(words, num_words, 4);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
RRDSET *st;
if (likely(!id || !*id))
- st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
else
st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+ pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(start_time_str && end_time_str) {
time_t start_time = (time_t) str2ull_encoded(start_time_str);
@@ -1216,36 +1199,37 @@ PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) {
st->counter_done++;
// these are only needed for db mode RAM, SAVE, MAP, ALLOC
- st->current_entry++;
- if(st->current_entry >= st->entries)
- st->current_entry -= st->entries;
+ st->db.current_entry++;
+ if(st->db.current_entry >= st->db.entries)
+ st->db.current_entry -= st->db.entries;
- ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
- ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time;
- ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true;
+ parser->user.replay.start_time = start_time;
+ parser->user.replay.end_time = end_time;
+ parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+ parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+ parser->user.replay.wall_clock_time = wall_clock_time;
+ parser->user.replay.rset_enabled = true;
return PARSER_RC_OK;
}
- error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
- " from %ld to %ld, but timestamps are invalid "
- "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
- rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
- wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
+ netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
+ " from %ld to %ld, but timestamps are invalid "
+ "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
+ wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock",
+ tolerance);
}
// the child sends an RBEGIN without any parameters initially
// setting rset_enabled to false, means the RSET should not store any metrics
// to store metrics, the RBEGIN needs to have timestamps
- ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
+ parser->user.replay.wall_clock_time = 0;
+ parser->user.replay.rset_enabled = false;
return PARSER_RC_OK;
}
@@ -1276,20 +1260,18 @@ static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str
return flags;
}
-PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
char *dimension = get_word(words, num_words, 1);
char *value_str = get_word(words, num_words, 2);
char *flags_str = get_word(words, num_words, 3);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- PARSER_USER_OBJECT *u = user;
- if(!u->replay.rset_enabled) {
+ if(!parser->user.replay.rset_enabled) {
error_limit_static_thread_var(erl, 1, 0);
error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
@@ -1299,18 +1281,18 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
}
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- if (unlikely(!u->replay.start_time || !u->replay.end_time)) {
- error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
+ if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
rrdhost_hostname(host),
rrdset_id(st),
dimension,
PLUGINSD_KEYWORD_REPLAY_SET,
- u->replay.start_time,
- u->replay.end_time,
+ parser->user.replay.start_time,
+ parser->user.replay.end_time,
PLUGINSD_KEYWORD_REPLAY_BEGIN);
- return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
}
if (unlikely(!value_str || !*value_str))
@@ -1331,10 +1313,10 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
flags = SN_EMPTY_SLOT;
}
- rrddim_store_metric(rd, u->replay.end_time_ut, value, flags);
- rd->last_collected_time.tv_sec = u->replay.end_time;
- rd->last_collected_time.tv_usec = 0;
- rd->collections_counter++;
+ rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags);
+ rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.counter++;
}
else {
error_limit_static_global_var(erl, 1, 0);
@@ -1346,9 +1328,8 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user)
-{
- if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false)
+static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) {
+ if(parser->user.replay.rset_enabled == false)
return PARSER_RC_OK;
char *dimension = get_word(words, num_words, 1);
@@ -1357,42 +1338,41 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
char *last_calculated_value_str = get_word(words, num_words, 4);
char *last_stored_value_str = get_word(words, num_words, 5);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
+ usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
if(last_collected_ut > dim_last_collected_ut) {
- rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
- rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
+ rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
- rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
- rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
- rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
+ rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
+ rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
+ rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user)
-{
- if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false)
+static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) {
+ if(parser->user.replay.rset_enabled == false)
return PARSER_RC_OK;
char *last_collected_ut_str = get_word(words, num_words, 1);
char *last_updated_ut_str = get_word(words, num_words, 2);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
@@ -1414,10 +1394,9 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) {
if (num_words < 7) { // accepts 7, but the 7th is optional
- error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
+ netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
return PARSER_RC_ERROR;
}
@@ -1441,13 +1420,11 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
child_world_time_txt) : now_realtime_sec();
- PARSER_USER_OBJECT *user_object = user;
-
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
@@ -1460,12 +1437,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
);
#endif
- ((PARSER_USER_OBJECT *) user)->data_collections_count++;
+ parser->user.data_collections_count++;
- if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
+ if(parser->user.replay.rset_enabled && st->rrdhost->receiver) {
time_t now = now_realtime_sec();
time_t started = st->rrdhost->receiver->replication_first_time_t;
- time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+ time_t current = parser->user.replay.end_time;
if(started && current > started) {
host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started);
@@ -1474,12 +1451,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
}
}
- ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
+ parser->user.replay.wall_clock_time = 0;
+ parser->user.replay.rset_enabled = false;
st->counter++;
st->counter_done++;
@@ -1509,7 +1486,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
rrdhost_hostname(host), rrdset_id(st));
#endif
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END);
host->rrdpush_receiver_replication_percent = 100.0;
worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent);
@@ -1517,17 +1494,17 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END);
rrdcontext_updated_retention_rrdset(st);
- bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
+ bool ok = replicate_chart_request(send_to_plugin, parser, host, st,
first_entry_child, last_entry_child, child_world_time,
first_entry_requested, last_entry_requested);
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
+static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
timing_init();
char *id = get_word(words, num_words, 1);
@@ -1536,17 +1513,17 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
char *wall_clock_time_str = get_word(words, num_words, 4);
if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
timing_step(TIMING_STEP_BEGIN2_PREPARE);
RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN_V2);
+ pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN_V2);
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED)))
rrdset_isnot_obsolete(st);
@@ -1573,32 +1550,31 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
// ------------------------------------------------------------------------
// prepare our state
- pluginsd_lock_rrdset_data_collection(user);
+ pluginsd_lock_rrdset_data_collection(parser);
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
- u->v2.update_every = update_every;
- u->v2.end_time = end_time;
- u->v2.wall_clock_time = wall_clock_time;
- u->v2.ml_locked = ml_chart_update_begin(st);
+ parser->user.v2.update_every = update_every;
+ parser->user.v2.end_time = end_time;
+ parser->user.v2.wall_clock_time = wall_clock_time;
+ parser->user.v2.ml_locked = ml_chart_update_begin(st);
timing_step(TIMING_STEP_BEGIN2_ML);
// ------------------------------------------------------------------------
// propagate it forward in v2
- if(!u->v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
- u->v2.stream_buffer = rrdset_push_metric_initialize(u->st, wall_clock_time);
+ if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
+ parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
- if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.wb) {
+ if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
// check if receiver and sender have the same number parsing capabilities
- bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
- NUMBER_ENCODING encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
+ NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
- BUFFER *wb = u->v2.stream_buffer.wb;
+ BUFFER *wb = parser->user.v2.stream_buffer.wb;
buffer_need_bytes(wb, 1024);
- if(unlikely(u->v2.stream_buffer.begin_v2_added))
+ if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
@@ -1626,8 +1602,8 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
buffer_fast_strcat(wb, "\n", 1);
- u->v2.stream_buffer.last_point_end_time_s = end_time;
- u->v2.stream_buffer.begin_v2_added = true;
+ parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
+ parser->user.v2.stream_buffer.begin_v2_added = true;
}
timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
@@ -1643,16 +1619,16 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
st->counter_done++;
// these are only needed for db mode RAM, SAVE, MAP, ALLOC
- st->current_entry++;
- if(st->current_entry >= st->entries)
- st->current_entry -= st->entries;
+ st->db.current_entry++;
+ if(st->db.current_entry >= st->db.entries)
+ st->db.current_entry -= st->db.entries;
timing_step(TIMING_STEP_BEGIN2_STORE);
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
+static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
timing_init();
char *dimension = get_word(words, num_words, 1);
@@ -1661,20 +1637,18 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
char *flags_str = get_word(words, num_words, 4);
if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
-
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
timing_step(TIMING_STEP_SET2_PREPARE);
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
- if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
rrddim_isnot_obsolete(st, rd);
@@ -1703,11 +1677,11 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
value = NAN;
flags = SN_EMPTY_SLOT;
- if(u->v2.ml_locked)
- ml_dimension_is_anomalous(rd, u->v2.end_time, 0, false);
+ if(parser->user.v2.ml_locked)
+ ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
}
- else if(u->v2.ml_locked) {
- if (ml_dimension_is_anomalous(rd, u->v2.end_time, value, true)) {
+ else if(parser->user.v2.ml_locked) {
+ if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
// clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
}
@@ -1720,13 +1694,13 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
// ------------------------------------------------------------------------
// propagate it forward in v2
- if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb) {
+ if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
// check if receiver and sender have the same number parsing capabilities
- bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
- NUMBER_ENCODING integer_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
- NUMBER_ENCODING doubles_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
+ NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
- BUFFER *wb = u->v2.stream_buffer.wb;
+ BUFFER *wb = parser->user.v2.stream_buffer.wb;
buffer_need_bytes(wb, 1024);
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
@@ -1750,51 +1724,50 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
// ------------------------------------------------------------------------
// store it
- rrddim_store_metric(rd, u->v2.end_time * USEC_PER_SEC, value, flags);
- rd->last_collected_time.tv_sec = u->v2.end_time;
- rd->last_collected_time.tv_usec = 0;
- rd->last_collected_value = collected_value;
- rd->last_stored_value = value;
- rd->last_calculated_value = value;
- rd->collections_counter++;
- rd->updated = true;
+ rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
+ rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.last_collected_value = collected_value;
+ rd->collector.last_stored_value = value;
+ rd->collector.last_calculated_value = value;
+ rd->collector.counter++;
+ rrddim_set_updated(rd);
timing_step(TIMING_STEP_SET2_STORE);
return PARSER_RC_OK;
}
-void pluginsd_cleanup_v2(void *user) {
+void pluginsd_cleanup_v2(PARSER *parser) {
// this is called when the thread is stopped while processing
- pluginsd_set_chart_from_parent(user, NULL, "THREAD CLEANUP");
+ pluginsd_set_chart_from_parent(parser, NULL, "THREAD CLEANUP");
}
-PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
+static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
timing_init();
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
- u->data_collections_count++;
+ parser->user.data_collections_count++;
timing_step(TIMING_STEP_END2_PREPARE);
// ------------------------------------------------------------------------
// propagate the whole chart update in v1
- if(unlikely(!u->v2.stream_buffer.v2 && !u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb))
- rrdset_push_metrics_v1(&u->v2.stream_buffer, st);
+ if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
+ rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
timing_step(TIMING_STEP_END2_PUSH_V1);
// ------------------------------------------------------------------------
// unblock data collection
- pluginsd_unlock_previous_chart(user, PLUGINSD_KEYWORD_END_V2, false);
+ pluginsd_unlock_previous_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
rrdcontext_collected_rrdset(st);
store_metric_collection_completed();
@@ -1803,7 +1776,7 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
// ------------------------------------------------------------------------
// propagate it forward
- rrdset_push_metrics_finished(&u->v2.stream_buffer, st);
+ rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
timing_step(TIMING_STEP_END2_PROPAGATE);
@@ -1812,16 +1785,16 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- rd->calculated_value = 0;
- rd->collected_value = 0;
- rd->updated = false;
- }
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
+ rrddim_clear_updated(rd);
+ }
rrddim_foreach_done(rd);
// ------------------------------------------------------------------------
// reset state
- u->v2 = (struct parser_user_object_v2){ 0 };
+ parser->user.v2 = (struct parser_user_object_v2){ 0 };
timing_step(TIMING_STEP_END2_STORE);
timing_report();
@@ -1829,19 +1802,126 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
return PARSER_RC_OK;
}
+static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ netdata_log_info("PLUGINSD: plugin called EXIT.");
+ return PARSER_RC_STOP;
+}
+
+static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
+{
+ const char *host_uuid_str = get_word(words, num_words, 1);
+ const char *claim_id_str = get_word(words, num_words, 2);
+
+ if (!host_uuid_str || !claim_id_str) {
+ netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
+ host_uuid_str ? host_uuid_str : "[unset]",
+ claim_id_str ? claim_id_str : "[unset]");
+ return PARSER_RC_ERROR;
+ }
+
+ uuid_t uuid;
+ RRDHOST *host = parser->user.host;
+
+ // We don't need the parsed UUID
+ // just do it to check the format
+ if(uuid_parse(host_uuid_str, uuid)) {
+ netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
+ return PARSER_RC_ERROR;
+ }
+ if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
+ netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
+ return PARSER_RC_ERROR;
+ }
+
+ if(strcmp(host_uuid_str, host->machine_guid) != 0) {
+ netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
+ return PARSER_RC_OK; //the message is OK problem must be somewhere else
+ }
+
+ rrdhost_aclk_state_lock(host);
+
+ if (host->aclk_state.claimed_id)
+ freez(host->aclk_state.claimed_id);
+
+ host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
+
+ rrdhost_aclk_state_unlock(host);
+
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
+
+ rrdpush_send_claimed_id(host);
+
+ return PARSER_RC_OK;
+}
+
+// ----------------------------------------------------------------------------
+
+static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(reader->read_buffer[reader->read_len] != '\0')
+ fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
+#endif
+
+ ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
+ if(unlikely(bytes_read <= 0))
+ return false;
+
+ reader->read_len += bytes_read;
+ reader->read_buffer[reader->read_len] = '\0';
+
+ return true;
+}
+
+static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
+ errno = 0;
+ struct pollfd fds[1];
+
+ fds[0].fd = fd;
+ fds[0].events = POLLIN;
+
+ int ret = poll(fds, 1, timeout_ms);
+
+ if (ret > 0) {
+ /* There is data to read */
+ if (fds[0].revents & POLLIN)
+ return buffered_reader_read(reader, fd);
+
+ else if(fds[0].revents & POLLERR) {
+ netdata_log_error("PARSER: read failed: POLLERR.");
+ return false;
+ }
+ else if(fds[0].revents & POLLHUP) {
+ netdata_log_error("PARSER: read failed: POLLHUP.");
+ return false;
+ }
+ else if(fds[0].revents & POLLNVAL) {
+ netdata_log_error("PARSER: read failed: POLLNVAL.");
+ return false;
+ }
+
+ netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
+ return false;
+ }
+ else if (ret == 0) {
+ netdata_log_error("PARSER: timeout while waiting for data.");
+ return false;
+ }
+
+ netdata_log_error("PARSER: poll() failed with code %d.", ret);
+ return false;
+}
+
void pluginsd_process_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
- pluginsd_cleanup_v2(parser->user);
- pluginsd_host_define_cleanup(parser->user);
+ pluginsd_cleanup_v2(parser);
+ pluginsd_host_define_cleanup(parser);
rrd_collector_finished();
parser_destroy(parser);
}
-// New plugins.d parser
-
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
{
int enabled = cd->unsafe.enabled;
@@ -1852,13 +1932,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
}
if (unlikely(fileno(fp_plugin_input) == -1)) {
- error("input file descriptor given is not a valid stream");
+ netdata_log_error("input file descriptor given is not a valid stream");
cd->serial_failures++;
return 0;
}
if (unlikely(fileno(fp_plugin_output) == -1)) {
- error("output file descriptor given is not a valid stream");
+ netdata_log_error("output file descriptor given is not a valid stream");
cd->serial_failures++;
return 0;
}
@@ -1866,38 +1946,42 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
clearerr(fp_plugin_input);
clearerr(fp_plugin_output);
- PARSER_USER_OBJECT user = {
- .enabled = cd->unsafe.enabled,
- .host = host,
- .cd = cd,
- .trust_durations = trust_durations
- };
+ PARSER *parser;
+ {
+ PARSER_USER_OBJECT user = {
+ .enabled = cd->unsafe.enabled,
+ .host = host,
+ .cd = cd,
+ .trust_durations = trust_durations
+ };
- // fp_plugin_output = our input; fp_plugin_input = our output
- PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1,
- PARSER_INPUT_SPLIT, NULL);
+ // fp_plugin_output = our input; fp_plugin_input = our output
+ parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
+ }
pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
rrd_collector_started();
+ size_t count = 0;
+
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- user.parser = parser;
- char buffer[PLUGINSD_LINE_MAX + 1];
-
- while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) {
- if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer)))
+ buffered_reader_init(&parser->reader);
+ char buffer[PLUGINSD_LINE_MAX + 2];
+ while(likely(service_running(SERVICE_COLLECTORS))) {
+ if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) {
+ if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
+ break;
+ }
+ else if(unlikely(parser_action(parser, buffer)))
break;
}
- // free parser with the pop function
- netdata_thread_cleanup_pop(1);
-
- cd->unsafe.enabled = user.enabled;
- size_t count = user.data_collections_count;
+ cd->unsafe.enabled = parser->user.enabled;
+ count = parser->user.data_collections_count;
if (likely(count)) {
cd->successful_collections += count;
@@ -1906,143 +1990,187 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
else
cd->serial_failures++;
+ // free parser with the pop function
+ netdata_thread_cleanup_pop(1);
+
return count;
}
-PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
-{
- info("PLUGINSD: plugin called EXIT.");
- return PARSER_RC_STOP;
+void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) {
+ parser_init_repertoire(parser, repertoire);
+
+ if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING))
+ inflight_functions_init(parser);
}
-static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) {
+PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd,
+ PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) {
+ PARSER *parser;
- if (types & PARSER_INIT_PLUGINSD) {
- add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
- add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
+ parser = callocz(1, sizeof(*parser));
+ if(user)
+ parser->user = *user;
+ parser->fd = fd;
+ parser->fp_input = fp_input;
+ parser->fp_output = fp_output;
+#ifdef ENABLE_HTTPS
+ parser->ssl_output = ssl;
+#endif
+ parser->flags = flags;
- add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define);
- add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end);
- add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels);
- add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host);
+ spinlock_init(&parser->writer.spinlock);
+ return parser;
+}
- add_func(parser, PLUGINSD_KEYWORD_EXIT, pluginsd_exit);
- }
+PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) {
+ switch(keyword->id) {
+ case 1:
+ return pluginsd_set_v2(words, num_words, parser);
- if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) {
- // plugins.d plugins and streaming
- add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
- add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
- add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
- add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
- add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
- add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit);
- add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel);
- add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function);
- add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin);
+ case 2:
+ return pluginsd_begin_v2(words, num_words, parser);
- add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
- add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set);
- add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
+ case 3:
+ return pluginsd_end_v2(words, num_words, parser);
- inflight_functions_init(parser);
- }
+ case 11:
+ return pluginsd_set(words, num_words, parser);
- if (types & PARSER_INIT_STREAMING) {
- add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end);
+ case 12:
+ return pluginsd_begin(words, num_words, parser);
- // replication
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin);
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set);
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state);
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state);
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end);
+ case 13:
+ return pluginsd_end(words, num_words, parser);
- // streaming metrics v2
- add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2);
- add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2);
- add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2);
- }
-}
+ case 21:
+ return pluginsd_replay_set(words, num_words, parser);
-void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) {
- pluginsd_keywords_init_internal(parser, types, parser_add_keyword);
-}
+ case 22:
+ return pluginsd_replay_begin(words, num_words, parser);
-struct pluginsd_user_unittest {
- size_t size;
- const char **hashtable;
- uint32_t (*hash)(const char *s);
- size_t collisions;
-};
+ case 23:
+ return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
+
+ case 24:
+ return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
+
+ case 25:
+ return pluginsd_replay_end(words, num_words, parser);
+
+ case 31:
+ return pluginsd_dimension(words, num_words, parser);
+
+ case 32:
+ return pluginsd_chart(words, num_words, parser);
+
+ case 33:
+ return pluginsd_chart_definition_end(words, num_words, parser);
+
+ case 34:
+ return pluginsd_clabel(words, num_words, parser);
+
+ case 35:
+ return pluginsd_clabel_commit(words, num_words, parser);
+
+ case 41:
+ return pluginsd_function(words, num_words, parser);
+
+ case 42:
+ return pluginsd_function_result_begin(words, num_words, parser);
+
+ case 51:
+ return pluginsd_label(words, num_words, parser);
+
+ case 52:
+ return pluginsd_overwrite(words, num_words, parser);
+
+ case 53:
+ return pluginsd_variable(words, num_words, parser);
+
+ case 61:
+ return streaming_claimed_id(words, num_words, parser);
+
+ case 71:
+ return pluginsd_host(words, num_words, parser);
+
+ case 72:
+ return pluginsd_host_define(words, num_words, parser);
+
+ case 73:
+ return pluginsd_host_define_end(words, num_words, parser);
-void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) {
- struct pluginsd_user_unittest *u = parser->user;
+ case 74:
+ return pluginsd_host_labels(words, num_words, parser);
- uint32_t hash = u->hash(keyword);
- uint32_t slot = hash % u->size;
+ case 97:
+ return pluginsd_flush(words, num_words, parser);
- if(u->hashtable[slot])
- u->collisions++;
+ case 98:
+ return pluginsd_disable(words, num_words, parser);
- u->hashtable[slot] = keyword;
+ case 99:
+ return pluginsd_exit(words, num_words, parser);
+
+ default:
+ fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
+ }
}
-static struct {
- const char *name;
- uint32_t (*hash)(const char *s);
- size_t slots_needed;
-} hashers[] = {
- { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, },
- { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, },
- { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, },
- { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, },
- { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, },
-
- // terminator
- { .name = NULL, NULL, .slots_needed = 0, },
-};
+#include "gperf-hashtable.h"
+
+void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
+ parser->repertoire = repertoire;
+
+ for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
+ if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
+ worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
+ }
+}
+
+void parser_destroy(PARSER *parser) {
+ if (unlikely(!parser))
+ return;
+
+ dictionary_destroy(parser->inflight.functions);
+ freez(parser);
+}
int pluginsd_parser_unittest(void) {
- PARSER *p;
- size_t slots_to_check = 1000;
- size_t i, h;
-
- // check for hashtable collisions
- for(h = 0; hashers[h].name ;h++) {
- hashers[h].slots_needed = slots_to_check * 1000000;
-
- for (i = 10; i < slots_to_check; i++) {
- struct pluginsd_user_unittest user = {
- .hash = hashers[h].hash,
- .size = i,
- .hashtable = callocz(i, sizeof(const char *)),
- .collisions = 0,
- };
-
- p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
- pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING,
- pluginsd_keyword_collision_check);
- parser_destroy(p);
-
- freez(user.hashtable);
-
- if (!user.collisions) {
- hashers[h].slots_needed = i;
- break;
- }
+ PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
+ pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
+
+ char *lines[] = {
+ "BEGIN2 abcdefghijklmnopqr 123",
+ "SET2 abcdefg 0x12345678 0 0",
+ "SET2 hijklmnoqr 0x12345678 0 0",
+ "SET2 stuvwxyz 0x12345678 0 0",
+ "END2",
+ NULL,
+ };
+
+ char *words[PLUGINSD_MAX_WORDS];
+ size_t iterations = 1000000;
+ size_t count = 0;
+ char input[PLUGINSD_LINE_MAX + 1];
+
+ usec_t started = now_realtime_usec();
+ while(--iterations) {
+ for(size_t line = 0; lines[line] ;line++) {
+ strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
+ size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(words, num_words, 0);
+ PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
+ if(unlikely(!keyword))
+ fatal("Cannot parse the line '%s'", lines[line]);
+ count++;
}
}
+ usec_t ended = now_realtime_usec();
- for(h = 0; hashers[h].name ;h++) {
- if(hashers[h].slots_needed > 1000)
- info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check);
- else
- info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed);
- }
+ netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
+ (double)(ended - started) / (double)USEC_PER_SEC,
+ (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
- p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
- pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
parser_destroy(p);
return 0;
}
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index 1fdc23a0e..5e1ea1242 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -5,13 +5,39 @@
#include "daemon/common.h"
+#define WORKER_PARSER_FIRST_JOB 3
+
+// this has to be in-sync with the same at receiver.c
+#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
+
+// PARSER return codes
+typedef enum __attribute__ ((__packed__)) parser_rc {
+ PARSER_RC_OK, // Callback was successful, go on
+ PARSER_RC_STOP, // Callback says STOP
+ PARSER_RC_ERROR // Callback failed (abort rest of callbacks)
+} PARSER_RC;
+
+typedef enum __attribute__ ((__packed__)) parser_input_type {
+ PARSER_INPUT_SPLIT = (1 << 1),
+ PARSER_DEFER_UNTIL_KEYWORD = (1 << 2),
+} PARSER_INPUT_TYPE;
+
typedef enum __attribute__ ((__packed__)) {
PARSER_INIT_PLUGINSD = (1 << 1),
PARSER_INIT_STREAMING = (1 << 2),
-} PLUGINSD_KEYWORDS;
+} PARSER_REPERTOIRE;
+
+struct parser;
+typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, struct parser *parser);
+
+typedef struct parser_keyword {
+ char *keyword;
+ size_t id;
+ PARSER_REPERTOIRE repertoire;
+ size_t worker_job_id;
+} PARSER_KEYWORD;
typedef struct parser_user_object {
- PARSER *parser;
RRDSET *st;
RRDHOST *host;
void *opaque;
@@ -54,9 +80,142 @@ typedef struct parser_user_object {
} v2;
} PARSER_USER_OBJECT;
-PARSER_RC pluginsd_function(char **words, size_t num_words, void *user);
-PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user);
+typedef struct parser {
+ uint8_t version; // Parser version
+ PARSER_REPERTOIRE repertoire;
+ uint32_t flags;
+ int fd; // Socket
+ size_t line;
+ FILE *fp_input; // Input source e.g. stream
+ FILE *fp_output; // Stream to send commands to plugin
+
+#ifdef ENABLE_HTTPS
+ NETDATA_SSL *ssl_output;
+#endif
+
+ PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls
+
+ struct buffered_reader reader;
+
+ struct {
+ const char *end_keyword;
+ BUFFER *response;
+ void (*action)(struct parser *parser, void *action_data);
+ void *action_data;
+ } defer;
+
+ struct {
+ DICTIONARY *functions;
+ usec_t smaller_timeout;
+ } inflight;
+
+ struct {
+ SPINLOCK spinlock;
+ } writer;
+
+} PARSER;
+
+PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
+void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire);
+void parser_destroy(PARSER *working_parser);
+void pluginsd_cleanup_v2(PARSER *parser);
void inflight_functions_init(PARSER *parser);
-void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types);
+void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire);
+PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words);
+
+static inline int find_first_keyword(const char *src, char *dst, int dst_size, bool *isspace_map) {
+ const char *s = src, *keyword_start;
+
+ while (unlikely(isspace_map[(uint8_t)*s])) s++;
+ keyword_start = s;
+
+ while (likely(*s && !isspace_map[(uint8_t)*s]) && dst_size > 1) {
+ *dst++ = *s++;
+ dst_size--;
+ }
+ *dst = '\0';
+ return dst_size == 0 ? 0 : (int) (s - keyword_start);
+}
+
+PARSER_KEYWORD *gperf_lookup_keyword(register const char *str, register size_t len);
+
+static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) {
+ PARSER_KEYWORD *t = gperf_lookup_keyword(command, strlen(command));
+ if(t && (t->repertoire & parser->repertoire))
+ return t;
+
+ return NULL;
+}
+
+static inline int parser_action(PARSER *parser, char *input) {
+ parser->line++;
+
+ if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
+ char command[PLUGINSD_LINE_MAX + 1];
+ bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, isspace_map_pluginsd);
+
+ if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) {
+ if(parser->defer.response) {
+ buffer_strcat(parser->defer.response, input);
+ if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) {
+ // more than 10MB of data
+ // a bad plugin that did not send the end_keyword
+ internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
+ return 1;
+ }
+ }
+ return 0;
+ }
+ else {
+ // call the action
+ parser->defer.action(parser, parser->defer.action_data);
+
+ // empty everything
+ parser->defer.action = NULL;
+ parser->defer.action_data = NULL;
+ parser->defer.end_keyword = NULL;
+ parser->defer.response = NULL;
+ parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD;
+ }
+ return 0;
+ }
+
+ char *words[PLUGINSD_MAX_WORDS];
+ size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(words, num_words, 0);
+
+ if(unlikely(!command))
+ return 0;
+
+ PARSER_RC rc;
+ PARSER_KEYWORD *t = parser_find_keyword(parser, command);
+ if(likely(t)) {
+ worker_is_busy(t->worker_job_id);
+ rc = parser_execute(parser, t, words, num_words);
+ // rc = (*t->func)(words, num_words, parser);
+ worker_is_idle();
+ }
+ else
+ rc = PARSER_RC_ERROR;
+
+ if(rc == PARSER_RC_ERROR) {
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
+ for(size_t i = 0; i < num_words ;i++) {
+ if(i) buffer_fast_strcat(wb, " ", 1);
+
+ buffer_fast_strcat(wb, "\"", 1);
+ const char *s = get_word(words, num_words, i);
+ buffer_strcat(wb, s?s:"");
+ buffer_fast_strcat(wb, "\"", 1);
+ }
+
+ netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
+ command, parser->line, buffer_tostring(wb));
+
+ buffer_free(wb);
+ }
+
+ return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP);
+}
#endif //NETDATA_PLUGINSD_PARSER_H