summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/gperf-config.txt69
-rw-r--r--collectors/plugins.d/gperf-hashtable.h178
-rw-r--r--collectors/plugins.d/local_listeners.c54
-rw-r--r--collectors/plugins.d/plugins_d.c83
-rw-r--r--collectors/plugins.d/plugins_d.h5
-rw-r--r--collectors/plugins.d/pluginsd_parser.c771
-rw-r--r--collectors/plugins.d/pluginsd_parser.h69
7 files changed, 794 insertions, 435 deletions
diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt
index a1d0c51ba..bad51367c 100644
--- a/collectors/plugins.d/gperf-config.txt
+++ b/collectors/plugins.d/gperf-config.txt
@@ -12,46 +12,47 @@ PARSER_KEYWORD;
#
# Plugins Only Keywords
#
-FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1
-DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2
-EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3
-HOST, 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4
-HOST_DEFINE, 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5
-HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6
-HOST_LABEL, 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7
+FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1
+DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2
+EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3
+HOST, 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4
+HOST_DEFINE, 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5
+HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6
+HOST_LABEL, 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7
#
# Common keywords
#
-BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8
-CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9
-CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10
-CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11
-DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12
-END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13
-FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14
-FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15
-LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16
-OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17
-SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18
-VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19
-DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20
-DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21
-DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22
-REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23
-DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24
+BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8
+CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9
+CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10
+CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11
+DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12
+END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13
+FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14
+FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15
+LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16
+OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17
+SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18
+VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19
+DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20
+DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21
+DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22
+DYNCFG_RESET, 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23
+REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24
+DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25
#
# Streaming only keywords
#
-CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25
-BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26
-SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27
-END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28
+CLAIMED_ID, 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26
+BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27
+SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28
+END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29
#
# Streaming Replication keywords
#
-CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29
-RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30
-RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31
-REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32
-RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33
-RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34
+CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30
+RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31
+RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32
+REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33
+RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34
+RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35
diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h
index 5bbf9fa98..b327d8d6d 100644
--- a/collectors/plugins.d/gperf-hashtable.h
+++ b/collectors/plugins.d/gperf-hashtable.h
@@ -30,12 +30,12 @@
#endif
-#define GPERF_PARSER_TOTAL_KEYWORDS 34
+#define GPERF_PARSER_TOTAL_KEYWORDS 35
#define GPERF_PARSER_MIN_WORD_LENGTH 3
#define GPERF_PARSER_MAX_WORD_LENGTH 22
#define GPERF_PARSER_MIN_HASH_VALUE 3
-#define GPERF_PARSER_MAX_HASH_VALUE 36
-/* maximum key range = 34, duplicates = 0 */
+#define GPERF_PARSER_MAX_HASH_VALUE 47
+/* maximum key range = 45, duplicates = 0 */
#ifdef __GNUC__
__inline
@@ -49,32 +49,32 @@ gperf_keyword_hash_function (register const char *str, register size_t len)
{
static unsigned char asso_values[] =
{
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 12, 28, 5, 2, 0,
- 0, 37, 3, 13, 37, 37, 14, 37, 0, 2,
- 37, 37, 1, 3, 37, 6, 10, 37, 32, 2,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
- 37, 37, 37, 37, 37, 37
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 11, 18, 0, 0, 0,
+ 6, 48, 9, 0, 48, 48, 20, 48, 0, 8,
+ 48, 48, 1, 12, 48, 20, 18, 48, 2, 0,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48, 48, 48, 48, 48,
+ 48, 48, 48, 48, 48, 48
};
return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]];
}
@@ -83,73 +83,79 @@ static PARSER_KEYWORD gperf_keywords[] =
{
{(char*)0}, {(char*)0}, {(char*)0},
#line 30 "gperf-config.txt"
- {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13},
-#line 48 "gperf-config.txt"
- {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28},
+ {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13},
+#line 49 "gperf-config.txt"
+ {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29},
+#line 56 "gperf-config.txt"
+ {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33},
+#line 17 "gperf-config.txt"
+ {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3},
+#line 16 "gperf-config.txt"
+ {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2},
#line 55 "gperf-config.txt"
- {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32},
+ {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32},
+#line 29 "gperf-config.txt"
+ {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12},
+#line 42 "gperf-config.txt"
+ {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25},
+ {(char*)0},
+#line 40 "gperf-config.txt"
+ {"DYNCFG_RESET", 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23},
+#line 37 "gperf-config.txt"
+ {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20},
+#line 26 "gperf-config.txt"
+ {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9},
#line 35 "gperf-config.txt"
- {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18},
-#line 47 "gperf-config.txt"
- {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27},
-#line 56 "gperf-config.txt"
- {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33},
-#line 18 "gperf-config.txt"
- {"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4},
-#line 54 "gperf-config.txt"
- {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31},
+ {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18},
+#line 48 "gperf-config.txt"
+ {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28},
#line 57 "gperf-config.txt"
- {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34},
+ {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34},
#line 41 "gperf-config.txt"
- {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24},
-#line 26 "gperf-config.txt"
- {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9},
-#line 31 "gperf-config.txt"
- {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14},
+ {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24},
+#line 39 "gperf-config.txt"
+ {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22},
+#line 58 "gperf-config.txt"
+ {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35},
+#line 18 "gperf-config.txt"
+ {"HOST", 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4},
+#line 38 "gperf-config.txt"
+ {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21},
+#line 25 "gperf-config.txt"
+ {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8},
+#line 47 "gperf-config.txt"
+ {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27},
+#line 54 "gperf-config.txt"
+ {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31},
+#line 27 "gperf-config.txt"
+ {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10},
#line 21 "gperf-config.txt"
- {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7},
+ {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7},
#line 19 "gperf-config.txt"
- {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5},
-#line 37 "gperf-config.txt"
- {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20},
-#line 40 "gperf-config.txt"
- {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23},
+ {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5},
+#line 53 "gperf-config.txt"
+ {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30},
+#line 46 "gperf-config.txt"
+ {"CLAIMED_ID", 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26},
#line 15 "gperf-config.txt"
- {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1},
+ {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1},
#line 20 "gperf-config.txt"
- {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6},
+ {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6},
+#line 28 "gperf-config.txt"
+ {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11},
+#line 31 "gperf-config.txt"
+ {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14},
#line 34 "gperf-config.txt"
- {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17},
-#line 16 "gperf-config.txt"
- {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2},
-#line 39 "gperf-config.txt"
- {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22},
-#line 29 "gperf-config.txt"
- {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12},
-#line 27 "gperf-config.txt"
- {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10},
-#line 38 "gperf-config.txt"
- {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21},
-#line 32 "gperf-config.txt"
- {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15},
-#line 52 "gperf-config.txt"
- {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29},
-#line 45 "gperf-config.txt"
- {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25},
-#line 36 "gperf-config.txt"
- {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19},
+ {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17},
#line 33 "gperf-config.txt"
- {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16},
-#line 28 "gperf-config.txt"
- {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11},
-#line 25 "gperf-config.txt"
- {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8},
-#line 46 "gperf-config.txt"
- {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26},
-#line 53 "gperf-config.txt"
- {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30},
-#line 17 "gperf-config.txt"
- {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}
+ {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16},
+#line 36 "gperf-config.txt"
+ {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19},
+ {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0},
+ {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0},
+ {(char*)0},
+#line 32 "gperf-config.txt"
+ {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15}
};
PARSER_KEYWORD *
diff --git a/collectors/plugins.d/local_listeners.c b/collectors/plugins.d/local_listeners.c
index a39de7974..f2c5e688b 100644
--- a/collectors/plugins.d/local_listeners.c
+++ b/collectors/plugins.d/local_listeners.c
@@ -338,25 +338,59 @@ bool read_proc_net_x(const char *filename, PROC_NET_PROTOCOLS protocol) {
}
// ----------------------------------------------------------------------------
-
-int main(int argc __maybe_unused, char **argv __maybe_unused) {
+typedef struct {
+ bool read_tcp;
+ bool read_tcp6;
+ bool read_udp;
+ bool read_udp6;
+} CommandLineArguments;
+
+int main(int argc, char **argv) {
char path[FILENAME_MAX + 1];
hashTable_key_inode_port_value = createHashTable();
netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
if(!netdata_configured_host_prefix) netdata_configured_host_prefix = "";
- snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp", netdata_configured_host_prefix);
- read_proc_net_x(path, PROC_NET_PROTOCOL_TCP);
+ CommandLineArguments args = {.read_tcp = false, .read_tcp6 = false, .read_udp = false, .read_udp6 = false};
+
+ for (int i = 1; i < argc; i++) {
+ if (strcmp("tcp", argv[i]) == 0) {
+ args.read_tcp = true;
+ continue;
+ } else if (strcmp("tcp6", argv[i]) == 0) {
+ args.read_tcp6 = true;
+ continue;
+ } else if (strcmp("udp", argv[i]) == 0) {
+ args.read_udp = true;
+ continue;
+ } else if (strcmp("udp6", argv[i]) == 0) {
+ args.read_udp6 = true;
+ continue;
+ }
+ }
+
+ bool read_all_files = (!args.read_tcp && !args.read_tcp6 && !args.read_udp && !args.read_udp6);
- snprintfz(path, FILENAME_MAX, "%s/proc/net/udp", netdata_configured_host_prefix);
- read_proc_net_x(path, PROC_NET_PROTOCOL_UDP);
+ if (read_all_files || args.read_tcp) {
+ snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp", netdata_configured_host_prefix);
+ read_proc_net_x(path, PROC_NET_PROTOCOL_TCP);
+ }
- snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp6", netdata_configured_host_prefix);
- read_proc_net_x(path, PROC_NET_PROTOCOL_TCP6);
+ if (read_all_files || args.read_udp) {
+ snprintfz(path, FILENAME_MAX, "%s/proc/net/udp", netdata_configured_host_prefix);
+ read_proc_net_x(path, PROC_NET_PROTOCOL_UDP);
+ }
- snprintfz(path, FILENAME_MAX, "%s/proc/net/udp6", netdata_configured_host_prefix);
- read_proc_net_x(path, PROC_NET_PROTOCOL_UDP6);
+ if (read_all_files || args.read_tcp6) {
+ snprintfz(path, FILENAME_MAX, "%s/proc/net/tcp6", netdata_configured_host_prefix);
+ read_proc_net_x(path, PROC_NET_PROTOCOL_TCP6);
+ }
+
+ if (read_all_files || args.read_udp6) {
+ snprintfz(path, FILENAME_MAX, "%s/proc/net/udp6", netdata_configured_host_prefix);
+ read_proc_net_x(path, PROC_NET_PROTOCOL_UDP6);
+ }
snprintfz(path, FILENAME_MAX, "%s/proc", netdata_configured_host_prefix);
find_all_sockets_in_proc(path);
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 08c26a198..20061ad29 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -47,8 +47,7 @@ static inline bool plugin_is_running(struct plugind *cd) {
return ret;
}
-static void pluginsd_worker_thread_cleanup(void *arg)
-{
+static void pluginsd_worker_thread_cleanup(void *arg) {
struct plugind *cd = (struct plugind *)arg;
worker_unregister();
@@ -143,41 +142,64 @@ static void *pluginsd_worker_thread(void *arg) {
netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg);
- struct plugind *cd = (struct plugind *)arg;
- plugin_set_running(cd);
+ {
+ struct plugind *cd = (struct plugind *) arg;
+ plugin_set_running(cd);
- size_t count = 0;
+ size_t count = 0;
- while (service_running(SERVICE_COLLECTORS)) {
- FILE *fp_child_input = NULL;
- FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
+ while(service_running(SERVICE_COLLECTORS)) {
+ FILE *fp_child_input = NULL;
+ FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
- if (unlikely(!fp_child_input || !fp_child_output)) {
- netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd);
- break;
- }
+ if(unlikely(!fp_child_input || !fp_child_output)) {
+ netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").",
+ rrdhost_hostname(cd->host), cd->cmd);
+ break;
+ }
- netdata_log_info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
- rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
+ rrdhost_hostname(cd->host),
+ cd->fullfilename, cd->unsafe.pid);
- count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0);
+ const char *plugin = strrchr(cd->fullfilename, '/');
+ if(plugin)
+ plugin++;
+ else
+ plugin = cd->fullfilename;
- netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
- rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
+ char module[100];
+ snprintfz(module, sizeof(module), "plugins.d[%s]", plugin);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_MODULE, module),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rrdhost_hostname(cd->host)),
+ ND_LOG_FIELD_TXT(NDF_SRC_TRANSPORT, "pluginsd"),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- killpid(cd->unsafe.pid);
+ count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0);
- int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
- if (likely(worker_ret_code == 0))
- pluginsd_worker_thread_handle_success(cd);
- else
- pluginsd_worker_thread_handle_error(cd, worker_ret_code);
+ killpid(cd->unsafe.pid);
- cd->unsafe.pid = 0;
- if (unlikely(!plugin_is_enabled(cd)))
- break;
- }
+ int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
+
+ if(likely(worker_ret_code == 0))
+ pluginsd_worker_thread_handle_success(cd);
+ else
+ pluginsd_worker_thread_handle_error(cd, worker_ret_code);
+
+ cd->unsafe.pid = 0;
+
+ if(unlikely(!plugin_is_enabled(cd)))
+ break;
+ }
+ }
netdata_thread_cleanup_pop(1);
return NULL;
@@ -217,6 +239,13 @@ void *pluginsd_main(void *ptr)
// disable some plugins by default
config_get_boolean(CONFIG_SECTION_PLUGINS, "slabinfo", CONFIG_BOOLEAN_NO);
+ config_get_boolean(CONFIG_SECTION_PLUGINS, "logs-management",
+#if defined(LOGS_MANAGEMENT_DEV_MODE)
+ CONFIG_BOOLEAN_YES
+#else
+ CONFIG_BOOLEAN_NO
+#endif
+ );
// it crashes (both threads) on Alpine after we made it multi-threaded
// works with "--device /dev/ipmi0", but this is not default
// see https://github.com/netdata/netdata/pull/15564 for details
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 7c5df4168..37c70f7e3 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -16,14 +16,11 @@
#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE"
#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE"
#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB "DYNCFG_REGISTER_JOB"
+#define PLUGINSD_KEYWORD_DYNCFG_RESET "DYNCFG_RESET"
#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS"
#define PLUGINSD_KEYWORD_DELETE_JOB "DELETE_JOB"
-#define PLUGINSD_LINE_MAX_SSL_READ 512
-
-#define PLUGINSD_MAX_WORDS 20
-
#define PLUGINSD_MAX_DIRECTORIES 20
extern char *plugin_directories[PLUGINSD_MAX_DIRECTORIES];
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 2e69c7da5..3b47c6c0f 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,8 +4,8 @@
#define LOG_FUNCTIONS false
-#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING)
-#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD)
+#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING)
+#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD)
static ssize_t send_to_plugin(const char *txt, void *data) {
PARSER *parser = data;
@@ -13,6 +13,11 @@ static ssize_t send_to_plugin(const char *txt, void *data) {
if(!txt || !*txt)
return 0;
+#ifdef ENABLE_H2O
+ if(parser->h2o_ctx)
+ return h2o_stream_write(parser->h2o_ctx, txt, strlen(txt));
+#endif
+
errno = 0;
spinlock_lock(&parser->writer.spinlock);
ssize_t bytes = -1;
@@ -110,23 +115,6 @@ static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
return false;
}
-void pluginsd_rrdset_cleanup(RRDSET *st) {
- spinlock_lock(&st->pluginsd.spinlock);
-
- for(size_t i = 0; i < st->pluginsd.size ; i++) {
- rrddim_acquired_release(st->pluginsd.rda[i]); // can be NULL
- st->pluginsd.rda[i] = NULL;
- }
-
- freez(st->pluginsd.rda);
- st->pluginsd.collector_tid = 0;
- st->pluginsd.rda = NULL;
- st->pluginsd.size = 0;
- st->pluginsd.pos = 0;
-
- spinlock_unlock(&st->pluginsd.spinlock);
-}
-
static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) {
if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
if(stale)
@@ -150,7 +138,12 @@ static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const ch
static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) {
pluginsd_unlock_previous_scope_chart(parser, keyword, true);
+
+ if(parser->user.cleanup_slots && parser->user.st)
+ rrdset_pluginsd_receive_unslot(parser->user.st);
+
parser->user.st = NULL;
+ parser->user.cleanup_slots = false;
}
static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) {
@@ -160,11 +153,12 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch
if(unlikely(old_collector_tid)) {
if(old_collector_tid != my_collector_tid) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
- keyword ? keyword : "UNKNOWN",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- my_collector_tid, old_collector_tid);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
+ keyword ? keyword : "UNKNOWN",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ my_collector_tid, old_collector_tid);
return false;
}
@@ -176,61 +170,141 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch
pluginsd_clear_scope_chart(parser, keyword);
- size_t dims = dictionary_entries(st->rrddim_root_index);
- if(unlikely(st->pluginsd.size < dims)) {
- st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *));
+ st->pluginsd.pos = 0;
+ parser->user.st = st;
+ parser->user.cleanup_slots = false;
+
+ return true;
+}
+
+static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) {
+ size_t wanted_size = st->pluginsd.size;
+
+ if(slot >= 1) {
+ st->pluginsd.dims_with_slots = true;
+ wanted_size = slot;
+ }
+ else {
+ st->pluginsd.dims_with_slots = false;
+ wanted_size = dictionary_entries(st->rrddim_root_index);
+ }
+
+ if(wanted_size > st->pluginsd.size) {
+ st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim));
// initialize the empty slots
- for(ssize_t i = (ssize_t)dims - 1; i >= (ssize_t)st->pluginsd.size ;i--)
- st->pluginsd.rda[i] = NULL;
+ for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) {
+ st->pluginsd.prd_array[i].rda = NULL;
+ st->pluginsd.prd_array[i].rd = NULL;
+ st->pluginsd.prd_array[i].id = NULL;
+ }
- st->pluginsd.size = dims;
+ st->pluginsd.size = wanted_size;
}
- st->pluginsd.pos = 0;
- parser->user.st = st;
+ if(st->pluginsd.dims_with_slots) {
+ struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1];
- return true;
+ if(prd->rd != rd) {
+ prd->rda = rrddim_find_and_acquire(st, string2str(rd->id));
+ prd->rd = rrddim_acquired_to_rrddim(prd->rda);
+ prd->id = string2str(prd->rd->id);
+ }
+
+ if(obsolete)
+ parser->user.cleanup_slots = true;
+ }
}
-static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
+static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) {
if (unlikely(!dimension || !*dimension)) {
netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
rrdhost_hostname(host), rrdset_id(st), cmd);
return NULL;
}
- if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
- st->pluginsd.pos = 0;
+ if (unlikely(!st->pluginsd.size)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
+ return NULL;
+ }
+
+ struct pluginsd_rrddim *prd;
+ RRDDIM *rd;
+
+ if(likely(st->pluginsd.dims_with_slots)) {
+ // caching with slots
+
+ if(unlikely(slot < 1 || slot > st->pluginsd.size)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.",
+ rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size);
+ return NULL;
+ }
- RRDDIM_ACQUIRED *rda = st->pluginsd.rda[st->pluginsd.pos];
+ prd = &st->pluginsd.prd_array[slot - 1];
- if(likely(rda)) {
- RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
- if (likely(rd && string_strcmp(rd->id, dimension) == 0)) {
- // we found a cached RDA
- st->pluginsd.pos++;
+ rd = prd->rd;
+ if(likely(rd)) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(strcmp(prd->id, dimension) != 0) {
+ ssize_t t;
+ for(t = 0; t < st->pluginsd.size ;t++) {
+ if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0)
+ break;
+ }
+ if(t >= st->pluginsd.size)
+ t = -1;
+
+ internal_fatal(true,
+ "PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', "
+ "the right slot is %zd",
+ dimension, slot, prd->id, t);
+ }
+#endif
return rd;
}
- else {
- // the collector is sending dimensions in a different order
- // release the previous one, to reuse this slot
- rrddim_acquired_release(rda);
- st->pluginsd.rda[st->pluginsd.pos] = NULL;
+ }
+ else {
+ // caching without slots
+
+ if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
+ st->pluginsd.pos = 0;
+
+ prd = &st->pluginsd.prd_array[st->pluginsd.pos++];
+
+ rd = prd->rd;
+ if(likely(rd)) {
+ const char *id = prd->id;
+
+ if(strcmp(id, dimension) == 0) {
+ // we found it cached
+ return rd;
+ }
+ else {
+ // the cached one is not good for us
+ rrddim_acquired_release(prd->rda);
+ prd->rda = NULL;
+ prd->rd = NULL;
+ prd->id = NULL;
+ }
}
}
- rda = rrddim_find_and_acquire(st, dimension);
+ // we need to find the dimension and set it to prd
+
+ RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
if (unlikely(!rda)) {
netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
- rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
+ rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
return NULL;
}
- st->pluginsd.rda[st->pluginsd.pos++] = rda;
+ prd->rda = rda;
+ prd->rd = rd = rrddim_acquired_to_rrddim(rda);
+ prd->id = string2str(rd->id);
- return rrddim_acquired_to_rrddim(rda);
+ return rd;
}
static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
@@ -248,20 +322,89 @@ static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, cons
return st;
}
+static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) {
+ ssize_t slot = -1;
+ char *id = get_word(words, num_words, 1);
+ if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] &&
+ id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') {
+ slot = (ssize_t) str2ull_encoded(&id[5]);
+ if(slot < 0) slot = 0; // to make the caller increment its idx of the words
+ }
+
+ return slot;
+}
+
+static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) {
+ // clean possible old cached data
+ rrdset_pluginsd_receive_unslot(st);
+
+ if(unlikely(slot < 1 || slot >= INT32_MAX))
+ return;
+
+ RRDHOST *host = st->rrdhost;
+
+ if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) {
+ spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+ size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size;
+ size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2;
+
+ if(new_slots < (size_t)slot)
+ new_slots = slot;
+
+ host->rrdpush.receive.pluginsd_chart_slots.array =
+ reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *));
+
+ for(size_t i = old_slots; i < new_slots ;i++)
+ host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL;
+
+ host->rrdpush.receive.pluginsd_chart_slots.size = new_slots;
+ spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+ }
+
+ host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st;
+ st->pluginsd.last_slot = (int32_t)slot - 1;
+ parser->user.cleanup_slots = obsolete;
+}
+
+static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) {
+ if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size))
+ return pluginsd_find_chart(host, id, keyword);
+
+ RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1];
+
+ if(!st) {
+ st = pluginsd_find_chart(host, id, keyword);
+ if(st)
+ pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE));
+ }
+ else {
+ internal_fatal(string_strcmp(st->id, id) != 0,
+ "PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'",
+ slot - 1, id, string2str(st->id));
+ }
+
+ return st;
+}
+
static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
parser->user.enabled = 0;
if(keyword && msg) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_INFO,
+ "PLUGINSD: keyword %s: %s", keyword, msg);
}
return PARSER_RC_ERROR;
}
static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
- char *dimension = get_word(words, num_words, 1);
- char *value = get_word(words, num_words, 2);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *value = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -269,7 +412,7 @@ static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *par
RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET);
if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
st->pluginsd.set = true;
@@ -285,13 +428,17 @@ static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *par
}
static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
- char *id = get_word(words, num_words, 1);
- char *microseconds_txt = get_word(words, num_words, 2);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *microseconds_txt = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
+ RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN);
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
@@ -332,8 +479,9 @@ static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *p
}
static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
- UNUSED(words);
- UNUSED(num_words);
+ char *tv_sec = get_word(words, num_words, 1);
+ char *tv_usec = get_word(words, num_words, 2);
+ char *pending_rrdset_next = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -347,9 +495,15 @@ static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *par
pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
parser->user.data_collections_count++;
- struct timeval now;
- now_realtime_timeval(&now);
- rrdset_timed_done(st, now, /* pending_rrdset_next = */ false);
+ struct timeval tv = {
+ .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0,
+ .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0
+ };
+
+ if(!tv.tv_sec)
+ now_realtime_timeval(&tv);
+
+ rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false);
return PARSER_RC_OK;
}
@@ -419,30 +573,29 @@ static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, si
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
RRDHOST *host = rrdhost_find_or_create(
- string2str(parser->user.host_define.hostname),
- string2str(parser->user.host_define.hostname),
- parser->user.host_define.machine_guid_str,
- "Netdata Virtual Host 1.0",
- netdata_configured_timezone,
- netdata_configured_abbrev_timezone,
- netdata_configured_utc_offset,
- NULL,
- program_name,
- program_version,
- default_rrd_update_every,
- default_rrd_history_entries,
- default_rrd_memory_mode,
- default_health_enabled,
- default_rrdpush_enabled,
- default_rrdpush_destination,
- default_rrdpush_api_key,
- default_rrdpush_send_charts_matching,
- default_rrdpush_enable_replication,
- default_rrdpush_seconds_to_replicate,
- default_rrdpush_replication_step,
- rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
- false
- );
+ string2str(parser->user.host_define.hostname),
+ string2str(parser->user.host_define.hostname),
+ parser->user.host_define.machine_guid_str,
+ "Netdata Virtual Host 1.0",
+ netdata_configured_timezone,
+ netdata_configured_abbrev_timezone,
+ netdata_configured_utc_offset,
+ NULL,
+ program_name,
+ program_version,
+ default_rrd_update_every,
+ default_rrd_history_entries,
+ default_rrd_memory_mode,
+ default_health_enabled,
+ default_rrdpush_enabled,
+ default_rrdpush_destination,
+ default_rrdpush_api_key,
+ default_rrdpush_send_charts_matching,
+ default_rrdpush_enable_replication,
+ default_rrdpush_seconds_to_replicate,
+ default_rrdpush_replication_step,
+ rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
+ false);
rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
@@ -492,18 +645,22 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- char *type = get_word(words, num_words, 1);
- char *name = get_word(words, num_words, 2);
- char *title = get_word(words, num_words, 3);
- char *units = get_word(words, num_words, 4);
- char *family = get_word(words, num_words, 5);
- char *context = get_word(words, num_words, 6);
- char *chart = get_word(words, num_words, 7);
- char *priority_s = get_word(words, num_words, 8);
- char *update_every_s = get_word(words, num_words, 9);
- char *options = get_word(words, num_words, 10);
- char *plugin = get_word(words, num_words, 11);
- char *module = get_word(words, num_words, 12);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *type = get_word(words, num_words, idx++);
+ char *name = get_word(words, num_words, idx++);
+ char *title = get_word(words, num_words, idx++);
+ char *units = get_word(words, num_words, idx++);
+ char *family = get_word(words, num_words, idx++);
+ char *context = get_word(words, num_words, idx++);
+ char *chart = get_word(words, num_words, idx++);
+ char *priority_s = get_word(words, num_words, idx++);
+ char *update_every_s = get_word(words, num_words, idx++);
+ char *options = get_word(words, num_words, idx++);
+ char *plugin = get_word(words, num_words, idx++);
+ char *module = get_word(words, num_words, idx++);
// parse the id from type
char *id = NULL;
@@ -570,14 +727,15 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p
module, priority, update_every,
chart_type);
+ bool obsolete = false;
if (likely(st)) {
if (options && *options) {
if (strstr(options, "obsolete")) {
- pluginsd_rrdset_cleanup(st);
- rrdset_is_obsolete(st);
+ rrdset_is_obsolete___safe_from_collector_thread(st);
+ obsolete = true;
}
else
- rrdset_isnot_obsolete(st);
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
if (strstr(options, "detail"))
rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
@@ -595,13 +753,15 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
else {
- rrdset_isnot_obsolete(st);
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete);
}
else
pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
@@ -652,12 +812,16 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w
}
static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
- char *id = get_word(words, num_words, 1);
- char *name = get_word(words, num_words, 2);
- char *algorithm = get_word(words, num_words, 3);
- char *multiplier_s = get_word(words, num_words, 4);
- char *divisor_s = get_word(words, num_words, 5);
- char *options = get_word(words, num_words, 6);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *name = get_word(words, num_words, idx++);
+ char *algorithm = get_word(words, num_words, idx++);
+ char *multiplier_s = get_word(words, num_words, idx++);
+ char *divisor_s = get_word(words, num_words, idx++);
+ char *options = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -696,11 +860,14 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE
int unhide_dimension = 1;
rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ bool obsolete = false;
if (options && *options) {
- if (strstr(options, "obsolete") != NULL)
- rrddim_is_obsolete(st, rd);
+ if (strstr(options, "obsolete") != NULL) {
+ obsolete = true;
+ rrddim_is_obsolete___safe_from_collector_thread(st, rd);
+ }
else
- rrddim_isnot_obsolete(st, rd);
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
unhide_dimension = !strstr(options, "hidden");
@@ -708,8 +875,9 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE
rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
if (strstr(options, "nooverflow") != NULL)
rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
- } else
- rrddim_isnot_obsolete(st, rd);
+ }
+ else
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
bool should_update_dimension = false;
@@ -727,6 +895,8 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE
rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
}
+ pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete);
+
return PARSER_RC_OK;
}
@@ -759,7 +929,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
const char *transaction = dictionary_acquired_item_name(item);
char buffer[2048 + 1];
- snprintfz(buffer, 2048, "%s %s %d \"%s\"\n",
+ snprintfz(buffer, sizeof(buffer) - 1, "%s %s %d \"%s\"\n",
pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION",
transaction,
pf->timeout,
@@ -932,7 +1102,7 @@ void pluginsd_function_cancel(void *data) {
internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
char buffer[2048 + 1];
- snprintfz(buffer, 2048, "%s %s\n",
+ snprintfz(buffer, sizeof(buffer) - 1, "%s %s\n",
PLUGINSD_KEYWORD_FUNCTION_CANCEL,
transaction);
@@ -947,7 +1117,8 @@ void pluginsd_function_cancel(void *data) {
dfe_done(t);
if(sent <= 0)
- netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
}
// this is the function that is called from
@@ -1247,6 +1418,15 @@ static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *p
if(unlikely(!(parser->user.new_host_labels)))
parser->user.new_host_labels = rrdlabels_create();
+ if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) {
+ int is_ephemeral = appconfig_test_boolean_value((char *) value);
+ if (is_ephemeral) {
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL);
+ if (likely(host))
+ rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST);
+ }
+ }
+
rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
if (allocated_store)
@@ -1265,6 +1445,8 @@ static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t n
host->rrdlabels = rrdlabels_create();
rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
+ if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST))
+ rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG);
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
rrdlabels_destroy(parser->user.new_host_labels);
@@ -1311,16 +1493,21 @@ static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size
rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_metadata_updated(st);
+
parser->user.chart_rrdlabels_linked_temporarily = NULL;
return PARSER_RC_OK;
}
static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
- char *id = get_word(words, num_words, 1);
- char *start_time_str = get_word(words, num_words, 2);
- char *end_time_str = get_word(words, num_words, 3);
- char *child_now_str = get_word(words, num_words, 4);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *start_time_str = get_word(words, num_words, idx++);
+ char *end_time_str = get_word(words, num_words, idx++);
+ char *child_now_str = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -1329,7 +1516,7 @@ static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PA
if (likely(!id || !*id))
st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
else
- st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -1444,9 +1631,13 @@ static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str
}
static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
- char *dimension = get_word(words, num_words, 1);
- char *value_str = get_word(words, num_words, 2);
- char *flags_str = get_word(words, num_words, 3);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *value_str = get_word(words, num_words, idx++);
+ char *flags_str = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -1455,15 +1646,16 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(!parser->user.replay.rset_enabled) {
- error_limit_static_thread_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
- rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ nd_log_limit_static_thread_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_ERR,
+ "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
+ rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
// we have to return OK here
return PARSER_RC_OK;
}
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_SET);
if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
st->pluginsd.set = true;
@@ -1504,8 +1696,10 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS
rd->collector.counter++;
}
else {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. "
+ "Ignoring data.",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
}
}
@@ -1517,11 +1711,15 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si
if(parser->user.replay.rset_enabled == false)
return PARSER_RC_OK;
- char *dimension = get_word(words, num_words, 1);
- char *last_collected_ut_str = get_word(words, num_words, 2);
- char *last_collected_value_str = get_word(words, num_words, 3);
- char *last_calculated_value_str = get_word(words, num_words, 4);
- char *last_stored_value_str = get_word(words, num_words, 5);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *last_collected_ut_str = get_word(words, num_words, idx++);
+ char *last_collected_value_str = get_word(words, num_words, idx++);
+ char *last_calculated_value_str = get_word(words, num_words, idx++);
+ char *last_stored_value_str = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -1535,7 +1733,7 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si
st->pluginsd.set = false;
}
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
@@ -1699,10 +1897,14 @@ static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARS
static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
timing_init();
- char *id = get_word(words, num_words, 1);
- char *update_every_str = get_word(words, num_words, 2);
- char *end_time_str = get_word(words, num_words, 3);
- char *wall_clock_time_str = get_word(words, num_words, 4);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *update_every_str = get_word(words, num_words, idx++);
+ char *end_time_str = get_word(words, num_words, idx++);
+ char *wall_clock_time_str = get_word(words, num_words, idx++);
if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
@@ -1712,14 +1914,15 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
timing_step(TIMING_STEP_BEGIN2_PREPARE);
- RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
+ RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2);
+
if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
- rrdset_isnot_obsolete(st);
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
@@ -1759,9 +1962,12 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
- // check if receiver and sender have the same number parsing capabilities
+ // check receiver capabilities
bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
- NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+
+ // check sender capabilities
+ bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
BUFFER *wb = parser->user.v2.stream_buffer.wb;
@@ -1770,28 +1976,35 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "' ", 2);
if(can_copy)
buffer_strcat(wb, update_every_str);
else
- buffer_print_uint64_encoded(wb, encoding, update_every);
+ buffer_print_uint64_encoded(wb, integer_encoding, update_every);
buffer_fast_strcat(wb, " ", 1);
if(can_copy)
buffer_strcat(wb, end_time_str);
else
- buffer_print_uint64_encoded(wb, encoding, end_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, end_time);
buffer_fast_strcat(wb, " ", 1);
if(can_copy)
buffer_strcat(wb, wall_clock_time_str);
else
- buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
@@ -1824,10 +2037,14 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
timing_init();
- char *dimension = get_word(words, num_words, 1);
- char *collected_str = get_word(words, num_words, 2);
- char *value_str = get_word(words, num_words, 3);
- char *flags_str = get_word(words, num_words, 4);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *collected_str = get_word(words, num_words, idx++);
+ char *value_str = get_word(words, num_words, idx++);
+ char *flags_str = get_word(words, num_words, idx++);
if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
@@ -1840,13 +2057,13 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *
timing_step(TIMING_STEP_SET2_PREPARE);
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2);
if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
st->pluginsd.set = true;
if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
- rrddim_isnot_obsolete(st, rd);
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
@@ -1892,12 +2109,22 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *
if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
// check if receiver and sender have the same number parsing capabilities
bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
+
+ // check the sender capabilities
+ bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
BUFFER *wb = parser->user.v2.stream_buffer.wb;
buffer_need_bytes(wb, 1024);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
if(can_copy)
@@ -1978,13 +2205,27 @@ static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_
// ------------------------------------------------------------------------
// cleanup RRDSET / RRDDIM
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- rd->collector.calculated_value = 0;
- rd->collector.collected_value = 0;
- rrddim_clear_updated(rd);
+ if(likely(st->pluginsd.dims_with_slots)) {
+ for(size_t i = 0; i < st->pluginsd.size ;i++) {
+ RRDDIM *rd = st->pluginsd.prd_array[i].rd;
+
+ if(!rd)
+ continue;
+
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
+ rrddim_clear_updated(rd);
+ }
+ }
+ else {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st){
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
+ rrddim_clear_updated(rd);
+ }
+ rrddim_foreach_done(rd);
}
- rrddim_foreach_done(rd);
// ------------------------------------------------------------------------
// reset state
@@ -2438,24 +2679,35 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si
}
static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) {
- if (atol(words[3]) < 0)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags");
- dyncfg_job_flg_t flags = atol(words[3]);
+ const char *module_name = words[0];
+ const char *job_name = words[1];
+ const char *job_type_str = words[2];
+ const char *flags_str = words[3];
+
+ long f = str2l(flags_str);
+
+ if (f < 0)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags received");
+
+ dyncfg_job_flg_t flags = f;
+
if (SERVING_PLUGINSD(parser))
flags |= JOB_FLG_PLUGIN_PUSHED;
else
flags |= JOB_FLG_STREAMING_PUSHED;
- enum job_type job_type = str2job_type(words[2]);
+ enum job_type job_type = dyncfg_str2job_type(job_type_str);
if (job_type == JOB_TYPE_UNKNOWN)
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type");
+
if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER)
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)");
- if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job
+ if (register_job(parser->user.host->configurable_plugins, plugin_name, module_name, job_name, job_type, flags, 0)) // ignore existing is off as this is explicitly called register job
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job");
- rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags);
+ rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, module_name, job_name, job_type, flags);
+
return PARSER_RC_OK;
}
@@ -2474,6 +2726,25 @@ static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_
return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]);
}
+static inline PARSER_RC pluginsd_dyncfg_reset(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ if (unlikely(num_words != (SERVING_PLUGINSD(parser) ? 1 : 2)))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, SERVING_PLUGINSD(parser) ? "expected 0 parameters" : "expected 1 parameter: plugin_name");
+
+ if (SERVING_PLUGINSD(parser)) {
+ unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item);
+ rrdpush_send_dyncfg_reset(parser->user.host, parser->user.cd->configuration->name);
+ parser->user.cd->configuration = NULL;
+ } else {
+ const DICTIONARY_ITEM *di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]);
+ if (unlikely(di == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, "plugin not found");
+ unregister_plugin(parser->user.host->configurable_plugins, di);
+ rrdpush_send_dyncfg_reset(parser->user.host, words[1]);
+ }
+
+ return PARSER_RC_OK;
+}
+
static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) {
int state = str2i(words[3]);
@@ -2482,7 +2753,7 @@ static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_word
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status");
char *message = NULL;
- if (num_words == 5)
+ if (num_words == 5 && strlen(words[4]) > 0)
message = words[4];
const DICTIONARY_ITEM *plugin_item;
@@ -2584,70 +2855,49 @@ static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PAR
// ----------------------------------------------------------------------------
-static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
-#ifdef NETDATA_INTERNAL_CHECKS
- if(reader->read_buffer[reader->read_len] != '\0')
- fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
-#endif
-
- ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
- if(unlikely(bytes_read <= 0))
- return false;
-
- reader->read_len += bytes_read;
- reader->read_buffer[reader->read_len] = '\0';
-
- return true;
-}
-
-static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
- errno = 0;
- struct pollfd fds[1];
+void pluginsd_process_thread_cleanup(void *ptr) {
+ PARSER *parser = (PARSER *)ptr;
- fds[0].fd = fd;
- fds[0].events = POLLIN;
+ pluginsd_cleanup_v2(parser);
+ pluginsd_host_define_cleanup(parser);
- int ret = poll(fds, 1, timeout_ms);
+ rrd_collector_finished();
- if (ret > 0) {
- /* There is data to read */
- if (fds[0].revents & POLLIN)
- return buffered_reader_read(reader, fd);
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ if(parser->user.stream_log_fp) {
+ fclose(parser->user.stream_log_fp);
+ parser->user.stream_log_fp = NULL;
+ }
+#endif
- else if(fds[0].revents & POLLERR) {
- netdata_log_error("PARSER: read failed: POLLERR.");
- return false;
- }
- else if(fds[0].revents & POLLHUP) {
- netdata_log_error("PARSER: read failed: POLLHUP.");
- return false;
- }
- else if(fds[0].revents & POLLNVAL) {
- netdata_log_error("PARSER: read failed: POLLNVAL.");
- return false;
- }
+ parser_destroy(parser);
+}
- netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
- return false;
- }
- else if (ret == 0) {
- netdata_log_error("PARSER: timeout while waiting for data.");
+bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.host)
return false;
- }
- netdata_log_error("PARSER: poll() failed with code %d.", ret);
- return false;
+ buffer_strcat(wb, rrdhost_hostname(parser->user.host));
+ return true;
}
-void pluginsd_process_thread_cleanup(void *ptr) {
- PARSER *parser = (PARSER *)ptr;
+bool parser_reconstruct_instance(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.st)
+ return false;
- pluginsd_cleanup_v2(parser);
- pluginsd_host_define_cleanup(parser);
+ buffer_strcat(wb, rrdset_name(parser->user.st));
+ return true;
+}
- rrd_collector_finished();
+bool parser_reconstruct_context(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.st)
+ return false;
- parser_destroy(parser);
+ buffer_strcat(wb, string2str(parser->user.st->context));
+ return true;
}
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
@@ -2697,33 +2947,51 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- buffered_reader_init(&parser->reader);
- BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
- while(likely(service_running(SERVICE_COLLECTORS))) {
- if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
- if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
- break;
-
- continue;
- }
-
- if(unlikely(parser_action(parser, buffer->buffer)))
- break;
-
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- buffer_free(buffer);
-
- cd->unsafe.enabled = parser->user.enabled;
- count = parser->user.data_collections_count;
-
- if (likely(count)) {
- cd->successful_collections += count;
- cd->serial_failures = 0;
- }
- else
- cd->serial_failures++;
+ {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ buffered_reader_init(&parser->reader);
+ BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
+ while(likely(service_running(SERVICE_COLLECTORS))) {
+
+ if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
+ buffered_reader_ret_t ret = buffered_reader_read_timeout(
+ &parser->reader,
+ fileno((FILE *) parser->fp_input),
+ 2 * 60 * MSEC_PER_SEC, true
+ );
+
+ if(unlikely(ret != BUFFERED_READER_READ_OK))
+ break;
+
+ continue;
+ }
+
+ if(unlikely(parser_action(parser, buffer->buffer)))
+ break;
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ buffer_free(buffer);
+
+ cd->unsafe.enabled = parser->user.enabled;
+ count = parser->user.data_collections_count;
+
+ if(likely(count)) {
+ cd->successful_collections += count;
+ cd->serial_failures = 0;
+ }
+ else
+ cd->serial_failures++;
+ }
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -2855,6 +3123,9 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words,
case 103:
return pluginsd_register_job(words, num_words, parser);
+ case 104:
+ return pluginsd_dyncfg_reset(words, num_words, parser);
+
case 110:
return pluginsd_job_status(words, num_words, parser);
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index 74767569b..1fce9a89a 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -11,8 +11,11 @@
#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
// this controls the max response size of a function
-#define PLUGINSD_MAX_DEFERRED_SIZE (20 * 1024 * 1024)
+#define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024)
+#define PLUGINSD_MIN_RRDSET_POINTERS_CACHE 1024
+
+#define HOST_LABEL_IS_EPHEMERAL "_is_ephemeral"
// PARSER return codes
typedef enum __attribute__ ((__packed__)) parser_rc {
PARSER_RC_OK, // Callback was successful, go on
@@ -28,6 +31,7 @@ typedef enum __attribute__ ((__packed__)) parser_input_type {
typedef enum __attribute__ ((__packed__)) {
PARSER_INIT_PLUGINSD = (1 << 1),
PARSER_INIT_STREAMING = (1 << 2),
+ PARSER_REP_METADATA = (1 << 3),
} PARSER_REPERTOIRE;
struct parser;
@@ -41,6 +45,7 @@ typedef struct parser_keyword {
} PARSER_KEYWORD;
typedef struct parser_user_object {
+ bool cleanup_slots;
RRDSET *st;
RRDHOST *host;
void *opaque;
@@ -51,6 +56,11 @@ typedef struct parser_user_object {
size_t data_collections_count;
int enabled;
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ FILE *stream_log_fp;
+ PARSER_REPERTOIRE stream_log_repertoire;
+#endif
+
STREAM_CAPABILITIES capabilities; // receiver capabilities
struct {
@@ -88,17 +98,21 @@ typedef struct parser {
PARSER_REPERTOIRE repertoire;
uint32_t flags;
int fd; // Socket
- size_t line;
FILE *fp_input; // Input source e.g. stream
FILE *fp_output; // Stream to send commands to plugin
#ifdef ENABLE_HTTPS
NETDATA_SSL *ssl_output;
#endif
+#ifdef ENABLE_H2O
+ void *h2o_ctx; // if set we use h2o_stream functions to send data
+#endif
PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls
struct buffered_reader reader;
+ struct line_splitter line;
+ PARSER_KEYWORD *keyword;
struct {
const char *end_keyword;
@@ -150,8 +164,17 @@ static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *co
return NULL;
}
+bool parser_reconstruct_node(BUFFER *wb, void *ptr);
+bool parser_reconstruct_instance(BUFFER *wb, void *ptr);
+bool parser_reconstruct_context(BUFFER *wb, void *ptr);
+
static inline int parser_action(PARSER *parser, char *input) {
- parser->line++;
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ static __thread char line[PLUGINSD_LINE_MAX + 1];
+ strncpyz(line, input, sizeof(line) - 1);
+#endif
+
+ parser->line.count++;
if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
char command[100 + 1];
@@ -183,18 +206,25 @@ static inline int parser_action(PARSER *parser, char *input) {
return 0;
}
- static __thread char *words[PLUGINSD_MAX_WORDS];
- size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
- const char *command = get_word(words, num_words, 0);
+ parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(parser->line.words, parser->line.num_words, 0);
- if(unlikely(!command))
+ if(unlikely(!command)) {
+ line_splitter_reset(&parser->line);
return 0;
+ }
PARSER_RC rc;
- PARSER_KEYWORD *t = parser_find_keyword(parser, command);
- if(likely(t)) {
- worker_is_busy(t->worker_job_id);
- rc = parser_execute(parser, t, words, num_words);
+ parser->keyword = parser_find_keyword(parser, command);
+ if(likely(parser->keyword)) {
+ worker_is_busy(parser->keyword->worker_job_id);
+
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire)
+ fprintf(parser->user.stream_log_fp, "%s", line);
+#endif
+
+ rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words);
// rc = (*t->func)(words, num_words, parser);
worker_is_idle();
}
@@ -202,22 +232,13 @@ static inline int parser_action(PARSER *parser, char *input) {
rc = PARSER_RC_ERROR;
if(rc == PARSER_RC_ERROR) {
- BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
- for(size_t i = 0; i < num_words ;i++) {
- if(i) buffer_fast_strcat(wb, " ", 1);
-
- buffer_fast_strcat(wb, "\"", 1);
- const char *s = get_word(words, num_words, i);
- buffer_strcat(wb, s?s:"");
- buffer_fast_strcat(wb, "\"", 1);
- }
-
+ CLEAN_BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
+ line_splitter_reconstruct_line(wb, &parser->line);
netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
- command, parser->line, buffer_tostring(wb));
-
- buffer_free(wb);
+ command, parser->line.count, buffer_tostring(wb));
}
+ line_splitter_reset(&parser->line);
return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP);
}