From cd4377fab21e0f500bef7f06543fa848a039c1e0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 20 Jul 2023 06:50:01 +0200 Subject: Merging upstream version 1.41.0. Signed-off-by: Daniel Baumann --- collectors/plugins.d/Makefile.am | 1 + collectors/plugins.d/gperf-config.txt | 52 ++ collectors/plugins.d/gperf-hashtable.h | 163 +++++ collectors/plugins.d/local_listeners.c | 366 ++++++++++ collectors/plugins.d/plugins_d.c | 66 +- collectors/plugins.d/pluginsd_parser.c | 1220 ++++++++++++++++++-------------- collectors/plugins.d/pluginsd_parser.h | 169 ++++- 7 files changed, 1453 insertions(+), 584 deletions(-) create mode 100644 collectors/plugins.d/gperf-config.txt create mode 100644 collectors/plugins.d/gperf-hashtable.h create mode 100644 collectors/plugins.d/local_listeners.c (limited to 'collectors/plugins.d') diff --git a/collectors/plugins.d/Makefile.am b/collectors/plugins.d/Makefile.am index 59250a997..67fed309d 100644 --- a/collectors/plugins.d/Makefile.am +++ b/collectors/plugins.d/Makefile.am @@ -7,5 +7,6 @@ SUBDIRS = \ $(NULL) dist_noinst_DATA = \ + gperf-config.txt \ README.md \ $(NULL) diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt new file mode 100644 index 000000000..43be129e5 --- /dev/null +++ b/collectors/plugins.d/gperf-config.txt @@ -0,0 +1,52 @@ +%struct-type +%omit-struct-type +%define hash-function-name gperf_keyword_hash_function +%define lookup-function-name gperf_lookup_keyword +%define word-array-name gperf_keywords +%define constants-prefix GPERF_PARSER_ +%define slot-name keyword +%global-table +%null-strings +PARSER_KEYWORD; +%% +# +# Plugins Only Keywords +# +FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1 +DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2 +EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3 +HOST, 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4 +HOST_DEFINE, 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5 +HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6 +HOST_LABEL, 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7 +# +# Common keywords +# +BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8 +CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9 +CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10 +CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11 +DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12 +END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13 +FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14 +FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15 +LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16 +OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17 +SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18 +VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19 +# +# Streaming only keywords +# +CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20 +BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21 +SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 +END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 +# +# Streaming Replication keywords +# +CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 +RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 +RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26 +REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 +RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 +RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h new file mode 100644 index 000000000..b9e58975e --- /dev/null +++ b/collectors/plugins.d/gperf-hashtable.h @@ -0,0 +1,163 @@ +/* ANSI-C code produced by gperf version 3.1 */ +/* Command-line: gperf --multiple-iterations=1000 --output-file=gperf-hashtable.h gperf-config.txt */ +/* Computed positions: -k'1-2' */ + +#if !((' ' == 32) && ('!' == 33) && ('"' == 34) && ('#' == 35) \ + && ('%' == 37) && ('&' == 38) && ('\'' == 39) && ('(' == 40) \ + && (')' == 41) && ('*' == 42) && ('+' == 43) && (',' == 44) \ + && ('-' == 45) && ('.' == 46) && ('/' == 47) && ('0' == 48) \ + && ('1' == 49) && ('2' == 50) && ('3' == 51) && ('4' == 52) \ + && ('5' == 53) && ('6' == 54) && ('7' == 55) && ('8' == 56) \ + && ('9' == 57) && (':' == 58) && (';' == 59) && ('<' == 60) \ + && ('=' == 61) && ('>' == 62) && ('?' == 63) && ('A' == 65) \ + && ('B' == 66) && ('C' == 67) && ('D' == 68) && ('E' == 69) \ + && ('F' == 70) && ('G' == 71) && ('H' == 72) && ('I' == 73) \ + && ('J' == 74) && ('K' == 75) && ('L' == 76) && ('M' == 77) \ + && ('N' == 78) && ('O' == 79) && ('P' == 80) && ('Q' == 81) \ + && ('R' == 82) && ('S' == 83) && ('T' == 84) && ('U' == 85) \ + && ('V' == 86) && ('W' == 87) && ('X' == 88) && ('Y' == 89) \ + && ('Z' == 90) && ('[' == 91) && ('\\' == 92) && (']' == 93) \ + && ('^' == 94) && ('_' == 95) && ('a' == 97) && ('b' == 98) \ + && ('c' == 99) && ('d' == 100) && ('e' == 101) && ('f' == 102) \ + && ('g' == 103) && ('h' == 104) && ('i' == 105) && ('j' == 106) \ + && ('k' == 107) && ('l' == 108) && ('m' == 109) && ('n' == 110) \ + && ('o' == 111) && ('p' == 112) && ('q' == 113) && ('r' == 114) \ + && ('s' == 115) && ('t' == 116) && ('u' == 117) && ('v' == 118) \ + && ('w' == 119) && ('x' == 120) && ('y' == 121) && ('z' == 122) \ + && ('{' == 123) && ('|' == 124) && ('}' == 125) && ('~' == 126)) +/* The character set is not based on ISO-646. */ +#error "gperf generated tables don't work with this execution character set. Please report a bug to ." +#endif + + +#define GPERF_PARSER_TOTAL_KEYWORDS 29 +#define GPERF_PARSER_MIN_WORD_LENGTH 3 +#define GPERF_PARSER_MAX_WORD_LENGTH 21 +#define GPERF_PARSER_MIN_HASH_VALUE 4 +#define GPERF_PARSER_MAX_HASH_VALUE 36 +/* maximum key range = 33, duplicates = 0 */ + +#ifdef __GNUC__ +__inline +#else +#ifdef __cplusplus +inline +#endif +#endif +static unsigned int +gperf_keyword_hash_function (register const char *str, register size_t len) +{ + static unsigned char asso_values[] = + { + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 15, 10, 1, 1, 9, + 4, 37, 0, 20, 37, 37, 9, 37, 14, 0, + 37, 37, 1, 0, 37, 7, 13, 37, 18, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37 + }; + return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]]; +} + +static PARSER_KEYWORD gperf_keywords[] = + { + {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, +#line 18 "gperf-config.txt" + {"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4}, +#line 51 "gperf-config.txt" + {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, +#line 26 "gperf-config.txt" + {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9}, + {(char*)0}, +#line 52 "gperf-config.txt" + {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, +#line 49 "gperf-config.txt" + {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26}, +#line 21 "gperf-config.txt" + {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7}, +#line 19 "gperf-config.txt" + {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5}, +#line 35 "gperf-config.txt" + {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18}, +#line 42 "gperf-config.txt" + {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, +#line 50 "gperf-config.txt" + {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, +#line 20 "gperf-config.txt" + {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6}, +#line 27 "gperf-config.txt" + {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10}, +#line 48 "gperf-config.txt" + {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, +#line 15 "gperf-config.txt" + {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, +#line 31 "gperf-config.txt" + {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14}, +#line 40 "gperf-config.txt" + {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20}, +#line 47 "gperf-config.txt" + {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, +#line 34 "gperf-config.txt" + {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17}, +#line 28 "gperf-config.txt" + {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11}, +#line 25 "gperf-config.txt" + {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, +#line 41 "gperf-config.txt" + {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21}, +#line 30 "gperf-config.txt" + {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13}, +#line 43 "gperf-config.txt" + {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, +#line 16 "gperf-config.txt" + {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, +#line 33 "gperf-config.txt" + {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16}, +#line 29 "gperf-config.txt" + {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12}, +#line 17 "gperf-config.txt" + {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}, +#line 32 "gperf-config.txt" + {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15}, + {(char*)0}, {(char*)0}, {(char*)0}, +#line 36 "gperf-config.txt" + {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19} + }; + +PARSER_KEYWORD * +gperf_lookup_keyword (register const char *str, register size_t len) +{ + if (len <= GPERF_PARSER_MAX_WORD_LENGTH && len >= GPERF_PARSER_MIN_WORD_LENGTH) + { + register unsigned int key = gperf_keyword_hash_function (str, len); + + if (key <= GPERF_PARSER_MAX_HASH_VALUE) + { + register const char *s = gperf_keywords[key].keyword; + + if (s && *str == *s && !strcmp (str + 1, s + 1)) + return &gperf_keywords[key]; + } + } + return 0; +} diff --git a/collectors/plugins.d/local_listeners.c b/collectors/plugins.d/local_listeners.c new file mode 100644 index 000000000..a39de7974 --- /dev/null +++ b/collectors/plugins.d/local_listeners.c @@ -0,0 +1,366 @@ +#include "libnetdata/libnetdata.h" +#include "libnetdata/required_dummies.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +typedef enum { + PROC_NET_PROTOCOL_TCP, + PROC_NET_PROTOCOL_TCP6, + PROC_NET_PROTOCOL_UDP, + PROC_NET_PROTOCOL_UDP6, +} PROC_NET_PROTOCOLS; + +#define MAX_ERROR_LOGS 10 + +static size_t pid_fds_processed = 0; +static size_t pid_fds_failed = 0; +static size_t errors_encountered = 0; + +static inline const char *protocol_name(PROC_NET_PROTOCOLS protocol) { + switch(protocol) { + default: + case PROC_NET_PROTOCOL_TCP: + return "TCP"; + + case PROC_NET_PROTOCOL_UDP: + return "UDP"; + + case PROC_NET_PROTOCOL_TCP6: + return "TCP6"; + + case PROC_NET_PROTOCOL_UDP6: + return "UDP6"; + } +} + +static inline int read_cmdline(pid_t pid, char* buffer, size_t bufferSize) { + char path[FILENAME_MAX + 1]; + snprintfz(path, FILENAME_MAX, "%s/proc/%d/cmdline", netdata_configured_host_prefix, pid); + + FILE* file = fopen(path, "r"); + if (!file) { + if(++errors_encountered < MAX_ERROR_LOGS) + collector_error("LOCAL-LISTENERS: error opening file: %s\n", path); + + return -1; + } + + size_t bytesRead = fread(buffer, 1, bufferSize - 1, file); + buffer[bytesRead] = '\0'; // Ensure null-terminated + + // Replace null characters in cmdline with spaces + for (size_t i = 0; i < bytesRead; i++) { + if (buffer[i] == '\0') { + buffer[i] = ' '; + } + } + + fclose(file); + return 0; +} + +static inline void fix_cmdline(char* str) { + if (str == NULL) + return; + + char *s = str; + + do { + if(*s == '|' || iscntrl(*s)) + *s = '_'; + + } while(*++s); + + + while(s > str && *(s-1) == ' ') + *--s = '\0'; +} + +// ---------------------------------------------------------------------------- + +#define HASH_TABLE_SIZE 100000 + +typedef struct Node { + unsigned int inode; // key + + // values + unsigned int port; + char local_address[INET6_ADDRSTRLEN]; + PROC_NET_PROTOCOLS protocol; + bool processed; + + // linking + struct Node *prev, *next; +} Node; + +typedef struct HashTable { + Node *table[HASH_TABLE_SIZE]; +} HashTable; + +static HashTable *hashTable_key_inode_port_value = NULL; + +static inline void generate_output(const char *protocol, const char *address, unsigned int port, const char *cmdline) { + printf("%s|%s|%u|%s\n", protocol, address, port, cmdline); +} + +HashTable* createHashTable() { + HashTable *hashTable = (HashTable*)mallocz(sizeof(HashTable)); + memset(hashTable, 0, sizeof(HashTable)); + return hashTable; +} + +static inline unsigned int hashFunction(unsigned int inode) { + return inode % HASH_TABLE_SIZE; +} + +static inline void insertHashTable(HashTable *hashTable, unsigned int inode, unsigned int port, PROC_NET_PROTOCOLS protocol, char *local_address) { + unsigned int index = hashFunction(inode); + Node *newNode = (Node*)mallocz(sizeof(Node)); + newNode->inode = inode; + newNode->port = port; + newNode->protocol = protocol; + strncpyz(newNode->local_address, local_address, INET6_ADDRSTRLEN - 1); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(hashTable->table[index], newNode, prev, next); +} + +static inline bool lookupHashTable_and_execute(HashTable *hashTable, unsigned int inode, pid_t pid) { + unsigned int index = hashFunction(inode); + for(Node *node = hashTable->table[index], *next = NULL ; node ; node = next) { + next = node->next; + + if(node->inode == inode && node->port) { + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(hashTable->table[index], node, prev, next); + char cmdline[8192] = ""; + read_cmdline(pid, cmdline, sizeof(cmdline)); + fix_cmdline(cmdline); + generate_output(protocol_name(node->protocol), node->local_address, node->port, cmdline); + freez(node); + return true; + } + } + + return false; +} + +void freeHashTable(HashTable *hashTable) { + for (unsigned int i = 0; i < HASH_TABLE_SIZE; i++) { + while(hashTable->table[i]) { + Node *tmp = hashTable->table[i]; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(hashTable->table[i], tmp, prev, next); + generate_output(protocol_name(tmp->protocol), tmp->local_address, tmp->port, ""); + freez(tmp); + } + } + freez(hashTable); +} + +// ---------------------------------------------------------------------------- + +static inline void found_this_socket_inode(pid_t pid, unsigned int inode) { + lookupHashTable_and_execute(hashTable_key_inode_port_value, inode, pid); +} + +bool find_all_sockets_in_proc(const char *proc_filename) { + DIR *proc_dir, *fd_dir; + struct dirent *proc_entry, *fd_entry; + char path_buffer[FILENAME_MAX + 1]; + + proc_dir = opendir(proc_filename); + if (proc_dir == NULL) { + if(++errors_encountered < MAX_ERROR_LOGS) + collector_error("LOCAL-LISTENERS: cannot opendir() '%s'", proc_filename); + + pid_fds_failed++; + return false; + } + + while ((proc_entry = readdir(proc_dir)) != NULL) { + // Check if directory entry is a PID by seeing if the name is made up of digits only + int is_pid = 1; + for (char *c = proc_entry->d_name; *c != '\0'; c++) { + if (*c < '0' || *c > '9') { + is_pid = 0; + break; + } + } + + if (!is_pid) + continue; + + // Build the path to the fd directory of the process + snprintfz(path_buffer, FILENAME_MAX, "%s/%s/fd/", proc_filename, proc_entry->d_name); + + fd_dir = opendir(path_buffer); + if (fd_dir == NULL) { + if(++errors_encountered < MAX_ERROR_LOGS) + collector_error("LOCAL-LISTENERS: cannot opendir() '%s'", path_buffer); + + pid_fds_failed++; + continue; + } + + while ((fd_entry = readdir(fd_dir)) != NULL) { + if(!strcmp(fd_entry->d_name, ".") || !strcmp(fd_entry->d_name, "..")) + continue; + + char link_path[FILENAME_MAX + 1]; + char link_target[FILENAME_MAX + 1]; + int inode; + + // Build the path to the file descriptor link + snprintfz(link_path, FILENAME_MAX, "%s/%s", path_buffer, fd_entry->d_name); + + ssize_t len = readlink(link_path, link_target, sizeof(link_target) - 1); + if (len == -1) { + if(++errors_encountered < MAX_ERROR_LOGS) + collector_error("LOCAL-LISTENERS: cannot read link '%s'", link_path); + + pid_fds_failed++; + continue; + } + link_target[len] = '\0'; + + pid_fds_processed++; + + // If the link target indicates a socket, print its inode number + if (sscanf(link_target, "socket:[%d]", &inode) == 1) + found_this_socket_inode((pid_t)strtoul(proc_entry->d_name, NULL, 10), inode); + } + + closedir(fd_dir); + } + + closedir(proc_dir); + return true; +} + +// ---------------------------------------------------------------------------- + +static inline void add_port_and_inode(PROC_NET_PROTOCOLS protocol, unsigned int port, unsigned int inode, char *local_address) { + insertHashTable(hashTable_key_inode_port_value, inode, port, protocol, local_address); +} + +static inline void print_ipv6_address(const char *ipv6_str, char *dst) { + unsigned k; + char buf[9]; + struct sockaddr_in6 sa; + + // Initialize sockaddr_in6 + memset(&sa, 0, sizeof(struct sockaddr_in6)); + sa.sin6_family = AF_INET6; + sa.sin6_port = htons(0); // replace 0 with your port number + + // Convert hex string to byte array + for (k = 0; k < 4; ++k) + { + memset(buf, 0, 9); + memcpy(buf, ipv6_str + (k * 8), 8); + sa.sin6_addr.s6_addr32[k] = strtoul(buf, NULL, 16); + } + + // Convert to human-readable format + if (inet_ntop(AF_INET6, &(sa.sin6_addr), dst, INET6_ADDRSTRLEN) == NULL) + *dst = '\0'; +} + +static inline void print_ipv4_address(uint32_t address, char *dst) { + uint8_t octets[4]; + octets[0] = address & 0xFF; + octets[1] = (address >> 8) & 0xFF; + octets[2] = (address >> 16) & 0xFF; + octets[3] = (address >> 24) & 0xFF; + sprintf(dst, "%u.%u.%u.%u", octets[0], octets[1], octets[2], octets[3]); +} + +bool read_proc_net_x(const char *filename, PROC_NET_PROTOCOLS protocol) { + FILE *fp; + char *line = NULL; + size_t len = 0; + ssize_t read; + char address[INET6_ADDRSTRLEN]; + + ssize_t min_line_length = (protocol == PROC_NET_PROTOCOL_TCP || protocol == PROC_NET_PROTOCOL_UDP) ? 105 : 155; + + fp = fopen(filename, "r"); + if (fp == NULL) + return false; + + // Read line by line + while ((read = getline(&line, &len, fp)) != -1) { + if(read < min_line_length) continue; + + char local_address6[33], rem_address6[33]; + unsigned int local_address, local_port, state, rem_address, rem_port, inode; + + switch(protocol) { + case PROC_NET_PROTOCOL_TCP: + if(line[34] != '0' || line[35] != 'A') + continue; + // fall-through + + case PROC_NET_PROTOCOL_UDP: + if (sscanf(line, "%*d: %X:%X %X:%X %X %*X:%*X %*X:%*X %*X %*d %*d %u", + &local_address, &local_port, &rem_address, &rem_port, &state, &inode) != 6) + continue; + + print_ipv4_address(local_address, address); + break; + + case PROC_NET_PROTOCOL_TCP6: + if(line[82] != '0' || line[83] != 'A') + continue; + // fall-through + + case PROC_NET_PROTOCOL_UDP6: + if(sscanf(line, "%*d: %32[0-9A-Fa-f]:%X %32[0-9A-Fa-f]:%X %X %*X:%*X %*X:%*X %*X %*d %*d %u", + local_address6, &local_port, rem_address6, &rem_port, &state, &inode) != 6) + continue; + + print_ipv6_address(local_address6, address); + break; + } + + add_port_and_inode(protocol, local_port, inode, address); + } + + fclose(fp); + if (line) + free(line); + + return true; +} + +// ---------------------------------------------------------------------------- + +int main(int argc __maybe_unused, char **argv __maybe_unused) { + char path[FILENAME_MAX + 1]; + hashTable_key_inode_port_value = createHashTable(); + + netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); + if(!netdata_configured_host_prefix) netdata_configured_host_prefix = ""; + + snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp", netdata_configured_host_prefix); + read_proc_net_x(path, PROC_NET_PROTOCOL_TCP); + + snprintfz(path, FILENAME_MAX, "%s/proc/net/udp", netdata_configured_host_prefix); + read_proc_net_x(path, PROC_NET_PROTOCOL_UDP); + + snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp6", netdata_configured_host_prefix); + read_proc_net_x(path, PROC_NET_PROTOCOL_TCP6); + + snprintfz(path, FILENAME_MAX, "%s/proc/net/udp6", netdata_configured_host_prefix); + read_proc_net_x(path, PROC_NET_PROTOCOL_UDP6); + + snprintfz(path, FILENAME_MAX, "%s/proc", netdata_configured_host_prefix); + find_all_sockets_in_proc(path); + + freeHashTable(hashTable_key_inode_port_value); + return 0; +} diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index da5226a5c..6a235b4e6 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -3,7 +3,7 @@ #include "plugins_d.h" #include "pluginsd_parser.h" -char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL }; +char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { [0] = PLUGINS_DIR, }; struct plugind *pluginsd_root = NULL; inline size_t pluginsd_initialize_plugin_directories() @@ -18,32 +18,32 @@ inline size_t pluginsd_initialize_plugin_directories() } // Parse it and store it to plugin directories - return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace); + return quoted_strings_splitter_config(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES); } static inline void plugin_set_disabled(struct plugind *cd) { - netdata_spinlock_lock(&cd->unsafe.spinlock); + spinlock_lock(&cd->unsafe.spinlock); cd->unsafe.enabled = false; - netdata_spinlock_unlock(&cd->unsafe.spinlock); + spinlock_unlock(&cd->unsafe.spinlock); } bool plugin_is_enabled(struct plugind *cd) { - netdata_spinlock_lock(&cd->unsafe.spinlock); + spinlock_lock(&cd->unsafe.spinlock); bool ret = cd->unsafe.enabled; - netdata_spinlock_unlock(&cd->unsafe.spinlock); + spinlock_unlock(&cd->unsafe.spinlock); return ret; } static inline void plugin_set_running(struct plugind *cd) { - netdata_spinlock_lock(&cd->unsafe.spinlock); + spinlock_lock(&cd->unsafe.spinlock); cd->unsafe.running = true; - netdata_spinlock_unlock(&cd->unsafe.spinlock); + spinlock_unlock(&cd->unsafe.spinlock); } static inline bool plugin_is_running(struct plugind *cd) { - netdata_spinlock_lock(&cd->unsafe.spinlock); + spinlock_lock(&cd->unsafe.spinlock); bool ret = cd->unsafe.running; - netdata_spinlock_unlock(&cd->unsafe.spinlock); + spinlock_unlock(&cd->unsafe.spinlock); return ret; } @@ -53,7 +53,7 @@ static void pluginsd_worker_thread_cleanup(void *arg) worker_unregister(); - netdata_spinlock_lock(&cd->unsafe.spinlock); + spinlock_lock(&cd->unsafe.spinlock); cd->unsafe.running = false; cd->unsafe.thread = 0; @@ -61,15 +61,15 @@ static void pluginsd_worker_thread_cleanup(void *arg) pid_t pid = cd->unsafe.pid; cd->unsafe.pid = 0; - netdata_spinlock_unlock(&cd->unsafe.spinlock); + spinlock_unlock(&cd->unsafe.spinlock); if (pid) { siginfo_t info; - info("PLUGINSD: 'host:%s', killing data collection child process with pid %d", + netdata_log_info("PLUGINSD: 'host:%s', killing data collection child process with pid %d", rrdhost_hostname(cd->host), pid); if (killpid(pid) != -1) { - info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...", + netdata_log_info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...", rrdhost_hostname(cd->host), pid); netdata_waitid(P_PID, (id_t)pid, &info, WEXITED); @@ -85,7 +85,7 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) { } if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) { - info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", + netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); @@ -94,7 +94,7 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) { } if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) { - error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, " + netdata_log_error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, " "although it reports success (exits with 0)." "We have tried to collect something %zu times - unsuccessfully. Disabling it.", rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, cd->serial_failures); @@ -105,21 +105,21 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) { static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) { if (worker_ret_code == -1) { - info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.", + netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.", rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid); plugin_set_disabled(cd); return; } if (!cd->successful_collections) { - error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", + netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code); plugin_set_disabled(cd); return; } if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) { - error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", + netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); sleep((unsigned int)(cd->update_every * 10)); @@ -127,7 +127,7 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r } if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) { - error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." + netdata_log_error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." "We tried to restart it %zu times, but it failed to generate data. Disabling it.", rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures); @@ -153,16 +153,16 @@ static void *pluginsd_worker_thread(void *arg) { FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); if (unlikely(!fp_child_input || !fp_child_output)) { - error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd); + netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd); break; } - info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d", + netdata_log_info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d", rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid); count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0); - info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).", + netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).", rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count); killpid(cd->unsafe.pid); @@ -186,21 +186,21 @@ static void *pluginsd_worker_thread(void *arg) { static void pluginsd_main_cleanup(void *data) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - info("PLUGINSD: cleaning up..."); + netdata_log_info("PLUGINSD: cleaning up..."); struct plugind *cd; for (cd = pluginsd_root; cd; cd = cd->next) { - netdata_spinlock_lock(&cd->unsafe.spinlock); + spinlock_lock(&cd->unsafe.spinlock); if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) { - info("PLUGINSD: 'host:%s', stopping plugin thread: %s", + netdata_log_info("PLUGINSD: 'host:%s', stopping plugin thread: %s", rrdhost_hostname(cd->host), cd->id); netdata_thread_cancel(cd->unsafe.thread); } - netdata_spinlock_unlock(&cd->unsafe.spinlock); + spinlock_unlock(&cd->unsafe.spinlock); } - info("PLUGINSD: cleanup completed."); + netdata_log_info("PLUGINSD: cleanup completed."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; worker_unregister(); @@ -235,7 +235,7 @@ void *pluginsd_main(void *ptr) if (unlikely(!dir)) { if (directory_errors[idx] != errno) { directory_errors[idx] = errno; - error("cannot open plugins directory '%s'", directory_name); + netdata_log_error("cannot open plugins directory '%s'", directory_name); } continue; } @@ -245,7 +245,7 @@ void *pluginsd_main(void *ptr) if (unlikely(!service_running(SERVICE_COLLECTORS))) break; - debug(D_PLUGINSD, "examining file '%s'", file->d_name); + netdata_log_debug(D_PLUGINSD, "examining file '%s'", file->d_name); if (unlikely(strcmp(file->d_name, ".") == 0 || strcmp(file->d_name, "..") == 0)) continue; @@ -254,7 +254,7 @@ void *pluginsd_main(void *ptr) if (unlikely(len <= (int)PLUGINSD_FILE_SUFFIX_LEN)) continue; if (unlikely(strcmp(PLUGINSD_FILE_SUFFIX, &file->d_name[len - (int)PLUGINSD_FILE_SUFFIX_LEN]) != 0)) { - debug(D_PLUGINSD, "file '%s' does not end in '%s'", file->d_name, PLUGINSD_FILE_SUFFIX); + netdata_log_debug(D_PLUGINSD, "file '%s' does not end in '%s'", file->d_name, PLUGINSD_FILE_SUFFIX); continue; } @@ -263,7 +263,7 @@ void *pluginsd_main(void *ptr) int enabled = config_get_boolean(CONFIG_SECTION_PLUGINS, pluginname, automatic_run); if (unlikely(!enabled)) { - debug(D_PLUGINSD, "plugin '%s' is not enabled", file->d_name); + netdata_log_debug(D_PLUGINSD, "plugin '%s' is not enabled", file->d_name); continue; } @@ -274,7 +274,7 @@ void *pluginsd_main(void *ptr) break; if (likely(cd && plugin_is_running(cd))) { - debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename); + netdata_log_debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename); continue; } diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 097e5ea60..cda17710c 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,90 +4,103 @@ #define LOG_FUNCTIONS false -static int send_to_plugin(const char *txt, void *data) { +static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; if(!txt || !*txt) return 0; + errno = 0; + spinlock_lock(&parser->writer.spinlock); + ssize_t bytes = -1; + #ifdef ENABLE_HTTPS NETDATA_SSL *ssl = parser->ssl_output; if(ssl) { + if(SSL_connection(ssl)) - return (int)netdata_ssl_write(ssl, (void *)txt, strlen(txt)); + bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt)); - error("PLUGINSD: cannot send command (SSL)"); - return -1; + else + netdata_log_error("PLUGINSD: cannot send command (SSL)"); + + spinlock_unlock(&parser->writer.spinlock); + return bytes; } #endif if(parser->fp_output) { - int bytes = fprintf(parser->fp_output, "%s", txt); + + bytes = fprintf(parser->fp_output, "%s", txt); if(bytes <= 0) { - error("PLUGINSD: cannot send command (FILE)"); - return -2; + netdata_log_error("PLUGINSD: cannot send command (FILE)"); + bytes = -2; } - fflush(parser->fp_output); + else + fflush(parser->fp_output); + + spinlock_unlock(&parser->writer.spinlock); return bytes; } if(parser->fd != -1) { - size_t bytes = 0; - size_t total = strlen(txt); + bytes = 0; + ssize_t total = (ssize_t)strlen(txt); ssize_t sent; do { sent = write(parser->fd, &txt[bytes], total - bytes); if(sent <= 0) { - error("PLUGINSD: cannot send command (fd)"); + netdata_log_error("PLUGINSD: cannot send command (fd)"); + spinlock_unlock(&parser->writer.spinlock); return -3; } bytes += sent; } while(bytes < total); + spinlock_unlock(&parser->writer.spinlock); return (int)bytes; } - error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); + spinlock_unlock(&parser->writer.spinlock); + netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); return -4; } -static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) { - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; +static inline RRDHOST *pluginsd_require_host_from_parent(PARSER *parser, const char *cmd) { + RRDHOST *host = parser->user.host; if(unlikely(!host)) - error("PLUGINSD: command %s requires a host, but is not set.", cmd); + netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd); return host; } -static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) { - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; +static inline RRDSET *pluginsd_require_chart_from_parent(PARSER *parser, const char *cmd, const char *parent_cmd) { + RRDSET *st = parser->user.st; if(unlikely(!st)) - error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd); + netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd); return st; } -static inline RRDSET *pluginsd_get_chart_from_parent(void *user) { - return ((PARSER_USER_OBJECT *) user)->st; +static inline RRDSET *pluginsd_get_chart_from_parent(PARSER *parser) { + return parser->user.st; } -static inline void pluginsd_lock_rrdset_data_collection(void *user) { - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - if(u->st && !u->v2.locked_data_collection) { - netdata_spinlock_lock(&u->st->data_collection_lock); - u->v2.locked_data_collection = true; +static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) { + if(parser->user.st && !parser->user.v2.locked_data_collection) { + spinlock_lock(&parser->user.st->data_collection_lock); + parser->user.v2.locked_data_collection = true; } } -static inline bool pluginsd_unlock_rrdset_data_collection(void *user) { - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - if(u->st && u->v2.locked_data_collection) { - netdata_spinlock_unlock(&u->st->data_collection_lock); - u->v2.locked_data_collection = false; +static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) { + if(parser->user.st && parser->user.v2.locked_data_collection) { + spinlock_unlock(&parser->user.st->data_collection_lock); + parser->user.v2.locked_data_collection = false; return true; } @@ -108,29 +121,29 @@ void pluginsd_rrdset_cleanup(RRDSET *st) { st->pluginsd.pos = 0; } -static inline void pluginsd_unlock_previous_chart(void *user, const char *keyword, bool stale) { - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - - if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) { +static inline void pluginsd_unlock_previous_chart(PARSER *parser, const char *keyword, bool stale) { + if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) { if(stale) - error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked", - rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked", + rrdhost_hostname(parser->user.st->rrdhost), + rrdset_id(parser->user.st), + keyword); } - if(unlikely(u->v2.ml_locked)) { - ml_chart_update_end(u->st); - u->v2.ml_locked = false; + if(unlikely(parser->user.v2.ml_locked)) { + ml_chart_update_end(parser->user.st); + parser->user.v2.ml_locked = false; if(stale) - error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked", - rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked", + rrdhost_hostname(parser->user.st->rrdhost), + rrdset_id(parser->user.st), + keyword); } } -static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) { - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - - pluginsd_unlock_previous_chart(user, keyword, true); +static inline void pluginsd_set_chart_from_parent(PARSER *parser, RRDSET *st, const char *keyword) { + pluginsd_unlock_previous_chart(parser, keyword, true); if(st) { size_t dims = dictionary_entries(st->rrddim_root_index); @@ -145,13 +158,13 @@ static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const st->pluginsd.pos = 0; } - u->st = st; + parser->user.st = st; } static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { if (unlikely(!dimension || !*dimension)) { - error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", - rrdhost_hostname(host), rrdset_id(st), cmd); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", + rrdhost_hostname(host), rrdset_id(st), cmd); return NULL; } @@ -172,8 +185,8 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons rda = rrddim_find_and_acquire(st, dimension); if (unlikely(!rda)) { - error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", - rrdhost_hostname(host), rrdset_id(st), dimension, cmd); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", + rrdhost_hostname(host), rrdset_id(st), dimension, cmd); return NULL; } @@ -186,21 +199,21 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { if (unlikely(!chart || !*chart)) { - error("PLUGINSD: 'host:%s' got a %s without a chart id.", - rrdhost_hostname(host), cmd); + netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.", + rrdhost_hostname(host), cmd); return NULL; } RRDSET *st = rrdset_find(host, chart); if (unlikely(!st)) - error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.", - rrdhost_hostname(host), chart, cmd); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.", + rrdhost_hostname(host), chart, cmd); return st; } -static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) { - ((PARSER_USER_OBJECT *) user)->enabled = 0; +static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) { + parser->user.enabled = 0; if(keyword && msg) { error_limit_static_global_var(erl, 1, 0); @@ -210,22 +223,21 @@ static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, return PARSER_RC_ERROR; } -PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) { char *dimension = get_word(words, num_words, 1); char *value = get_word(words, num_words, 2); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", + netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET"); if (value && *value) @@ -234,18 +246,17 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) { char *id = get_word(words, num_words, 1); char *microseconds_txt = get_word(words, num_words, 2); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN); + pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN); usec_t microseconds = 0; if (microseconds_txt && *microseconds_txt) { @@ -270,7 +281,7 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) if (likely(st->counter_done)) { if (likely(microseconds)) { - if (((PARSER_USER_OBJECT *)user)->trust_durations) + if (parser->user.trust_durations) rrdset_next_usec_unfiltered(st, microseconds); else rrdset_next_usec(st, microseconds); @@ -281,22 +292,21 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) { UNUSED(words); UNUSED(num_words); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); + netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END); - ((PARSER_USER_OBJECT *) user)->data_collections_count++; + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_END); + parser->user.data_collections_count++; struct timeval now; now_realtime_timeval(&now); @@ -305,15 +315,13 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -static void pluginsd_host_define_cleanup(void *user) { - PARSER_USER_OBJECT *u = user; +static void pluginsd_host_define_cleanup(PARSER *parser) { + string_freez(parser->user.host_define.hostname); + dictionary_destroy(parser->user.host_define.rrdlabels); - string_freez(u->host_define.hostname); - dictionary_destroy(u->host_define.rrdlabels); - - u->host_define.hostname = NULL; - u->host_define.rrdlabels = NULL; - u->host_define.parsing_host = false; + parser->user.host_define.hostname = NULL; + parser->user.host_define.rrdlabels = NULL; + parser->user.host_define.parsing_host = false; } static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) { @@ -325,61 +333,56 @@ static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid return true; } -static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) { - PARSER_USER_OBJECT *u = user; - +static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) { char *guid = get_word(words, num_words, 1); char *hostname = get_word(words, num_words, 2); if(unlikely(!guid || !*guid || !hostname || !*hostname)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters"); - if(unlikely(u->host_define.parsing_host)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, + if(unlikely(parser->user.host_define.parsing_host)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?"); - if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?"); + if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?"); - u->host_define.hostname = string_strdupz(hostname); - u->host_define.rrdlabels = rrdlabels_create(); - u->host_define.parsing_host = true; + parser->user.host_define.hostname = string_strdupz(hostname); + parser->user.host_define.rrdlabels = rrdlabels_create(); + parser->user.host_define.parsing_host = true; return PARSER_RC_OK; } -static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) { - PARSER_USER_OBJECT *u = user; - +static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) { char *name = get_word(words, num_words, 1); char *value = get_word(words, num_words, 2); if(!name || !*name || !value) - return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters"); - if(!u->host_define.parsing_host || !dict) - return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); + if(!parser->user.host_define.parsing_host || !dict) + return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG); return PARSER_RC_OK; } -static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) { - PARSER_USER_OBJECT *u = user; - return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL); +static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) { + return pluginsd_host_dictionary(words, num_words, parser, + parser->user.host_define.rrdlabels, + PLUGINSD_KEYWORD_HOST_LABEL); } -static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { - PARSER_USER_OBJECT *u = user; - - if(!u->host_define.parsing_host) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); +static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + if(!parser->user.host_define.parsing_host) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); RRDHOST *host = rrdhost_find_or_create( - string2str(u->host_define.hostname), - string2str(u->host_define.hostname), - u->host_define.machine_guid_str, + string2str(parser->user.host_define.hostname), + string2str(parser->user.host_define.hostname), + parser->user.host_define.machine_guid_str, "Netdata Virtual Host 1.0", netdata_configured_timezone, netdata_configured_abbrev_timezone, @@ -398,22 +401,24 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu default_rrdpush_enable_replication, default_rrdpush_seconds_to_replicate, default_rrdpush_replication_step, - rrdhost_labels_to_system_info(u->host_define.rrdlabels), + rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels), false ); + rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST); + if(host->rrdlabels) { - rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels); + rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels); } else { - host->rrdlabels = u->host_define.rrdlabels; - u->host_define.rrdlabels = NULL; + host->rrdlabels = parser->user.host_define.rrdlabels; + parser->user.host_define.rrdlabels = NULL; } - pluginsd_host_define_cleanup(user); + pluginsd_host_define_cleanup(parser); - u->host = host; - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END); + parser->user.host = host; + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END); rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); rrdcontext_host_child_connected(host); @@ -422,34 +427,31 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu return PARSER_RC_OK; } -static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) { - PARSER_USER_OBJECT *u = user; - +static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) { char *guid = get_word(words, num_words, 1); if(!guid || !*guid || strcmp(guid, "localhost") == 0) { - u->host = localhost; + parser->user.host = localhost; return PARSER_RC_OK; } uuid_t uuid; char uuid_str[UUID_STR_LEN]; if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?"); RRDHOST *host = rrdhost_find_by_guid(uuid_str); if(unlikely(!host)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?"); - u->host = host; + parser->user.host = host; return PARSER_RC_OK; } -PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) -{ - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); +static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) { + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); char *type = get_word(words, num_words, 1); char *name = get_word(words, num_words, 2); @@ -473,7 +475,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) // make sure we have the required variables if (unlikely((!type || !*type || !id || !*id))) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters"); // parse the name, and make sure it does not include 'type.' if (unlikely(name && *name)) { @@ -494,11 +496,11 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) if (likely(priority_s && *priority_s)) priority = str2i(priority_s); - int update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every; + int update_every = parser->user.cd->update_every; if (likely(update_every_s && *update_every_s)) update_every = str2i(update_every_s); if (unlikely(!update_every)) - update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every; + update_every = parser->user.cd->update_every; RRDSET_TYPE chart_type = RRDSET_TYPE_LINE; if (unlikely(chart)) @@ -515,7 +517,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) if (unlikely(!units)) units = "unknown"; - debug( + netdata_log_debug( D_PLUGINSD, "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d", type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type), @@ -525,14 +527,16 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) st = rrdset_create( host, type, id, name, family, context, title, units, - (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename, + (plugin && *plugin) ? plugin : parser->user.cd->filename, module, priority, update_every, chart_type); if (likely(st)) { if (options && *options) { - if (strstr(options, "obsolete")) + if (strstr(options, "obsolete")) { + pluginsd_rrdset_cleanup(st); rrdset_is_obsolete(st); + } else rrdset_isnot_obsolete(st); @@ -556,22 +560,21 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); } } - pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART); + pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_CHART); return PARSER_RC_OK; } -PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) { const char *first_entry_txt = get_word(words, num_words, 1); const char *last_entry_txt = get_word(words, num_words, 2); const char *wall_clock_time_txt = get_word(words, num_words, 3); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; @@ -590,7 +593,6 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); rrdhost_receiver_replicating_charts_plus_one(st->rrdhost); - PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser; ok = replicate_chart_request(send_to_plugin, parser, host, st, first_entry_child, last_entry_child, child_wall_clock_time, 0, 0); @@ -605,8 +607,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } -PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) { char *id = get_word(words, num_words, 1); char *name = get_word(words, num_words, 2); char *algorithm = get_word(words, num_words, 3); @@ -614,14 +615,14 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) char *divisor_s = get_word(words, num_words, 5); char *options = get_word(words, num_words, 6); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if (unlikely(!id)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id"); long multiplier = 1; if (multiplier_s && *multiplier_s) { @@ -641,7 +642,7 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) algorithm = "absolute"; if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug( + netdata_log_debug( D_PLUGINSD, "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'", rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor, @@ -720,7 +721,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void pf->sent_ut = now_realtime_usec(); if(ret < 0) { - error("FUNCTION: failed to send function to plugin, error %d", ret); + netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret); rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); } else { @@ -734,7 +735,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) { struct inflight_function *pf = new_func; - error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); + netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); pf->callback(pf->destination_wb, pf->code, pf->callback_data); string_freez(pf->function); @@ -825,8 +826,7 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou return HTTP_RESP_OK; } -PARSER_RC pluginsd_function(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) { bool global = false; size_t i = 1; if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { @@ -838,21 +838,21 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user) char *timeout_s = get_word(words, num_words, i++); char *help = get_word(words, num_words, i++); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION); if(!host) return PARSER_RC_ERROR; - RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART); + RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART); if(!st) global = true; if (unlikely(!timeout_s || !name || !help || (!global && !st))) { - error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.", - rrdhost_hostname(host), - st?rrdset_id(st):"(unset)", - global?"yes":"no", - name?name:"(unset)", - timeout_s?timeout_s:"(unset)", - help?help:"(unset)" - ); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.", + rrdhost_hostname(host), + st?rrdset_id(st):"(unset)", + global?"yes":"no", + name?name:"(unset)", + timeout_s?timeout_s:"(unset)", + help?help:"(unset)" + ); return PARSER_RC_ERROR; } @@ -863,7 +863,6 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; } - PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser; rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser); return PARSER_RC_OK; @@ -876,15 +875,14 @@ static void pluginsd_function_result_end(struct parser *parser, void *action_dat string_freez(key); } -PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) { char *key = get_word(words, num_words, 1); char *status = get_word(words, num_words, 2); char *format = get_word(words, num_words, 3); char *expires = get_word(words, num_words, 4); if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) { - error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')." + netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')." , key ? key : "(unset)" , status ? status : "(unset)" , format ? format : "(unset)" @@ -898,15 +896,13 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u time_t expiration = (expires && *expires) ? str2l(expires) : 0; - PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser; - struct inflight_function *pf = NULL; if(key && *key) pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key); if(!pf) { - error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)"); + netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)"); } else { if(format && *format) @@ -932,16 +928,15 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u // ---------------------------------------------------------------------------- -PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) { char *name = get_word(words, num_words, 1); char *value = get_word(words, num_words, 2); NETDATA_DOUBLE v; - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_VARIABLE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_get_chart_from_parent(user); + RRDSET *st = pluginsd_get_chart_from_parent(parser); int global = (st) ? 0 : 1; @@ -958,39 +953,39 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) } if (unlikely(!name || !*name)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name"); if (unlikely(!value || !*value)) value = NULL; if (unlikely(!value)) { - error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - (global) ? "HOST" : "CHART", - name); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + (global) ? "HOST" : "CHART", + name); return PARSER_RC_OK; } if (!global && !st) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given"); char *endptr = NULL; v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr); if (unlikely(endptr && *endptr)) { if (endptr == value) - error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - value, - name); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + value, + name); else - error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - value, - name, - endptr); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + value, + name, + endptr); } if (global) { @@ -1000,9 +995,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) rrdvar_custom_host_variable_release(host, rva); } else - error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'", - rrdhost_hostname(host), - name); + netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'", + rrdhost_hostname(host), + name); } else { const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name); if (rsa) { @@ -1010,39 +1005,36 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) rrdsetvar_custom_chart_variable_release(st, rsa); } else - error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'", - rrdhost_hostname(host), rrdset_id(st), name); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'", + rrdhost_hostname(host), rrdset_id(st), name); } return PARSER_RC_OK; } -PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) -{ - debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH); - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH); - ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; +static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH); + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_FLUSH); + parser->user.replay.start_time = 0; + parser->user.replay.end_time = 0; + parser->user.replay.start_time_ut = 0; + parser->user.replay.end_time_ut = 0; return PARSER_RC_OK; } -PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused) -{ - info("PLUGINSD: plugin called DISABLE. Disabling it."); - ((PARSER_USER_OBJECT *) user)->enabled = 0; +static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it."); + parser->user.enabled = 0; return PARSER_RC_STOP; } -PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) { const char *name = get_word(words, num_words, 1); const char *label_source = get_word(words, num_words, 2); const char *value = get_word(words, num_words, 3); if (!name || !label_source || !value) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters"); char *store = (char *)value; bool allocated_store = false; @@ -1071,13 +1063,10 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) } } - if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels)) - ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create(); + if(unlikely(!(parser->user.new_host_labels))) + parser->user.new_host_labels = rrdlabels_create(); - rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, - name, - store, - str2l(label_source)); + rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source)); if (allocated_store) freez(store); @@ -1085,90 +1074,84 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) -{ - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); +static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_OVERWRITE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - debug(D_PLUGINSD, "requested to OVERWRITE host labels"); + netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels"); if(unlikely(!host->rrdlabels)) host->rrdlabels = rrdlabels_create(); - rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels)); + rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); - rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); - ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; + rrdlabels_destroy(parser->user.new_host_labels); + parser->user.new_host_labels = NULL; return PARSER_RC_OK; } - -PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) { const char *name = get_word(words, num_words, 1); const char *value = get_word(words, num_words, 2); const char *label_source = get_word(words, num_words, 3); if (!name || !value || !*label_source) { - error("Ignoring malformed or empty CHART LABEL command."); - return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + netdata_log_error("Ignoring malformed or empty CHART LABEL command."); + return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); } - if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) { - RRDSET *st = pluginsd_get_chart_from_parent(user); - ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels; - rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); + if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) { + RRDSET *st = pluginsd_get_chart_from_parent(parser); + parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels; + rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily); } - rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily, - name, value, str2l(label_source)); + rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source)); return PARSER_RC_OK; } -PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) -{ - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); +static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - debug(D_PLUGINSD, "requested to commit chart labels"); + netdata_log_debug(D_PLUGINSD, "requested to commit chart labels"); - if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) { - error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", - rrdhost_hostname(host)); - return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!parser->user.chart_rrdlabels_linked_temporarily) { + netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host)); + return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); } - rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); + rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily); rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL; + parser->user.chart_rrdlabels_linked_temporarily = NULL; return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) { +static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) { char *id = get_word(words, num_words, 1); char *start_time_str = get_word(words, num_words, 2); char *end_time_str = get_word(words, num_words, 3); char *child_now_str = get_word(words, num_words, 4); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); RRDSET *st; if (likely(!id || !*id)) - st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); + st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); else st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); + pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(start_time_str && end_time_str) { time_t start_time = (time_t) str2ull_encoded(start_time_str); @@ -1216,36 +1199,37 @@ PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) { st->counter_done++; // these are only needed for db mode RAM, SAVE, MAP, ALLOC - st->current_entry++; - if(st->current_entry >= st->entries) - st->current_entry -= st->entries; + st->db.current_entry++; + if(st->db.current_entry >= st->db.entries) + st->db.current_entry -= st->db.entries; - ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; - ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; - ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time; - ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true; + parser->user.replay.start_time = start_time; + parser->user.replay.end_time = end_time; + parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; + parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; + parser->user.replay.wall_clock_time = wall_clock_time; + parser->user.replay.rset_enabled = true; return PARSER_RC_OK; } - error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN - " from %ld to %ld, but timestamps are invalid " - "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, - rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, - wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance); + netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN + " from %ld to %ld, but timestamps are invalid " + "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, + rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, + wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", + tolerance); } // the child sends an RBEGIN without any parameters initially // setting rset_enabled to false, means the RSET should not store any metrics // to store metrics, the RBEGIN needs to have timestamps - ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; + parser->user.replay.start_time = 0; + parser->user.replay.end_time = 0; + parser->user.replay.start_time_ut = 0; + parser->user.replay.end_time_ut = 0; + parser->user.replay.wall_clock_time = 0; + parser->user.replay.rset_enabled = false; return PARSER_RC_OK; } @@ -1276,20 +1260,18 @@ static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str return flags; } -PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) { char *dimension = get_word(words, num_words, 1); char *value_str = get_word(words, num_words, 2); char *flags_str = get_word(words, num_words, 3); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - PARSER_USER_OBJECT *u = user; - if(!u->replay.rset_enabled) { + if(!parser->user.replay.rset_enabled) { error_limit_static_thread_var(erl, 1, 0); error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); @@ -1299,18 +1281,18 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) } RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - if (unlikely(!u->replay.start_time || !u->replay.end_time)) { - error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.", + if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) { + netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.", rrdhost_hostname(host), rrdset_id(st), dimension, PLUGINSD_KEYWORD_REPLAY_SET, - u->replay.start_time, - u->replay.end_time, + parser->user.replay.start_time, + parser->user.replay.end_time, PLUGINSD_KEYWORD_REPLAY_BEGIN); - return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); } if (unlikely(!value_str || !*value_str)) @@ -1331,10 +1313,10 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) flags = SN_EMPTY_SLOT; } - rrddim_store_metric(rd, u->replay.end_time_ut, value, flags); - rd->last_collected_time.tv_sec = u->replay.end_time; - rd->last_collected_time.tv_usec = 0; - rd->collections_counter++; + rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags); + rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time; + rd->collector.last_collected_time.tv_usec = 0; + rd->collector.counter++; } else { error_limit_static_global_var(erl, 1, 0); @@ -1346,9 +1328,8 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user) -{ - if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false) +static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) { + if(parser->user.replay.rset_enabled == false) return PARSER_RC_OK; char *dimension = get_word(words, num_words, 1); @@ -1357,42 +1338,41 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words char *last_calculated_value_str = get_word(words, num_words, 4); char *last_stored_value_str = get_word(words, num_words, 5); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec; + usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec; usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0; if(last_collected_ut > dim_last_collected_ut) { - rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); - rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); + rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); + rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); } - rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0; - rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0; - rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0; + rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0; + rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0; + rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0; return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user) -{ - if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false) +static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) { + if(parser->user.replay.rset_enabled == false) return PARSER_RC_OK; char *last_collected_ut_str = get_word(words, num_words, 1); char *last_updated_ut_str = get_word(words, num_words, 2); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec; usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0; @@ -1414,10 +1394,9 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) { if (num_words < 7) { // accepts 7, but the 7th is optional - error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command"); + netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command"); return PARSER_RC_ERROR; } @@ -1441,13 +1420,11 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded( child_world_time_txt) : now_realtime_sec(); - PARSER_USER_OBJECT *user_object = user; - - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, @@ -1460,12 +1437,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) ); #endif - ((PARSER_USER_OBJECT *) user)->data_collections_count++; + parser->user.data_collections_count++; - if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) { + if(parser->user.replay.rset_enabled && st->rrdhost->receiver) { time_t now = now_realtime_sec(); time_t started = st->rrdhost->receiver->replication_first_time_t; - time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; + time_t current = parser->user.replay.end_time; if(started && current > started) { host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started); @@ -1474,12 +1451,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) } } - ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; + parser->user.replay.start_time = 0; + parser->user.replay.end_time = 0; + parser->user.replay.start_time_ut = 0; + parser->user.replay.end_time_ut = 0; + parser->user.replay.wall_clock_time = 0; + parser->user.replay.rset_enabled = false; st->counter++; st->counter_done++; @@ -1509,7 +1486,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) rrdhost_hostname(host), rrdset_id(st)); #endif - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END); + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END); host->rrdpush_receiver_replication_percent = 100.0; worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent); @@ -1517,17 +1494,17 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) return PARSER_RC_OK; } - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END); + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END); rrdcontext_updated_retention_rrdset(st); - bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, + bool ok = replicate_chart_request(send_to_plugin, parser, host, st, first_entry_child, last_entry_child, child_world_time, first_entry_requested, last_entry_requested); return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } -PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { +static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) { timing_init(); char *id = get_word(words, num_words, 1); @@ -1536,17 +1513,17 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { char *wall_clock_time_str = get_word(words, num_words, 4); if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters"); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); timing_step(TIMING_STEP_BEGIN2_PREPARE); RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN_V2); + pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN_V2); if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED))) rrdset_isnot_obsolete(st); @@ -1573,32 +1550,31 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { // ------------------------------------------------------------------------ // prepare our state - pluginsd_lock_rrdset_data_collection(user); + pluginsd_lock_rrdset_data_collection(parser); - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - u->v2.update_every = update_every; - u->v2.end_time = end_time; - u->v2.wall_clock_time = wall_clock_time; - u->v2.ml_locked = ml_chart_update_begin(st); + parser->user.v2.update_every = update_every; + parser->user.v2.end_time = end_time; + parser->user.v2.wall_clock_time = wall_clock_time; + parser->user.v2.ml_locked = ml_chart_update_begin(st); timing_step(TIMING_STEP_BEGIN2_ML); // ------------------------------------------------------------------------ // propagate it forward in v2 - if(!u->v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost)) - u->v2.stream_buffer = rrdset_push_metric_initialize(u->st, wall_clock_time); + if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost)) + parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time); - if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.wb) { + if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) { // check if receiver and sender have the same number parsing capabilities - bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754); - NUMBER_ENCODING encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); + NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; - BUFFER *wb = u->v2.stream_buffer.wb; + BUFFER *wb = parser->user.v2.stream_buffer.wb; buffer_need_bytes(wb, 1024); - if(unlikely(u->v2.stream_buffer.begin_v2_added)) + if(unlikely(parser->user.v2.stream_buffer.begin_v2_added)) buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2); @@ -1626,8 +1602,8 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { buffer_fast_strcat(wb, "\n", 1); - u->v2.stream_buffer.last_point_end_time_s = end_time; - u->v2.stream_buffer.begin_v2_added = true; + parser->user.v2.stream_buffer.last_point_end_time_s = end_time; + parser->user.v2.stream_buffer.begin_v2_added = true; } timing_step(TIMING_STEP_BEGIN2_PROPAGATE); @@ -1643,16 +1619,16 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { st->counter_done++; // these are only needed for db mode RAM, SAVE, MAP, ALLOC - st->current_entry++; - if(st->current_entry >= st->entries) - st->current_entry -= st->entries; + st->db.current_entry++; + if(st->db.current_entry >= st->db.entries) + st->db.current_entry -= st->db.entries; timing_step(TIMING_STEP_BEGIN2_STORE); return PARSER_RC_OK; } -PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { +static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) { timing_init(); char *dimension = get_word(words, num_words, 1); @@ -1661,20 +1637,18 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { char *flags_str = get_word(words, num_words, 4); if(unlikely(!dimension || !collected_str || !value_str || !flags_str)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_SET_V2, "missing parameters"); - - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters"); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); timing_step(TIMING_STEP_SET2_PREPARE); RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2); - if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED))) rrddim_isnot_obsolete(st, rd); @@ -1703,11 +1677,11 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { value = NAN; flags = SN_EMPTY_SLOT; - if(u->v2.ml_locked) - ml_dimension_is_anomalous(rd, u->v2.end_time, 0, false); + if(parser->user.v2.ml_locked) + ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false); } - else if(u->v2.ml_locked) { - if (ml_dimension_is_anomalous(rd, u->v2.end_time, value, true)) { + else if(parser->user.v2.ml_locked) { + if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) { // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS); } @@ -1720,13 +1694,13 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { // ------------------------------------------------------------------------ // propagate it forward in v2 - if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb) { + if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) { // check if receiver and sender have the same number parsing capabilities - bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754); - NUMBER_ENCODING integer_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; - NUMBER_ENCODING doubles_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); + NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; - BUFFER *wb = u->v2.stream_buffer.wb; + BUFFER *wb = parser->user.v2.stream_buffer.wb; buffer_need_bytes(wb, 1024); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); @@ -1750,51 +1724,50 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { // ------------------------------------------------------------------------ // store it - rrddim_store_metric(rd, u->v2.end_time * USEC_PER_SEC, value, flags); - rd->last_collected_time.tv_sec = u->v2.end_time; - rd->last_collected_time.tv_usec = 0; - rd->last_collected_value = collected_value; - rd->last_stored_value = value; - rd->last_calculated_value = value; - rd->collections_counter++; - rd->updated = true; + rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags); + rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time; + rd->collector.last_collected_time.tv_usec = 0; + rd->collector.last_collected_value = collected_value; + rd->collector.last_stored_value = value; + rd->collector.last_calculated_value = value; + rd->collector.counter++; + rrddim_set_updated(rd); timing_step(TIMING_STEP_SET2_STORE); return PARSER_RC_OK; } -void pluginsd_cleanup_v2(void *user) { +void pluginsd_cleanup_v2(PARSER *parser) { // this is called when the thread is stopped while processing - pluginsd_set_chart_from_parent(user, NULL, "THREAD CLEANUP"); + pluginsd_set_chart_from_parent(parser, NULL, "THREAD CLEANUP"); } -PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { +static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { timing_init(); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - u->data_collections_count++; + parser->user.data_collections_count++; timing_step(TIMING_STEP_END2_PREPARE); // ------------------------------------------------------------------------ // propagate the whole chart update in v1 - if(unlikely(!u->v2.stream_buffer.v2 && !u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb)) - rrdset_push_metrics_v1(&u->v2.stream_buffer, st); + if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb)) + rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st); timing_step(TIMING_STEP_END2_PUSH_V1); // ------------------------------------------------------------------------ // unblock data collection - pluginsd_unlock_previous_chart(user, PLUGINSD_KEYWORD_END_V2, false); + pluginsd_unlock_previous_chart(parser, PLUGINSD_KEYWORD_END_V2, false); rrdcontext_collected_rrdset(st); store_metric_collection_completed(); @@ -1803,7 +1776,7 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_ // ------------------------------------------------------------------------ // propagate it forward - rrdset_push_metrics_finished(&u->v2.stream_buffer, st); + rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st); timing_step(TIMING_STEP_END2_PROPAGATE); @@ -1812,16 +1785,16 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_ RRDDIM *rd; rrddim_foreach_read(rd, st) { - rd->calculated_value = 0; - rd->collected_value = 0; - rd->updated = false; - } + rd->collector.calculated_value = 0; + rd->collector.collected_value = 0; + rrddim_clear_updated(rd); + } rrddim_foreach_done(rd); // ------------------------------------------------------------------------ // reset state - u->v2 = (struct parser_user_object_v2){ 0 }; + parser->user.v2 = (struct parser_user_object_v2){ 0 }; timing_step(TIMING_STEP_END2_STORE); timing_report(); @@ -1829,19 +1802,126 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_ return PARSER_RC_OK; } +static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + netdata_log_info("PLUGINSD: plugin called EXIT."); + return PARSER_RC_STOP; +} + +static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser) +{ + const char *host_uuid_str = get_word(words, num_words, 1); + const char *claim_id_str = get_word(words, num_words, 2); + + if (!host_uuid_str || !claim_id_str) { + netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", + host_uuid_str ? host_uuid_str : "[unset]", + claim_id_str ? claim_id_str : "[unset]"); + return PARSER_RC_ERROR; + } + + uuid_t uuid; + RRDHOST *host = parser->user.host; + + // We don't need the parsed UUID + // just do it to check the format + if(uuid_parse(host_uuid_str, uuid)) { + netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); + return PARSER_RC_ERROR; + } + if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) { + netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); + return PARSER_RC_ERROR; + } + + if(strcmp(host_uuid_str, host->machine_guid) != 0) { + netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); + return PARSER_RC_OK; //the message is OK problem must be somewhere else + } + + rrdhost_aclk_state_lock(host); + + if (host->aclk_state.claimed_id) + freez(host->aclk_state.claimed_id); + + host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; + + rrdhost_aclk_state_unlock(host); + + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); + + rrdpush_send_claimed_id(host); + + return PARSER_RC_OK; +} + +// ---------------------------------------------------------------------------- + +static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) { +#ifdef NETDATA_INTERNAL_CHECKS + if(reader->read_buffer[reader->read_len] != '\0') + fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); +#endif + + ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1); + if(unlikely(bytes_read <= 0)) + return false; + + reader->read_len += bytes_read; + reader->read_buffer[reader->read_len] = '\0'; + + return true; +} + +static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) { + errno = 0; + struct pollfd fds[1]; + + fds[0].fd = fd; + fds[0].events = POLLIN; + + int ret = poll(fds, 1, timeout_ms); + + if (ret > 0) { + /* There is data to read */ + if (fds[0].revents & POLLIN) + return buffered_reader_read(reader, fd); + + else if(fds[0].revents & POLLERR) { + netdata_log_error("PARSER: read failed: POLLERR."); + return false; + } + else if(fds[0].revents & POLLHUP) { + netdata_log_error("PARSER: read failed: POLLHUP."); + return false; + } + else if(fds[0].revents & POLLNVAL) { + netdata_log_error("PARSER: read failed: POLLNVAL."); + return false; + } + + netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set."); + return false; + } + else if (ret == 0) { + netdata_log_error("PARSER: timeout while waiting for data."); + return false; + } + + netdata_log_error("PARSER: poll() failed with code %d.", ret); + return false; +} + void pluginsd_process_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; - pluginsd_cleanup_v2(parser->user); - pluginsd_host_define_cleanup(parser->user); + pluginsd_cleanup_v2(parser); + pluginsd_host_define_cleanup(parser); rrd_collector_finished(); parser_destroy(parser); } -// New plugins.d parser - inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) { int enabled = cd->unsafe.enabled; @@ -1852,13 +1932,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi } if (unlikely(fileno(fp_plugin_input) == -1)) { - error("input file descriptor given is not a valid stream"); + netdata_log_error("input file descriptor given is not a valid stream"); cd->serial_failures++; return 0; } if (unlikely(fileno(fp_plugin_output) == -1)) { - error("output file descriptor given is not a valid stream"); + netdata_log_error("output file descriptor given is not a valid stream"); cd->serial_failures++; return 0; } @@ -1866,38 +1946,42 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi clearerr(fp_plugin_input); clearerr(fp_plugin_output); - PARSER_USER_OBJECT user = { - .enabled = cd->unsafe.enabled, - .host = host, - .cd = cd, - .trust_durations = trust_durations - }; + PARSER *parser; + { + PARSER_USER_OBJECT user = { + .enabled = cd->unsafe.enabled, + .host = host, + .cd = cd, + .trust_durations = trust_durations + }; - // fp_plugin_output = our input; fp_plugin_input = our output - PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, - PARSER_INPUT_SPLIT, NULL); + // fp_plugin_output = our input; fp_plugin_input = our output + parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); + } pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD); rrd_collector_started(); + size_t count = 0; + // this keeps the parser with its current value // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - user.parser = parser; - char buffer[PLUGINSD_LINE_MAX + 1]; - - while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) { - if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer))) + buffered_reader_init(&parser->reader); + char buffer[PLUGINSD_LINE_MAX + 2]; + while(likely(service_running(SERVICE_COLLECTORS))) { + if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) { + if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC))) + break; + } + else if(unlikely(parser_action(parser, buffer))) break; } - // free parser with the pop function - netdata_thread_cleanup_pop(1); - - cd->unsafe.enabled = user.enabled; - size_t count = user.data_collections_count; + cd->unsafe.enabled = parser->user.enabled; + count = parser->user.data_collections_count; if (likely(count)) { cd->successful_collections += count; @@ -1906,143 +1990,187 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi else cd->serial_failures++; + // free parser with the pop function + netdata_thread_cleanup_pop(1); + return count; } -PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused) -{ - info("PLUGINSD: plugin called EXIT."); - return PARSER_RC_STOP; +void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) { + parser_init_repertoire(parser, repertoire); + + if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) + inflight_functions_init(parser); } -static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) { +PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, + PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) { + PARSER *parser; - if (types & PARSER_INIT_PLUGINSD) { - add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); - add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); + parser = callocz(1, sizeof(*parser)); + if(user) + parser->user = *user; + parser->fd = fd; + parser->fp_input = fp_input; + parser->fp_output = fp_output; +#ifdef ENABLE_HTTPS + parser->ssl_output = ssl; +#endif + parser->flags = flags; - add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define); - add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end); - add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels); - add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host); + spinlock_init(&parser->writer.spinlock); + return parser; +} - add_func(parser, PLUGINSD_KEYWORD_EXIT, pluginsd_exit); - } +PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) { + switch(keyword->id) { + case 1: + return pluginsd_set_v2(words, num_words, parser); - if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) { - // plugins.d plugins and streaming - add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); - add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); - add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); - add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); - add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); - add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit); - add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel); - add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function); - add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin); + case 2: + return pluginsd_begin_v2(words, num_words, parser); - add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); - add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set); - add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end); + case 3: + return pluginsd_end_v2(words, num_words, parser); - inflight_functions_init(parser); - } + case 11: + return pluginsd_set(words, num_words, parser); - if (types & PARSER_INIT_STREAMING) { - add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end); + case 12: + return pluginsd_begin(words, num_words, parser); - // replication - add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin); - add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set); - add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state); - add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state); - add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end); + case 13: + return pluginsd_end(words, num_words, parser); - // streaming metrics v2 - add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2); - add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2); - add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2); - } -} + case 21: + return pluginsd_replay_set(words, num_words, parser); -void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) { - pluginsd_keywords_init_internal(parser, types, parser_add_keyword); -} + case 22: + return pluginsd_replay_begin(words, num_words, parser); -struct pluginsd_user_unittest { - size_t size; - const char **hashtable; - uint32_t (*hash)(const char *s); - size_t collisions; -}; + case 23: + return pluginsd_replay_rrddim_collection_state(words, num_words, parser); + + case 24: + return pluginsd_replay_rrdset_collection_state(words, num_words, parser); + + case 25: + return pluginsd_replay_end(words, num_words, parser); + + case 31: + return pluginsd_dimension(words, num_words, parser); + + case 32: + return pluginsd_chart(words, num_words, parser); + + case 33: + return pluginsd_chart_definition_end(words, num_words, parser); + + case 34: + return pluginsd_clabel(words, num_words, parser); + + case 35: + return pluginsd_clabel_commit(words, num_words, parser); + + case 41: + return pluginsd_function(words, num_words, parser); + + case 42: + return pluginsd_function_result_begin(words, num_words, parser); + + case 51: + return pluginsd_label(words, num_words, parser); + + case 52: + return pluginsd_overwrite(words, num_words, parser); + + case 53: + return pluginsd_variable(words, num_words, parser); + + case 61: + return streaming_claimed_id(words, num_words, parser); + + case 71: + return pluginsd_host(words, num_words, parser); + + case 72: + return pluginsd_host_define(words, num_words, parser); + + case 73: + return pluginsd_host_define_end(words, num_words, parser); -void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) { - struct pluginsd_user_unittest *u = parser->user; + case 74: + return pluginsd_host_labels(words, num_words, parser); - uint32_t hash = u->hash(keyword); - uint32_t slot = hash % u->size; + case 97: + return pluginsd_flush(words, num_words, parser); - if(u->hashtable[slot]) - u->collisions++; + case 98: + return pluginsd_disable(words, num_words, parser); - u->hashtable[slot] = keyword; + case 99: + return pluginsd_exit(words, num_words, parser); + + default: + fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id); + } } -static struct { - const char *name; - uint32_t (*hash)(const char *s); - size_t slots_needed; -} hashers[] = { - { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, }, - { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, }, - { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, }, - { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, }, - { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, }, - - // terminator - { .name = NULL, NULL, .slots_needed = 0, }, -}; +#include "gperf-hashtable.h" + +void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) { + parser->repertoire = repertoire; + + for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) { + if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire)) + worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword); + } +} + +void parser_destroy(PARSER *parser) { + if (unlikely(!parser)) + return; + + dictionary_destroy(parser->inflight.functions); + freez(parser); +} int pluginsd_parser_unittest(void) { - PARSER *p; - size_t slots_to_check = 1000; - size_t i, h; - - // check for hashtable collisions - for(h = 0; hashers[h].name ;h++) { - hashers[h].slots_needed = slots_to_check * 1000000; - - for (i = 10; i < slots_to_check; i++) { - struct pluginsd_user_unittest user = { - .hash = hashers[h].hash, - .size = i, - .hashtable = callocz(i, sizeof(const char *)), - .collisions = 0, - }; - - p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); - pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING, - pluginsd_keyword_collision_check); - parser_destroy(p); - - freez(user.hashtable); - - if (!user.collisions) { - hashers[h].slots_needed = i; - break; - } + PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); + pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING); + + char *lines[] = { + "BEGIN2 abcdefghijklmnopqr 123", + "SET2 abcdefg 0x12345678 0 0", + "SET2 hijklmnoqr 0x12345678 0 0", + "SET2 stuvwxyz 0x12345678 0 0", + "END2", + NULL, + }; + + char *words[PLUGINSD_MAX_WORDS]; + size_t iterations = 1000000; + size_t count = 0; + char input[PLUGINSD_LINE_MAX + 1]; + + usec_t started = now_realtime_usec(); + while(--iterations) { + for(size_t line = 0; lines[line] ;line++) { + strncpyz(input, lines[line], PLUGINSD_LINE_MAX); + size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); + const char *command = get_word(words, num_words, 0); + PARSER_KEYWORD *keyword = parser_find_keyword(p, command); + if(unlikely(!keyword)) + fatal("Cannot parse the line '%s'", lines[line]); + count++; } } + usec_t ended = now_realtime_usec(); - for(h = 0; hashers[h].name ;h++) { - if(hashers[h].slots_needed > 1000) - info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check); - else - info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed); - } + netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count, + (double)(ended - started) / (double)USEC_PER_SEC, + (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0); - p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); - pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING); parser_destroy(p); return 0; } diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 1fdc23a0e..5e1ea1242 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -5,13 +5,39 @@ #include "daemon/common.h" +#define WORKER_PARSER_FIRST_JOB 3 + +// this has to be in-sync with the same at receiver.c +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +// PARSER return codes +typedef enum __attribute__ ((__packed__)) parser_rc { + PARSER_RC_OK, // Callback was successful, go on + PARSER_RC_STOP, // Callback says STOP + PARSER_RC_ERROR // Callback failed (abort rest of callbacks) +} PARSER_RC; + +typedef enum __attribute__ ((__packed__)) parser_input_type { + PARSER_INPUT_SPLIT = (1 << 1), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 2), +} PARSER_INPUT_TYPE; + typedef enum __attribute__ ((__packed__)) { PARSER_INIT_PLUGINSD = (1 << 1), PARSER_INIT_STREAMING = (1 << 2), -} PLUGINSD_KEYWORDS; +} PARSER_REPERTOIRE; + +struct parser; +typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, struct parser *parser); + +typedef struct parser_keyword { + char *keyword; + size_t id; + PARSER_REPERTOIRE repertoire; + size_t worker_job_id; +} PARSER_KEYWORD; typedef struct parser_user_object { - PARSER *parser; RRDSET *st; RRDHOST *host; void *opaque; @@ -54,9 +80,142 @@ typedef struct parser_user_object { } v2; } PARSER_USER_OBJECT; -PARSER_RC pluginsd_function(char **words, size_t num_words, void *user); -PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user); +typedef struct parser { + uint8_t version; // Parser version + PARSER_REPERTOIRE repertoire; + uint32_t flags; + int fd; // Socket + size_t line; + FILE *fp_input; // Input source e.g. stream + FILE *fp_output; // Stream to send commands to plugin + +#ifdef ENABLE_HTTPS + NETDATA_SSL *ssl_output; +#endif + + PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls + + struct buffered_reader reader; + + struct { + const char *end_keyword; + BUFFER *response; + void (*action)(struct parser *parser, void *action_data); + void *action_data; + } defer; + + struct { + DICTIONARY *functions; + usec_t smaller_timeout; + } inflight; + + struct { + SPINLOCK spinlock; + } writer; + +} PARSER; + +PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); +void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire); +void parser_destroy(PARSER *working_parser); +void pluginsd_cleanup_v2(PARSER *parser); void inflight_functions_init(PARSER *parser); -void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types); +void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire); +PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words); + +static inline int find_first_keyword(const char *src, char *dst, int dst_size, bool *isspace_map) { + const char *s = src, *keyword_start; + + while (unlikely(isspace_map[(uint8_t)*s])) s++; + keyword_start = s; + + while (likely(*s && !isspace_map[(uint8_t)*s]) && dst_size > 1) { + *dst++ = *s++; + dst_size--; + } + *dst = '\0'; + return dst_size == 0 ? 0 : (int) (s - keyword_start); +} + +PARSER_KEYWORD *gperf_lookup_keyword(register const char *str, register size_t len); + +static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) { + PARSER_KEYWORD *t = gperf_lookup_keyword(command, strlen(command)); + if(t && (t->repertoire & parser->repertoire)) + return t; + + return NULL; +} + +static inline int parser_action(PARSER *parser, char *input) { + parser->line++; + + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + char command[PLUGINSD_LINE_MAX + 1]; + bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, isspace_map_pluginsd); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { + // more than 10MB of data + // a bad plugin that did not send the end_keyword + internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + char *words[PLUGINSD_MAX_WORDS]; + size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); + const char *command = get_word(words, num_words, 0); + + if(unlikely(!command)) + return 0; + + PARSER_RC rc; + PARSER_KEYWORD *t = parser_find_keyword(parser, command); + if(likely(t)) { + worker_is_busy(t->worker_job_id); + rc = parser_execute(parser, t, words, num_words); + // rc = (*t->func)(words, num_words, parser); + worker_is_idle(); + } + else + rc = PARSER_RC_ERROR; + + if(rc == PARSER_RC_ERROR) { + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); + for(size_t i = 0; i < num_words ;i++) { + if(i) buffer_fast_strcat(wb, " ", 1); + + buffer_fast_strcat(wb, "\"", 1); + const char *s = get_word(words, num_words, i); + buffer_strcat(wb, s?s:""); + buffer_fast_strcat(wb, "\"", 1); + } + + netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line, buffer_tostring(wb)); + + buffer_free(wb); + } + + return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); +} #endif //NETDATA_PLUGINSD_PARSER_H -- cgit v1.2.3