diff options
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r-- | collectors/plugins.d/gperf-config.txt | 69 | ||||
-rw-r--r-- | collectors/plugins.d/gperf-hashtable.h | 178 | ||||
-rw-r--r-- | collectors/plugins.d/local_listeners.c | 54 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 83 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 5 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 771 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 69 |
7 files changed, 794 insertions, 435 deletions
diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt index a1d0c51ba..bad51367c 100644 --- a/collectors/plugins.d/gperf-config.txt +++ b/collectors/plugins.d/gperf-config.txt @@ -12,46 +12,47 @@ PARSER_KEYWORD; # # Plugins Only Keywords # -FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1 -DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2 -EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3 -HOST, 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4 -HOST_DEFINE, 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5 -HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6 -HOST_LABEL, 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7 +FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1 +DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2 +EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3 +HOST, 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4 +HOST_DEFINE, 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5 +HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6 +HOST_LABEL, 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7 # # Common keywords # -BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8 -CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9 -CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10 -CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11 -DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12 -END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13 -FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14 -FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15 -LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16 -OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17 -SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18 -VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19 -DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20 -DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21 -DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 -REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 -DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 +BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8 +CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9 +CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10 +CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11 +DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12 +END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13 +FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14 +FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15 +LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16 +OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17 +SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18 +VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19 +DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20 +DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21 +DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 +DYNCFG_RESET, 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 +REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 +DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 # # Streaming only keywords # -CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 -BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26 -SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 -END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 +CLAIMED_ID, 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26 +BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 +SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 +END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 # # Streaming Replication keywords # -CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 -RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30 -RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 -REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 -RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33 -RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34 +CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30 +RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 +RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 +REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33 +RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34 +RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35 diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h index 5bbf9fa98..b327d8d6d 100644 --- a/collectors/plugins.d/gperf-hashtable.h +++ b/collectors/plugins.d/gperf-hashtable.h @@ -30,12 +30,12 @@ #endif -#define GPERF_PARSER_TOTAL_KEYWORDS 34 +#define GPERF_PARSER_TOTAL_KEYWORDS 35 #define GPERF_PARSER_MIN_WORD_LENGTH 3 #define GPERF_PARSER_MAX_WORD_LENGTH 22 #define GPERF_PARSER_MIN_HASH_VALUE 3 -#define GPERF_PARSER_MAX_HASH_VALUE 36 -/* maximum key range = 34, duplicates = 0 */ +#define GPERF_PARSER_MAX_HASH_VALUE 47 +/* maximum key range = 45, duplicates = 0 */ #ifdef __GNUC__ __inline @@ -49,32 +49,32 @@ gperf_keyword_hash_function (register const char *str, register size_t len) { static unsigned char asso_values[] = { - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 12, 28, 5, 2, 0, - 0, 37, 3, 13, 37, 37, 14, 37, 0, 2, - 37, 37, 1, 3, 37, 6, 10, 37, 32, 2, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, - 37, 37, 37, 37, 37, 37 + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 11, 18, 0, 0, 0, + 6, 48, 9, 0, 48, 48, 20, 48, 0, 8, + 48, 48, 1, 12, 48, 20, 18, 48, 2, 0, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, + 48, 48, 48, 48, 48, 48 }; return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]]; } @@ -83,73 +83,79 @@ static PARSER_KEYWORD gperf_keywords[] = { {(char*)0}, {(char*)0}, {(char*)0}, #line 30 "gperf-config.txt" - {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13}, -#line 48 "gperf-config.txt" - {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, + {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13}, +#line 49 "gperf-config.txt" + {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, +#line 56 "gperf-config.txt" + {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33}, +#line 17 "gperf-config.txt" + {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}, +#line 16 "gperf-config.txt" + {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, #line 55 "gperf-config.txt" - {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, + {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, +#line 29 "gperf-config.txt" + {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12}, +#line 42 "gperf-config.txt" + {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, + {(char*)0}, +#line 40 "gperf-config.txt" + {"DYNCFG_RESET", 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, +#line 37 "gperf-config.txt" + {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20}, +#line 26 "gperf-config.txt" + {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9}, #line 35 "gperf-config.txt" - {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18}, -#line 47 "gperf-config.txt" - {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, -#line 56 "gperf-config.txt" - {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33}, -#line 18 "gperf-config.txt" - {"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4}, -#line 54 "gperf-config.txt" - {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, + {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18}, +#line 48 "gperf-config.txt" + {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, #line 57 "gperf-config.txt" - {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34}, + {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34}, #line 41 "gperf-config.txt" - {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, -#line 26 "gperf-config.txt" - {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9}, -#line 31 "gperf-config.txt" - {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14}, + {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, +#line 39 "gperf-config.txt" + {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, +#line 58 "gperf-config.txt" + {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35}, +#line 18 "gperf-config.txt" + {"HOST", 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4}, +#line 38 "gperf-config.txt" + {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21}, +#line 25 "gperf-config.txt" + {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, +#line 47 "gperf-config.txt" + {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, +#line 54 "gperf-config.txt" + {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, +#line 27 "gperf-config.txt" + {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10}, #line 21 "gperf-config.txt" - {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7}, + {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7}, #line 19 "gperf-config.txt" - {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5}, -#line 37 "gperf-config.txt" - {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20}, -#line 40 "gperf-config.txt" - {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, + {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5}, +#line 53 "gperf-config.txt" + {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30}, +#line 46 "gperf-config.txt" + {"CLAIMED_ID", 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26}, #line 15 "gperf-config.txt" - {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, + {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, #line 20 "gperf-config.txt" - {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6}, + {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6}, +#line 28 "gperf-config.txt" + {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11}, +#line 31 "gperf-config.txt" + {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14}, #line 34 "gperf-config.txt" - {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17}, -#line 16 "gperf-config.txt" - {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, -#line 39 "gperf-config.txt" - {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, -#line 29 "gperf-config.txt" - {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12}, -#line 27 "gperf-config.txt" - {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10}, -#line 38 "gperf-config.txt" - {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21}, -#line 32 "gperf-config.txt" - {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15}, -#line 52 "gperf-config.txt" - {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, -#line 45 "gperf-config.txt" - {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, -#line 36 "gperf-config.txt" - {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19}, + {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17}, #line 33 "gperf-config.txt" - {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16}, -#line 28 "gperf-config.txt" - {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11}, -#line 25 "gperf-config.txt" - {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, -#line 46 "gperf-config.txt" - {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26}, -#line 53 "gperf-config.txt" - {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30}, -#line 17 "gperf-config.txt" - {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3} + {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16}, +#line 36 "gperf-config.txt" + {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19}, + {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, + {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, + {(char*)0}, +#line 32 "gperf-config.txt" + {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15} }; PARSER_KEYWORD * diff --git a/collectors/plugins.d/local_listeners.c b/collectors/plugins.d/local_listeners.c index a39de7974..f2c5e688b 100644 --- a/collectors/plugins.d/local_listeners.c +++ b/collectors/plugins.d/local_listeners.c @@ -338,25 +338,59 @@ bool read_proc_net_x(const char *filename, PROC_NET_PROTOCOLS protocol) { } // ---------------------------------------------------------------------------- - -int main(int argc __maybe_unused, char **argv __maybe_unused) { +typedef struct { + bool read_tcp; + bool read_tcp6; + bool read_udp; + bool read_udp6; +} CommandLineArguments; + +int main(int argc, char **argv) { char path[FILENAME_MAX + 1]; hashTable_key_inode_port_value = createHashTable(); netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); if(!netdata_configured_host_prefix) netdata_configured_host_prefix = ""; - snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp", netdata_configured_host_prefix); - read_proc_net_x(path, PROC_NET_PROTOCOL_TCP); + CommandLineArguments args = {.read_tcp = false, .read_tcp6 = false, .read_udp = false, .read_udp6 = false}; + + for (int i = 1; i < argc; i++) { + if (strcmp("tcp", argv[i]) == 0) { + args.read_tcp = true; + continue; + } else if (strcmp("tcp6", argv[i]) == 0) { + args.read_tcp6 = true; + continue; + } else if (strcmp("udp", argv[i]) == 0) { + args.read_udp = true; + continue; + } else if (strcmp("udp6", argv[i]) == 0) { + args.read_udp6 = true; + continue; + } + } + + bool read_all_files = (!args.read_tcp && !args.read_tcp6 && !args.read_udp && !args.read_udp6); - snprintfz(path, FILENAME_MAX, "%s/proc/net/udp", netdata_configured_host_prefix); - read_proc_net_x(path, PROC_NET_PROTOCOL_UDP); + if (read_all_files || args.read_tcp) { + snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp", netdata_configured_host_prefix); + read_proc_net_x(path, PROC_NET_PROTOCOL_TCP); + } - snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp6", netdata_configured_host_prefix); - read_proc_net_x(path, PROC_NET_PROTOCOL_TCP6); + if (read_all_files || args.read_udp) { + snprintfz(path, FILENAME_MAX, "%s/proc/net/udp", netdata_configured_host_prefix); + read_proc_net_x(path, PROC_NET_PROTOCOL_UDP); + } - snprintfz(path, FILENAME_MAX, "%s/proc/net/udp6", netdata_configured_host_prefix); - read_proc_net_x(path, PROC_NET_PROTOCOL_UDP6); + if (read_all_files || args.read_tcp6) { + snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp6", netdata_configured_host_prefix); + read_proc_net_x(path, PROC_NET_PROTOCOL_TCP6); + } + + if (read_all_files || args.read_udp6) { + snprintfz(path, FILENAME_MAX, "%s/proc/net/udp6", netdata_configured_host_prefix); + read_proc_net_x(path, PROC_NET_PROTOCOL_UDP6); + } snprintfz(path, FILENAME_MAX, "%s/proc", netdata_configured_host_prefix); find_all_sockets_in_proc(path); diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 08c26a198..20061ad29 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -47,8 +47,7 @@ static inline bool plugin_is_running(struct plugind *cd) { return ret; } -static void pluginsd_worker_thread_cleanup(void *arg) -{ +static void pluginsd_worker_thread_cleanup(void *arg) { struct plugind *cd = (struct plugind *)arg; worker_unregister(); @@ -143,41 +142,64 @@ static void *pluginsd_worker_thread(void *arg) { netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg); - struct plugind *cd = (struct plugind *)arg; - plugin_set_running(cd); + { + struct plugind *cd = (struct plugind *) arg; + plugin_set_running(cd); - size_t count = 0; + size_t count = 0; - while (service_running(SERVICE_COLLECTORS)) { - FILE *fp_child_input = NULL; - FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); + while(service_running(SERVICE_COLLECTORS)) { + FILE *fp_child_input = NULL; + FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); - if (unlikely(!fp_child_input || !fp_child_output)) { - netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd); - break; - } + if(unlikely(!fp_child_input || !fp_child_output)) { + netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", + rrdhost_hostname(cd->host), cd->cmd); + break; + } - netdata_log_info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "PLUGINSD: 'host:%s' connected to '%s' running on pid %d", + rrdhost_hostname(cd->host), + cd->fullfilename, cd->unsafe.pid); - count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0); + const char *plugin = strrchr(cd->fullfilename, '/'); + if(plugin) + plugin++; + else + plugin = cd->fullfilename; - netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).", - rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count); + char module[100]; + snprintfz(module, sizeof(module), "plugins.d[%s]", plugin); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_MODULE, module), + ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rrdhost_hostname(cd->host)), + ND_LOG_FIELD_TXT(NDF_SRC_TRANSPORT, "pluginsd"), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - killpid(cd->unsafe.pid); + count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0); - int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count); - if (likely(worker_ret_code == 0)) - pluginsd_worker_thread_handle_success(cd); - else - pluginsd_worker_thread_handle_error(cd, worker_ret_code); + killpid(cd->unsafe.pid); - cd->unsafe.pid = 0; - if (unlikely(!plugin_is_enabled(cd))) - break; - } + int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid); + + if(likely(worker_ret_code == 0)) + pluginsd_worker_thread_handle_success(cd); + else + pluginsd_worker_thread_handle_error(cd, worker_ret_code); + + cd->unsafe.pid = 0; + + if(unlikely(!plugin_is_enabled(cd))) + break; + } + } netdata_thread_cleanup_pop(1); return NULL; @@ -217,6 +239,13 @@ void *pluginsd_main(void *ptr) // disable some plugins by default config_get_boolean(CONFIG_SECTION_PLUGINS, "slabinfo", CONFIG_BOOLEAN_NO); + config_get_boolean(CONFIG_SECTION_PLUGINS, "logs-management", +#if defined(LOGS_MANAGEMENT_DEV_MODE) + CONFIG_BOOLEAN_YES +#else + CONFIG_BOOLEAN_NO +#endif + ); // it crashes (both threads) on Alpine after we made it multi-threaded // works with "--device /dev/ipmi0", but this is not default // see https://github.com/netdata/netdata/pull/15564 for details diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 7c5df4168..37c70f7e3 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -16,14 +16,11 @@ #define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE" #define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE" #define PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB "DYNCFG_REGISTER_JOB" +#define PLUGINSD_KEYWORD_DYNCFG_RESET "DYNCFG_RESET" #define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS" #define PLUGINSD_KEYWORD_DELETE_JOB "DELETE_JOB" -#define PLUGINSD_LINE_MAX_SSL_READ 512 - -#define PLUGINSD_MAX_WORDS 20 - #define PLUGINSD_MAX_DIRECTORIES 20 extern char *plugin_directories[PLUGINSD_MAX_DIRECTORIES]; diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 2e69c7da5..3b47c6c0f 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,8 +4,8 @@ #define LOG_FUNCTIONS false -#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING) -#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD) +#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING) +#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD) static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; @@ -13,6 +13,11 @@ static ssize_t send_to_plugin(const char *txt, void *data) { if(!txt || !*txt) return 0; +#ifdef ENABLE_H2O + if(parser->h2o_ctx) + return h2o_stream_write(parser->h2o_ctx, txt, strlen(txt)); +#endif + errno = 0; spinlock_lock(&parser->writer.spinlock); ssize_t bytes = -1; @@ -110,23 +115,6 @@ static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) { return false; } -void pluginsd_rrdset_cleanup(RRDSET *st) { - spinlock_lock(&st->pluginsd.spinlock); - - for(size_t i = 0; i < st->pluginsd.size ; i++) { - rrddim_acquired_release(st->pluginsd.rda[i]); // can be NULL - st->pluginsd.rda[i] = NULL; - } - - freez(st->pluginsd.rda); - st->pluginsd.collector_tid = 0; - st->pluginsd.rda = NULL; - st->pluginsd.size = 0; - st->pluginsd.pos = 0; - - spinlock_unlock(&st->pluginsd.spinlock); -} - static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) { if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) { if(stale) @@ -150,7 +138,12 @@ static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const ch static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) { pluginsd_unlock_previous_scope_chart(parser, keyword, true); + + if(parser->user.cleanup_slots && parser->user.st) + rrdset_pluginsd_receive_unslot(parser->user.st); + parser->user.st = NULL; + parser->user.cleanup_slots = false; } static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) { @@ -160,11 +153,12 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch if(unlikely(old_collector_tid)) { if(old_collector_tid != my_collector_tid) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)", - keyword ? keyword : "UNKNOWN", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - my_collector_tid, old_collector_tid); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING, + "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)", + keyword ? keyword : "UNKNOWN", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + my_collector_tid, old_collector_tid); return false; } @@ -176,61 +170,141 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch pluginsd_clear_scope_chart(parser, keyword); - size_t dims = dictionary_entries(st->rrddim_root_index); - if(unlikely(st->pluginsd.size < dims)) { - st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *)); + st->pluginsd.pos = 0; + parser->user.st = st; + parser->user.cleanup_slots = false; + + return true; +} + +static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) { + size_t wanted_size = st->pluginsd.size; + + if(slot >= 1) { + st->pluginsd.dims_with_slots = true; + wanted_size = slot; + } + else { + st->pluginsd.dims_with_slots = false; + wanted_size = dictionary_entries(st->rrddim_root_index); + } + + if(wanted_size > st->pluginsd.size) { + st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim)); // initialize the empty slots - for(ssize_t i = (ssize_t)dims - 1; i >= (ssize_t)st->pluginsd.size ;i--) - st->pluginsd.rda[i] = NULL; + for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) { + st->pluginsd.prd_array[i].rda = NULL; + st->pluginsd.prd_array[i].rd = NULL; + st->pluginsd.prd_array[i].id = NULL; + } - st->pluginsd.size = dims; + st->pluginsd.size = wanted_size; } - st->pluginsd.pos = 0; - parser->user.st = st; + if(st->pluginsd.dims_with_slots) { + struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1]; - return true; + if(prd->rd != rd) { + prd->rda = rrddim_find_and_acquire(st, string2str(rd->id)); + prd->rd = rrddim_acquired_to_rrddim(prd->rda); + prd->id = string2str(prd->rd->id); + } + + if(obsolete) + parser->user.cleanup_slots = true; + } } -static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { +static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) { if (unlikely(!dimension || !*dimension)) { netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", rrdhost_hostname(host), rrdset_id(st), cmd); return NULL; } - if(unlikely(st->pluginsd.pos >= st->pluginsd.size)) - st->pluginsd.pos = 0; + if (unlikely(!st->pluginsd.size)) { + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.", + rrdhost_hostname(host), rrdset_id(st), cmd); + return NULL; + } + + struct pluginsd_rrddim *prd; + RRDDIM *rd; + + if(likely(st->pluginsd.dims_with_slots)) { + // caching with slots + + if(unlikely(slot < 1 || slot > st->pluginsd.size)) { + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.", + rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size); + return NULL; + } - RRDDIM_ACQUIRED *rda = st->pluginsd.rda[st->pluginsd.pos]; + prd = &st->pluginsd.prd_array[slot - 1]; - if(likely(rda)) { - RRDDIM *rd = rrddim_acquired_to_rrddim(rda); - if (likely(rd && string_strcmp(rd->id, dimension) == 0)) { - // we found a cached RDA - st->pluginsd.pos++; + rd = prd->rd; + if(likely(rd)) { +#ifdef NETDATA_INTERNAL_CHECKS + if(strcmp(prd->id, dimension) != 0) { + ssize_t t; + for(t = 0; t < st->pluginsd.size ;t++) { + if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0) + break; + } + if(t >= st->pluginsd.size) + t = -1; + + internal_fatal(true, + "PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', " + "the right slot is %zd", + dimension, slot, prd->id, t); + } +#endif return rd; } - else { - // the collector is sending dimensions in a different order - // release the previous one, to reuse this slot - rrddim_acquired_release(rda); - st->pluginsd.rda[st->pluginsd.pos] = NULL; + } + else { + // caching without slots + + if(unlikely(st->pluginsd.pos >= st->pluginsd.size)) + st->pluginsd.pos = 0; + + prd = &st->pluginsd.prd_array[st->pluginsd.pos++]; + + rd = prd->rd; + if(likely(rd)) { + const char *id = prd->id; + + if(strcmp(id, dimension) == 0) { + // we found it cached + return rd; + } + else { + // the cached one is not good for us + rrddim_acquired_release(prd->rda); + prd->rda = NULL; + prd->rd = NULL; + prd->id = NULL; + } } } - rda = rrddim_find_and_acquire(st, dimension); + // we need to find the dimension and set it to prd + + RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); if (unlikely(!rda)) { netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", - rrdhost_hostname(host), rrdset_id(st), dimension, cmd); + rrdhost_hostname(host), rrdset_id(st), dimension, cmd); return NULL; } - st->pluginsd.rda[st->pluginsd.pos++] = rda; + prd->rda = rda; + prd->rd = rd = rrddim_acquired_to_rrddim(rda); + prd->id = string2str(rd->id); - return rrddim_acquired_to_rrddim(rda); + return rd; } static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { @@ -248,20 +322,89 @@ static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, cons return st; } +static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) { + ssize_t slot = -1; + char *id = get_word(words, num_words, 1); + if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] && + id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') { + slot = (ssize_t) str2ull_encoded(&id[5]); + if(slot < 0) slot = 0; // to make the caller increment its idx of the words + } + + return slot; +} + +static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) { + // clean possible old cached data + rrdset_pluginsd_receive_unslot(st); + + if(unlikely(slot < 1 || slot >= INT32_MAX)) + return; + + RRDHOST *host = st->rrdhost; + + if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) { + spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock); + size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size; + size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2; + + if(new_slots < (size_t)slot) + new_slots = slot; + + host->rrdpush.receive.pluginsd_chart_slots.array = + reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *)); + + for(size_t i = old_slots; i < new_slots ;i++) + host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL; + + host->rrdpush.receive.pluginsd_chart_slots.size = new_slots; + spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock); + } + + host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st; + st->pluginsd.last_slot = (int32_t)slot - 1; + parser->user.cleanup_slots = obsolete; +} + +static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) { + if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) + return pluginsd_find_chart(host, id, keyword); + + RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1]; + + if(!st) { + st = pluginsd_find_chart(host, id, keyword); + if(st) + pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)); + } + else { + internal_fatal(string_strcmp(st->id, id) != 0, + "PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'", + slot - 1, id, string2str(st->id)); + } + + return st; +} + static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) { parser->user.enabled = 0; if(keyword && msg) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_INFO, + "PLUGINSD: keyword %s: %s", keyword, msg); } return PARSER_RC_ERROR; } static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) { - char *dimension = get_word(words, num_words, 1); - char *value = get_word(words, num_words, 2); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *dimension = get_word(words, num_words, idx++); + char *value = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -269,7 +412,7 @@ static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *par RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET); if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); st->pluginsd.set = true; @@ -285,13 +428,17 @@ static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *par } static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) { - char *id = get_word(words, num_words, 1); - char *microseconds_txt = get_word(words, num_words, 2); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *id = get_word(words, num_words, idx++); + char *microseconds_txt = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN); + RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN); if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN)) @@ -332,8 +479,9 @@ static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *p } static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) { - UNUSED(words); - UNUSED(num_words); + char *tv_sec = get_word(words, num_words, 1); + char *tv_usec = get_word(words, num_words, 2); + char *pending_rrdset_next = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -347,9 +495,15 @@ static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *par pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END); parser->user.data_collections_count++; - struct timeval now; - now_realtime_timeval(&now); - rrdset_timed_done(st, now, /* pending_rrdset_next = */ false); + struct timeval tv = { + .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0, + .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0 + }; + + if(!tv.tv_sec) + now_realtime_timeval(&tv); + + rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false); return PARSER_RC_OK; } @@ -419,30 +573,29 @@ static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, si return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); RRDHOST *host = rrdhost_find_or_create( - string2str(parser->user.host_define.hostname), - string2str(parser->user.host_define.hostname), - parser->user.host_define.machine_guid_str, - "Netdata Virtual Host 1.0", - netdata_configured_timezone, - netdata_configured_abbrev_timezone, - netdata_configured_utc_offset, - NULL, - program_name, - program_version, - default_rrd_update_every, - default_rrd_history_entries, - default_rrd_memory_mode, - default_health_enabled, - default_rrdpush_enabled, - default_rrdpush_destination, - default_rrdpush_api_key, - default_rrdpush_send_charts_matching, - default_rrdpush_enable_replication, - default_rrdpush_seconds_to_replicate, - default_rrdpush_replication_step, - rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels), - false - ); + string2str(parser->user.host_define.hostname), + string2str(parser->user.host_define.hostname), + parser->user.host_define.machine_guid_str, + "Netdata Virtual Host 1.0", + netdata_configured_timezone, + netdata_configured_abbrev_timezone, + netdata_configured_utc_offset, + NULL, + program_name, + program_version, + default_rrd_update_every, + default_rrd_history_entries, + default_rrd_memory_mode, + default_health_enabled, + default_rrdpush_enabled, + default_rrdpush_destination, + default_rrdpush_api_key, + default_rrdpush_send_charts_matching, + default_rrdpush_enable_replication, + default_rrdpush_seconds_to_replicate, + default_rrdpush_replication_step, + rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels), + false); rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST); @@ -492,18 +645,22 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - char *type = get_word(words, num_words, 1); - char *name = get_word(words, num_words, 2); - char *title = get_word(words, num_words, 3); - char *units = get_word(words, num_words, 4); - char *family = get_word(words, num_words, 5); - char *context = get_word(words, num_words, 6); - char *chart = get_word(words, num_words, 7); - char *priority_s = get_word(words, num_words, 8); - char *update_every_s = get_word(words, num_words, 9); - char *options = get_word(words, num_words, 10); - char *plugin = get_word(words, num_words, 11); - char *module = get_word(words, num_words, 12); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *type = get_word(words, num_words, idx++); + char *name = get_word(words, num_words, idx++); + char *title = get_word(words, num_words, idx++); + char *units = get_word(words, num_words, idx++); + char *family = get_word(words, num_words, idx++); + char *context = get_word(words, num_words, idx++); + char *chart = get_word(words, num_words, idx++); + char *priority_s = get_word(words, num_words, idx++); + char *update_every_s = get_word(words, num_words, idx++); + char *options = get_word(words, num_words, idx++); + char *plugin = get_word(words, num_words, idx++); + char *module = get_word(words, num_words, idx++); // parse the id from type char *id = NULL; @@ -570,14 +727,15 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p module, priority, update_every, chart_type); + bool obsolete = false; if (likely(st)) { if (options && *options) { if (strstr(options, "obsolete")) { - pluginsd_rrdset_cleanup(st); - rrdset_is_obsolete(st); + rrdset_is_obsolete___safe_from_collector_thread(st); + obsolete = true; } else - rrdset_isnot_obsolete(st); + rrdset_isnot_obsolete___safe_from_collector_thread(st); if (strstr(options, "detail")) rrdset_flag_set(st, RRDSET_FLAG_DETAIL); @@ -595,13 +753,15 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); } else { - rrdset_isnot_obsolete(st); + rrdset_isnot_obsolete___safe_from_collector_thread(st); rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); } if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); + + pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete); } else pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART); @@ -652,12 +812,16 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w } static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) { - char *id = get_word(words, num_words, 1); - char *name = get_word(words, num_words, 2); - char *algorithm = get_word(words, num_words, 3); - char *multiplier_s = get_word(words, num_words, 4); - char *divisor_s = get_word(words, num_words, 5); - char *options = get_word(words, num_words, 6); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *id = get_word(words, num_words, idx++); + char *name = get_word(words, num_words, idx++); + char *algorithm = get_word(words, num_words, idx++); + char *multiplier_s = get_word(words, num_words, idx++); + char *divisor_s = get_word(words, num_words, idx++); + char *options = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -696,11 +860,14 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE int unhide_dimension = 1; rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); + bool obsolete = false; if (options && *options) { - if (strstr(options, "obsolete") != NULL) - rrddim_is_obsolete(st, rd); + if (strstr(options, "obsolete") != NULL) { + obsolete = true; + rrddim_is_obsolete___safe_from_collector_thread(st, rd); + } else - rrddim_isnot_obsolete(st, rd); + rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); unhide_dimension = !strstr(options, "hidden"); @@ -708,8 +875,9 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); if (strstr(options, "nooverflow") != NULL) rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); - } else - rrddim_isnot_obsolete(st, rd); + } + else + rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); bool should_update_dimension = false; @@ -727,6 +895,8 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); } + pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete); + return PARSER_RC_OK; } @@ -759,7 +929,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void const char *transaction = dictionary_acquired_item_name(item); char buffer[2048 + 1]; - snprintfz(buffer, 2048, "%s %s %d \"%s\"\n", + snprintfz(buffer, sizeof(buffer) - 1, "%s %s %d \"%s\"\n", pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION", transaction, pf->timeout, @@ -932,7 +1102,7 @@ void pluginsd_function_cancel(void *data) { internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction); char buffer[2048 + 1]; - snprintfz(buffer, 2048, "%s %s\n", + snprintfz(buffer, sizeof(buffer) - 1, "%s %s\n", PLUGINSD_KEYWORD_FUNCTION_CANCEL, transaction); @@ -947,7 +1117,8 @@ void pluginsd_function_cancel(void *data) { dfe_done(t); if(sent <= 0) - netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d."); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d."); } // this is the function that is called from @@ -1247,6 +1418,15 @@ static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *p if(unlikely(!(parser->user.new_host_labels))) parser->user.new_host_labels = rrdlabels_create(); + if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) { + int is_ephemeral = appconfig_test_boolean_value((char *) value); + if (is_ephemeral) { + RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL); + if (likely(host)) + rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST); + } + } + rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source)); if (allocated_store) @@ -1265,6 +1445,8 @@ static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t n host->rrdlabels = rrdlabels_create(); rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels); + if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST)) + rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); rrdlabels_destroy(parser->user.new_host_labels); @@ -1311,16 +1493,21 @@ static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdset_metadata_updated(st); + parser->user.chart_rrdlabels_linked_temporarily = NULL; return PARSER_RC_OK; } static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) { - char *id = get_word(words, num_words, 1); - char *start_time_str = get_word(words, num_words, 2); - char *end_time_str = get_word(words, num_words, 3); - char *child_now_str = get_word(words, num_words, 4); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *id = get_word(words, num_words, idx++); + char *start_time_str = get_word(words, num_words, idx++); + char *end_time_str = get_word(words, num_words, idx++); + char *child_now_str = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -1329,7 +1516,7 @@ static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PA if (likely(!id || !*id)) st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); else - st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN); + st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -1444,9 +1631,13 @@ static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str } static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) { - char *dimension = get_word(words, num_words, 1); - char *value_str = get_word(words, num_words, 2); - char *flags_str = get_word(words, num_words, 3); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *dimension = get_word(words, num_words, idx++); + char *value_str = get_word(words, num_words, idx++); + char *flags_str = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -1455,15 +1646,16 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(!parser->user.replay.rset_enabled) { - error_limit_static_thread_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", - rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); + nd_log_limit_static_thread_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_ERR, + "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", + rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); // we have to return OK here return PARSER_RC_OK; } - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_SET); if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); st->pluginsd.set = true; @@ -1504,8 +1696,10 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS rd->collector.counter++; } else { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.", + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING, + "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. " + "Ignoring data.", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd)); } } @@ -1517,11 +1711,15 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si if(parser->user.replay.rset_enabled == false) return PARSER_RC_OK; - char *dimension = get_word(words, num_words, 1); - char *last_collected_ut_str = get_word(words, num_words, 2); - char *last_collected_value_str = get_word(words, num_words, 3); - char *last_calculated_value_str = get_word(words, num_words, 4); - char *last_stored_value_str = get_word(words, num_words, 5); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *dimension = get_word(words, num_words, idx++); + char *last_collected_ut_str = get_word(words, num_words, idx++); + char *last_collected_value_str = get_word(words, num_words, idx++); + char *last_calculated_value_str = get_word(words, num_words, idx++); + char *last_stored_value_str = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -1535,7 +1733,7 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si st->pluginsd.set = false; } - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec; @@ -1699,10 +1897,14 @@ static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARS static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) { timing_init(); - char *id = get_word(words, num_words, 1); - char *update_every_str = get_word(words, num_words, 2); - char *end_time_str = get_word(words, num_words, 3); - char *wall_clock_time_str = get_word(words, num_words, 4); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *id = get_word(words, num_words, idx++); + char *update_every_str = get_word(words, num_words, idx++); + char *end_time_str = get_word(words, num_words, idx++); + char *wall_clock_time_str = get_word(words, num_words, idx++); if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str)) return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters"); @@ -1712,14 +1914,15 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER timing_step(TIMING_STEP_BEGIN2_PREPARE); - RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2); + RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) - rrdset_isnot_obsolete(st); + rrdset_isnot_obsolete___safe_from_collector_thread(st); timing_step(TIMING_STEP_BEGIN2_FIND_CHART); @@ -1759,9 +1962,12 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time); if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) { - // check if receiver and sender have the same number parsing capabilities + // check receiver capabilities bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); - NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + + // check sender capabilities + bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; BUFFER *wb = parser->user.v2.stream_buffer.wb; @@ -1770,28 +1976,35 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER if(unlikely(parser->user.v2.stream_buffer.begin_v2_added)) buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "' ", 2); if(can_copy) buffer_strcat(wb, update_every_str); else - buffer_print_uint64_encoded(wb, encoding, update_every); + buffer_print_uint64_encoded(wb, integer_encoding, update_every); buffer_fast_strcat(wb, " ", 1); if(can_copy) buffer_strcat(wb, end_time_str); else - buffer_print_uint64_encoded(wb, encoding, end_time); + buffer_print_uint64_encoded(wb, integer_encoding, end_time); buffer_fast_strcat(wb, " ", 1); if(can_copy) buffer_strcat(wb, wall_clock_time_str); else - buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); buffer_fast_strcat(wb, "\n", 1); @@ -1824,10 +2037,14 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) { timing_init(); - char *dimension = get_word(words, num_words, 1); - char *collected_str = get_word(words, num_words, 2); - char *value_str = get_word(words, num_words, 3); - char *flags_str = get_word(words, num_words, 4); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *dimension = get_word(words, num_words, idx++); + char *collected_str = get_word(words, num_words, idx++); + char *value_str = get_word(words, num_words, idx++); + char *flags_str = get_word(words, num_words, idx++); if(unlikely(!dimension || !collected_str || !value_str || !flags_str)) return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters"); @@ -1840,13 +2057,13 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER * timing_step(TIMING_STEP_SET2_PREPARE); - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2); if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); st->pluginsd.set = true; if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED))) - rrddim_isnot_obsolete(st, rd); + rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION); @@ -1892,12 +2109,22 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER * if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) { // check if receiver and sender have the same number parsing capabilities bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); + + // check the sender capabilities + bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false; NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; BUFFER *wb = parser->user.v2.stream_buffer.wb; buffer_need_bytes(wb, 1024); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); if(can_copy) @@ -1978,13 +2205,27 @@ static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_ // ------------------------------------------------------------------------ // cleanup RRDSET / RRDDIM - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - rd->collector.calculated_value = 0; - rd->collector.collected_value = 0; - rrddim_clear_updated(rd); + if(likely(st->pluginsd.dims_with_slots)) { + for(size_t i = 0; i < st->pluginsd.size ;i++) { + RRDDIM *rd = st->pluginsd.prd_array[i].rd; + + if(!rd) + continue; + + rd->collector.calculated_value = 0; + rd->collector.collected_value = 0; + rrddim_clear_updated(rd); + } + } + else { + RRDDIM *rd; + rrddim_foreach_read(rd, st){ + rd->collector.calculated_value = 0; + rd->collector.collected_value = 0; + rrddim_clear_updated(rd); + } + rrddim_foreach_done(rd); } - rrddim_foreach_done(rd); // ------------------------------------------------------------------------ // reset state @@ -2438,24 +2679,35 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si } static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) { - if (atol(words[3]) < 0) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags"); - dyncfg_job_flg_t flags = atol(words[3]); + const char *module_name = words[0]; + const char *job_name = words[1]; + const char *job_type_str = words[2]; + const char *flags_str = words[3]; + + long f = str2l(flags_str); + + if (f < 0) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags received"); + + dyncfg_job_flg_t flags = f; + if (SERVING_PLUGINSD(parser)) flags |= JOB_FLG_PLUGIN_PUSHED; else flags |= JOB_FLG_STREAMING_PUSHED; - enum job_type job_type = str2job_type(words[2]); + enum job_type job_type = dyncfg_str2job_type(job_type_str); if (job_type == JOB_TYPE_UNKNOWN) return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type"); + if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER) return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)"); - if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job + if (register_job(parser->user.host->configurable_plugins, plugin_name, module_name, job_name, job_type, flags, 0)) // ignore existing is off as this is explicitly called register job return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job"); - rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags); + rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, module_name, job_name, job_type, flags); + return PARSER_RC_OK; } @@ -2474,6 +2726,25 @@ static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_ return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]); } +static inline PARSER_RC pluginsd_dyncfg_reset(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + if (unlikely(num_words != (SERVING_PLUGINSD(parser) ? 1 : 2))) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, SERVING_PLUGINSD(parser) ? "expected 0 parameters" : "expected 1 parameter: plugin_name"); + + if (SERVING_PLUGINSD(parser)) { + unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item); + rrdpush_send_dyncfg_reset(parser->user.host, parser->user.cd->configuration->name); + parser->user.cd->configuration = NULL; + } else { + const DICTIONARY_ITEM *di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]); + if (unlikely(di == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, "plugin not found"); + unregister_plugin(parser->user.host->configurable_plugins, di); + rrdpush_send_dyncfg_reset(parser->user.host, words[1]); + } + + return PARSER_RC_OK; +} + static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) { int state = str2i(words[3]); @@ -2482,7 +2753,7 @@ static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_word return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status"); char *message = NULL; - if (num_words == 5) + if (num_words == 5 && strlen(words[4]) > 0) message = words[4]; const DICTIONARY_ITEM *plugin_item; @@ -2584,70 +2855,49 @@ static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PAR // ---------------------------------------------------------------------------- -static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) { -#ifdef NETDATA_INTERNAL_CHECKS - if(reader->read_buffer[reader->read_len] != '\0') - fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); -#endif - - ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1); - if(unlikely(bytes_read <= 0)) - return false; - - reader->read_len += bytes_read; - reader->read_buffer[reader->read_len] = '\0'; - - return true; -} - -static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) { - errno = 0; - struct pollfd fds[1]; +void pluginsd_process_thread_cleanup(void *ptr) { + PARSER *parser = (PARSER *)ptr; - fds[0].fd = fd; - fds[0].events = POLLIN; + pluginsd_cleanup_v2(parser); + pluginsd_host_define_cleanup(parser); - int ret = poll(fds, 1, timeout_ms); + rrd_collector_finished(); - if (ret > 0) { - /* There is data to read */ - if (fds[0].revents & POLLIN) - return buffered_reader_read(reader, fd); +#ifdef NETDATA_LOG_STREAM_RECEIVE + if(parser->user.stream_log_fp) { + fclose(parser->user.stream_log_fp); + parser->user.stream_log_fp = NULL; + } +#endif - else if(fds[0].revents & POLLERR) { - netdata_log_error("PARSER: read failed: POLLERR."); - return false; - } - else if(fds[0].revents & POLLHUP) { - netdata_log_error("PARSER: read failed: POLLHUP."); - return false; - } - else if(fds[0].revents & POLLNVAL) { - netdata_log_error("PARSER: read failed: POLLNVAL."); - return false; - } + parser_destroy(parser); +} - netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set."); - return false; - } - else if (ret == 0) { - netdata_log_error("PARSER: timeout while waiting for data."); +bool parser_reconstruct_node(BUFFER *wb, void *ptr) { + PARSER *parser = ptr; + if(!parser || !parser->user.host) return false; - } - netdata_log_error("PARSER: poll() failed with code %d.", ret); - return false; + buffer_strcat(wb, rrdhost_hostname(parser->user.host)); + return true; } -void pluginsd_process_thread_cleanup(void *ptr) { - PARSER *parser = (PARSER *)ptr; +bool parser_reconstruct_instance(BUFFER *wb, void *ptr) { + PARSER *parser = ptr; + if(!parser || !parser->user.st) + return false; - pluginsd_cleanup_v2(parser); - pluginsd_host_define_cleanup(parser); + buffer_strcat(wb, rrdset_name(parser->user.st)); + return true; +} - rrd_collector_finished(); +bool parser_reconstruct_context(BUFFER *wb, void *ptr) { + PARSER *parser = ptr; + if(!parser || !parser->user.st) + return false; - parser_destroy(parser); + buffer_strcat(wb, string2str(parser->user.st->context)); + return true; } inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) @@ -2697,33 +2947,51 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - buffered_reader_init(&parser->reader); - BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL); - while(likely(service_running(SERVICE_COLLECTORS))) { - if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) { - if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC))) - break; - - continue; - } - - if(unlikely(parser_action(parser, buffer->buffer))) - break; - - buffer->len = 0; - buffer->buffer[0] = '\0'; - } - buffer_free(buffer); - - cd->unsafe.enabled = parser->user.enabled; - count = parser->user.data_collections_count; - - if (likely(count)) { - cd->successful_collections += count; - cd->serial_failures = 0; - } - else - cd->serial_failures++; + { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), + ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), + ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), + ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + buffered_reader_init(&parser->reader); + BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL); + while(likely(service_running(SERVICE_COLLECTORS))) { + + if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) { + buffered_reader_ret_t ret = buffered_reader_read_timeout( + &parser->reader, + fileno((FILE *) parser->fp_input), + 2 * 60 * MSEC_PER_SEC, true + ); + + if(unlikely(ret != BUFFERED_READER_READ_OK)) + break; + + continue; + } + + if(unlikely(parser_action(parser, buffer->buffer))) + break; + + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + buffer_free(buffer); + + cd->unsafe.enabled = parser->user.enabled; + count = parser->user.data_collections_count; + + if(likely(count)) { + cd->successful_collections += count; + cd->serial_failures = 0; + } + else + cd->serial_failures++; + } // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -2855,6 +3123,9 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, case 103: return pluginsd_register_job(words, num_words, parser); + case 104: + return pluginsd_dyncfg_reset(words, num_words, parser); + case 110: return pluginsd_job_status(words, num_words, parser); diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 74767569b..1fce9a89a 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -11,8 +11,11 @@ #define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) // this controls the max response size of a function -#define PLUGINSD_MAX_DEFERRED_SIZE (20 * 1024 * 1024) +#define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024) +#define PLUGINSD_MIN_RRDSET_POINTERS_CACHE 1024 + +#define HOST_LABEL_IS_EPHEMERAL "_is_ephemeral" // PARSER return codes typedef enum __attribute__ ((__packed__)) parser_rc { PARSER_RC_OK, // Callback was successful, go on @@ -28,6 +31,7 @@ typedef enum __attribute__ ((__packed__)) parser_input_type { typedef enum __attribute__ ((__packed__)) { PARSER_INIT_PLUGINSD = (1 << 1), PARSER_INIT_STREAMING = (1 << 2), + PARSER_REP_METADATA = (1 << 3), } PARSER_REPERTOIRE; struct parser; @@ -41,6 +45,7 @@ typedef struct parser_keyword { } PARSER_KEYWORD; typedef struct parser_user_object { + bool cleanup_slots; RRDSET *st; RRDHOST *host; void *opaque; @@ -51,6 +56,11 @@ typedef struct parser_user_object { size_t data_collections_count; int enabled; +#ifdef NETDATA_LOG_STREAM_RECEIVE + FILE *stream_log_fp; + PARSER_REPERTOIRE stream_log_repertoire; +#endif + STREAM_CAPABILITIES capabilities; // receiver capabilities struct { @@ -88,17 +98,21 @@ typedef struct parser { PARSER_REPERTOIRE repertoire; uint32_t flags; int fd; // Socket - size_t line; FILE *fp_input; // Input source e.g. stream FILE *fp_output; // Stream to send commands to plugin #ifdef ENABLE_HTTPS NETDATA_SSL *ssl_output; #endif +#ifdef ENABLE_H2O + void *h2o_ctx; // if set we use h2o_stream functions to send data +#endif PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls struct buffered_reader reader; + struct line_splitter line; + PARSER_KEYWORD *keyword; struct { const char *end_keyword; @@ -150,8 +164,17 @@ static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *co return NULL; } +bool parser_reconstruct_node(BUFFER *wb, void *ptr); +bool parser_reconstruct_instance(BUFFER *wb, void *ptr); +bool parser_reconstruct_context(BUFFER *wb, void *ptr); + static inline int parser_action(PARSER *parser, char *input) { - parser->line++; +#ifdef NETDATA_LOG_STREAM_RECEIVE + static __thread char line[PLUGINSD_LINE_MAX + 1]; + strncpyz(line, input, sizeof(line) - 1); +#endif + + parser->line.count++; if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { char command[100 + 1]; @@ -183,18 +206,25 @@ static inline int parser_action(PARSER *parser, char *input) { return 0; } - static __thread char *words[PLUGINSD_MAX_WORDS]; - size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); - const char *command = get_word(words, num_words, 0); + parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS); + const char *command = get_word(parser->line.words, parser->line.num_words, 0); - if(unlikely(!command)) + if(unlikely(!command)) { + line_splitter_reset(&parser->line); return 0; + } PARSER_RC rc; - PARSER_KEYWORD *t = parser_find_keyword(parser, command); - if(likely(t)) { - worker_is_busy(t->worker_job_id); - rc = parser_execute(parser, t, words, num_words); + parser->keyword = parser_find_keyword(parser, command); + if(likely(parser->keyword)) { + worker_is_busy(parser->keyword->worker_job_id); + +#ifdef NETDATA_LOG_STREAM_RECEIVE + if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire) + fprintf(parser->user.stream_log_fp, "%s", line); +#endif + + rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words); // rc = (*t->func)(words, num_words, parser); worker_is_idle(); } @@ -202,22 +232,13 @@ static inline int parser_action(PARSER *parser, char *input) { rc = PARSER_RC_ERROR; if(rc == PARSER_RC_ERROR) { - BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); - for(size_t i = 0; i < num_words ;i++) { - if(i) buffer_fast_strcat(wb, " ", 1); - - buffer_fast_strcat(wb, "\"", 1); - const char *s = get_word(words, num_words, i); - buffer_strcat(wb, s?s:""); - buffer_fast_strcat(wb, "\"", 1); - } - + CLEAN_BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); + line_splitter_reconstruct_line(wb, &parser->line); netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", - command, parser->line, buffer_tostring(wb)); - - buffer_free(wb); + command, parser->line.count, buffer_tostring(wb)); } + line_splitter_reset(&parser->line); return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); } |