summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c651
1 files changed, 531 insertions, 120 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index bc265a3af..68667c785 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,6 +4,9 @@
#define LOG_FUNCTIONS false
+#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING)
+#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD)
+
static ssize_t send_to_plugin(const char *txt, void *data) {
PARSER *parser = data;
@@ -353,7 +356,7 @@ static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *par
static void pluginsd_host_define_cleanup(PARSER *parser) {
string_freez(parser->user.host_define.hostname);
- dictionary_destroy(parser->user.host_define.rrdlabels);
+ rrdlabels_destroy(parser->user.host_define.rrdlabels);
parser->user.host_define.hostname = NULL;
parser->user.host_define.rrdlabels = NULL;
@@ -390,17 +393,17 @@ static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PAR
return PARSER_RC_OK;
}
-static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) {
+static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) {
char *name = get_word(words, num_words, 1);
char *value = get_word(words, num_words, 2);
if(!name || !*name || !value)
return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
- if(!parser->user.host_define.parsing_host || !dict)
+ if(!parser->user.host_define.parsing_host || !labels)
return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
- rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG);
+ rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG);
return PARSER_RC_OK;
}
@@ -733,14 +736,16 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE
struct inflight_function {
int code;
int timeout;
- BUFFER *destination_wb;
STRING *function;
- void (*callback)(BUFFER *wb, int code, void *callback_data);
- void *callback_data;
+ BUFFER *result_body_wb;
+ rrd_function_result_callback_t result_cb;
+ void *result_cb_data;
usec_t timeout_ut;
usec_t started_ut;
usec_t sent_ut;
const char *payload;
+ PARSER *parser;
+ bool virtual;
};
static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
@@ -751,42 +756,44 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
// leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
+ const char *transaction = dictionary_acquired_item_name(item);
+
char buffer[2048 + 1];
snprintfz(buffer, 2048, "%s %s %d \"%s\"\n",
pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION",
- dictionary_acquired_item_name(item),
+ transaction,
pf->timeout,
string2str(pf->function));
// send the command to the plugin
- int ret = send_to_plugin(buffer, parser);
+ ssize_t ret = send_to_plugin(buffer, parser);
pf->sent_ut = now_realtime_usec();
if(ret < 0) {
- netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret);
- rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
+ netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret);
+ rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
}
else {
internal_error(LOG_FUNCTIONS,
- "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
+ "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
string2str(pf->function), dictionary_acquired_item_name(item), ret,
pf->sent_ut - pf->started_ut);
}
if (!pf->payload)
return;
-
+
// send the payload to the plugin
ret = send_to_plugin(pf->payload, parser);
if(ret < 0) {
- netdata_log_error("FUNCTION_PAYLOAD: failed to send function to plugin, error %d", ret);
- rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
+ netdata_log_error("FUNCTION_PAYLOAD '%s': failed to send function to plugin, error %zd", string2str(pf->function), ret);
+ rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
}
else {
internal_error(LOG_FUNCTIONS,
- "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
+ "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
string2str(pf->function), dictionary_acquired_item_name(item), ret,
pf->sent_ut - pf->started_ut);
}
@@ -798,23 +805,90 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m
struct inflight_function *pf = new_func;
netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
- pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
- pf->callback(pf->destination_wb, pf->code, pf->callback_data);
+ pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
+ pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
string_freez(pf->function);
return false;
}
-static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
+void delete_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug, const char *fnc_sig, int code) {
+ if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
+ return;
+
+ char *params_local = strdupz(fnc_sig);
+ char *words[DYNCFG_MAX_WORDS];
+ size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
+
+ if (words_c != 3) {
+ netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job");
+ freez(params_local);
+ return;
+ }
+
+ const char *module = words[1];
+ const char *job = words[2];
+
+ delete_job(plug, module, job);
+
+ unlink_job(plug->name, module, job);
+
+ rrdpush_send_job_deleted(localhost, plug->name, module, job);
+
+ freez(params_local);
+}
+
+void set_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug __maybe_unused, const char *fnc_sig, int code) {
+ if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
+ return;
+
+ char *params_local = strdupz(fnc_sig);
+ char *words[DYNCFG_MAX_WORDS];
+ size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
+
+ if (words_c != 3) {
+ netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config");
+ freez(params_local);
+ return;
+ }
+
+ const char *module_name = get_word(words, words_c, 1);
+ const char *job_name = get_word(words, words_c, 2);
+
+ if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) {
+ freez(params_local);
+ return;
+ }
+
+ // only send this if it is not existing already (register_job cares for that)
+ rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED);
+
+ freez(params_local);
+}
+
+static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) {
struct inflight_function *pf = func;
+ struct parser *parser = (struct parser *)parser_ptr;
internal_error(LOG_FUNCTIONS,
- "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)",
+ "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)",
string2str(pf->function), dictionary_acquired_item_name(item),
- buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
+ buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
+
+ if (pf->virtual && SERVING_PLUGINSD(parser)) {
+ if (pf->payload) {
+ if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0)
+ set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
+ dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration);
+ } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) {
+ delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
+ }
+ }
+
+ pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
- pf->callback(pf->destination_wb, pf->code, pf->callback_data);
string_freez(pf->function);
+ freez((void *)pf->payload);
}
void inflight_functions_init(PARSER *parser) {
@@ -830,11 +904,11 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
dfe_start_write(parser->inflight.functions, pf) {
if (pf->timeout_ut < now) {
internal_error(true,
- "FUNCTION '%s' removing expired transaction '%s', after %llu usec.",
+ "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.",
string2str(pf->function), pf_dfe.name, now - pf->started_ut);
- if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK)
- pf->code = rrd_call_function_error(pf->destination_wb,
+ if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK)
+ pf->code = rrd_call_function_error(pf->result_body_wb,
"Timeout waiting for collector response.",
HTTP_RESP_GATEWAY_TIMEOUT);
@@ -847,35 +921,73 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
dfe_done(pf);
}
+void pluginsd_function_cancel(void *data) {
+ struct inflight_function *look_for = data, *t;
+
+ bool sent = false;
+ dfe_start_read(look_for->parser->inflight.functions, t) {
+ if(look_for == t) {
+ const char *transaction = t_dfe.name;
+
+ internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
+
+ char buffer[2048 + 1];
+ snprintfz(buffer, 2048, "%s %s\n",
+ PLUGINSD_KEYWORD_FUNCTION_CANCEL,
+ transaction);
+
+ // send the command to the plugin
+ ssize_t ret = send_to_plugin(buffer, t->parser);
+ if(ret < 0)
+ sent = true;
+
+ break;
+ }
+ }
+ dfe_done(t);
+
+ if(sent <= 0)
+ netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
+}
+
// this is the function that is called from
// rrd_call_function_and_wait() and rrd_call_function_async()
-static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) {
- PARSER *parser = collector_data;
+static int pluginsd_function_execute_cb(BUFFER *result_body_wb, int timeout, const char *function,
+ void *execute_cb_data,
+ rrd_function_result_callback_t result_cb, void *result_cb_data,
+ rrd_function_is_cancelled_cb_t is_cancelled_cb __maybe_unused,
+ void *is_cancelled_cb_data __maybe_unused,
+ rrd_function_register_canceller_cb_t register_canceller_cb,
+ void *register_canceller_db_data) {
+ PARSER *parser = execute_cb_data;
usec_t now = now_realtime_usec();
struct inflight_function tmp = {
.started_ut = now,
- .timeout_ut = now + timeout * USEC_PER_SEC,
- .destination_wb = destination_wb,
+ .timeout_ut = now + timeout * USEC_PER_SEC + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT,
+ .result_body_wb = result_body_wb,
.timeout = timeout,
.function = string_strdupz(function),
- .callback = callback,
- .callback_data = callback_data,
- .payload = NULL
+ .result_cb = result_cb,
+ .result_cb_data = result_cb_data,
+ .payload = NULL,
+ .parser = parser,
};
uuid_t uuid;
- uuid_generate_time(uuid);
+ uuid_generate_random(uuid);
- char key[UUID_STR_LEN];
- uuid_unparse_lower(uuid, key);
+ char transaction[UUID_STR_LEN];
+ uuid_unparse_lower(uuid, transaction);
dictionary_write_lock(parser->inflight.functions);
// if there is any error, our dictionary callbacks will call the caller callback to notify
// the caller about the error - no need for error handling here.
- dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
+ void *t = dictionary_set(parser->inflight.functions, transaction, &tmp, sizeof(struct inflight_function));
+ if(register_canceller_cb)
+ register_canceller_cb(register_canceller_db_data, pluginsd_function_cancel, t);
if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
parser->inflight.smaller_timeout = tmp.timeout_ut;
@@ -890,6 +1002,8 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou
}
static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
+ // a plugin or a child is registering a function
+
bool global = false;
size_t i = 1;
if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
@@ -926,7 +1040,7 @@ static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER
timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
}
- rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
+ rrd_function_add(host, st, name, timeout, help, false, pluginsd_function_execute_cb, parser);
parser->user.data_collections_count++;
@@ -973,18 +1087,18 @@ static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_
}
else {
if(format && *format)
- pf->destination_wb->content_type = functions_format_to_content_type(format);
+ pf->result_body_wb->content_type = functions_format_to_content_type(format);
pf->code = code;
- pf->destination_wb->expires = expiration;
+ pf->result_body_wb->expires = expiration;
if(expiration <= now_realtime_sec())
- buffer_no_cacheable(pf->destination_wb);
+ buffer_no_cacheable(pf->result_body_wb);
else
- buffer_cacheable(pf->destination_wb);
+ buffer_cacheable(pf->result_body_wb);
}
- parser->defer.response = (pf) ? pf->destination_wb : NULL;
+ parser->defer.response = (pf) ? pf->result_body_wb : NULL;
parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
parser->defer.action = pluginsd_function_result_end;
parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
@@ -1163,7 +1277,7 @@ static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *
const char *value = get_word(words, num_words, 2);
const char *label_source = get_word(words, num_words, 3);
- if (!name || !value || !*label_source) {
+ if (!name || !value || !label_source) {
netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
}
@@ -1604,7 +1718,7 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED)))
+ if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
rrdset_isnot_obsolete(st);
timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
@@ -1894,7 +2008,7 @@ struct mutex_cond {
int rc;
};
-static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data)
+static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *callback_data)
{
struct mutex_cond *ctx = callback_data;
pthread_mutex_lock(&ctx->lock);
@@ -1904,9 +2018,81 @@ static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data)
}
#define VIRT_FNC_TIMEOUT 1
+#define VIRT_FNC_BUF_SIZE (4096)
+void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) {
+ PARSER *parser = NULL;
+
+ //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all)
+ char *words[PLUGINSD_MAX_WORDS];
+ char *function_with_params = strdupz(name);
+ size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd);
+
+ if (num_words < 2) {
+ netdata_log_error("PLUGINSD: virtual function name is empty.");
+ freez(function_with_params);
+ return;
+ }
+
+ const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1));
+ if (unlikely(cpi == NULL)) {
+ netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name);
+ freez(function_with_params);
+ return;
+ }
+ struct configurable_plugin *cp = dictionary_acquired_item_value(cpi);
+ parser = (PARSER *)cp->cb_usr_ctx;
+
+ BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
+ // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name)
+ buffer_strcat(function_out, get_word(words, num_words, 0));
+ for (size_t i = 1; i < num_words; i++) {
+ if (i == 1 && SERVING_PLUGINSD(parser))
+ continue;
+ buffer_sprintf(function_out, " %s", get_word(words, num_words, i));
+ }
+ freez(function_with_params);
+
+ usec_t now = now_realtime_usec();
+
+ struct inflight_function tmp = {
+ .started_ut = now,
+ .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
+ .result_body_wb = wb,
+ .timeout = VIRT_FNC_TIMEOUT * 10,
+ .function = string_strdupz(buffer_tostring(function_out)),
+ .result_cb = callback,
+ .result_cb_data = callback_data,
+ .payload = payload != NULL ? strdupz(payload) : NULL,
+ .virtual = true,
+ };
+ buffer_free(function_out);
+
+ uuid_t uuid;
+ uuid_generate_time(uuid);
+
+ char key[UUID_STR_LEN];
+ uuid_unparse_lower(uuid, key);
+
+ dictionary_write_lock(parser->inflight.functions);
+
+ // if there is any error, our dictionary callbacks will call the caller callback to notify
+ // the caller about the error - no need for error handling here.
+ dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
+
+ if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
+ parser->inflight.smaller_timeout = tmp.timeout_ut;
+
+ // garbage collect stale inflight functions
+ if(parser->inflight.smaller_timeout < now)
+ inflight_functions_garbage_collect(parser, now);
+
+ dictionary_write_unlock(parser->inflight.functions);
+}
+
+
dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) {
usec_t now = now_realtime_usec();
- BUFFER *wb = buffer_create(4096, NULL);
+ BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
struct mutex_cond cond = {
.lock = PTHREAD_MUTEX_INITIALIZER,
@@ -1916,12 +2102,13 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name,
struct inflight_function tmp = {
.started_ut = now,
.timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
- .destination_wb = wb,
+ .result_body_wb = wb,
.timeout = VIRT_FNC_TIMEOUT,
.function = string_strdupz(name),
- .callback = virt_fnc_got_data_cb,
- .callback_data = &cond,
- .payload = payload,
+ .result_cb = virt_fnc_got_data_cb,
+ .result_cb_data = &cond,
+ .payload = payload != NULL ? strdupz(payload) : NULL,
+ .virtual = true,
};
uuid_t uuid;
@@ -1968,98 +2155,188 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name,
return cfg;
}
-static dyncfg_config_t get_plugin_config_cb(void *usr_ctx)
+#define CVF_MAX_LEN (1024)
+static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name)
{
PARSER *parser = usr_ctx;
- return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL);
+
+ if (SERVING_STREAMING(parser)) {
+ char buf[CVF_MAX_LEN + 1];
+ snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ }
+
+ return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL);
}
-static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx)
+static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name)
{
PARSER *parser = usr_ctx;
+
+ if (SERVING_STREAMING(parser)) {
+ char buf[CVF_MAX_LEN + 1];
+ snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ }
+
return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL);
}
-static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_module_config %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG);
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA);
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name)
+static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg)
+enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
int rc;
- call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data);
- if(rc != 1)
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
+
+ buffer_free(wb);
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg)
+enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "set_module_config %s", module_name);
- call_virtual_function_blocking(parser, buf, &rc, cfg->data);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
+enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name);
- call_virtual_function_blocking(parser, buf, &rc, cfg->data);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name)
+enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name);
- call_virtual_function_blocking(parser, buf, &rc, NULL);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
@@ -2079,37 +2356,65 @@ static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, si
cfg->get_config_schema_cb = get_plugin_config_schema_cb;
cfg->cb_usr_ctx = parser;
- parser->user.cd->cfg_dict_item = register_plugin(cfg);
-
- if (unlikely(parser->user.cd->cfg_dict_item == NULL)) {
+ const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser));
+ if (unlikely(di == NULL)) {
freez(cfg->name);
freez(cfg);
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin");
}
- parser->user.cd->configuration = cfg;
+ if (SERVING_PLUGINSD(parser)) {
+ // this is optimization for pluginsd to avoid extra dictionary lookup
+ // as we know which plugin is comunicating with us
+ parser->user.cd->cfg_dict_item = di;
+ parser->user.cd->configuration = cfg;
+ } else {
+ // register_plugin keeps the item acquired, so we need to release it
+ dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
+ }
+
+ rrdpush_send_dyncfg_enable(parser->user.host, cfg->name);
+
return PARSER_RC_OK;
}
+#define LOG_MSG_SIZE (1024)
+#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2)
+#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3)
static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE");
- struct configurable_plugin *plug_cfg = parser->user.cd->configuration;
- if (unlikely(plug_cfg == NULL))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
-
- if (unlikely(num_words != 3))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type");
+ size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4;
+
+ if (unlikely(num_words != expected_num_words)) {
+ char log[LOG_MSG_SIZE + 1];
+ snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log);
+ }
+
+ struct configurable_plugin *plug_cfg;
+ const DICTIONARY_ITEM *di = NULL;
+ if (SERVING_PLUGINSD(parser)) {
+ plug_cfg = parser->user.cd->configuration;
+ if (unlikely(plug_cfg == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
+ } else {
+ di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]);
+ if (unlikely(di == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found");
+
+ plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di);
+ }
struct module *mod = callocz(1, sizeof(struct module));
- mod->type = str2_module_type(words[2]);
+ mod->type = str2_module_type(words[MODULE_TYPE_IDX]);
if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) {
freez(mod);
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)");
}
- mod->name = strdupz(words[1]);
+ mod->name = strdupz(words[MODULE_NAME_IDX]);
mod->set_config_cb = set_module_config_cb;
mod->get_config_cb = get_module_config_cb;
@@ -2122,27 +2427,111 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si
mod->delete_job_cb = delete_job_cb;
mod->job_config_cb_usr_ctx = parser;
- register_module(plug_cfg, mod);
+ register_module(parser->user.host->configurable_plugins, plug_cfg, mod, SERVING_PLUGINSD(parser));
+
+ if (di != NULL)
+ dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
+
+ rrdpush_send_dyncfg_reg_module(parser->user.host, plug_cfg->name, mod->name, mod->type);
+
return PARSER_RC_OK;
}
-// job_status <module_name> <job_name> <status_code> <state> <message>
-static inline PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser)
-{
- if (unlikely(num_words != 6 && num_words != 5))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]");
+static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) {
+ if (atol(words[3]) < 0)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags");
+ dyncfg_job_flg_t flags = atol(words[3]);
+ if (SERVING_PLUGINSD(parser))
+ flags |= JOB_FLG_PLUGIN_PUSHED;
+ else
+ flags |= JOB_FLG_STREAMING_PUSHED;
- int state = atoi(words[4]);
+ enum job_type job_type = str2job_type(words[2]);
+ if (job_type == JOB_TYPE_UNKNOWN)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type");
+ if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)");
- enum job_status job_status = str2job_state(words[3]);
- if (unlikely(job_status == JOB_STATUS_UNKNOWN))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job state");
+ if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job");
+
+ rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags);
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ size_t expected_num_words = SERVING_PLUGINSD(parser) ? 5 : 6;
+
+ if (unlikely(num_words != expected_num_words)) {
+ char log[LOG_MSG_SIZE + 1];
+ snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name job_name job_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, log);
+ }
+
+ if (SERVING_PLUGINSD(parser)) {
+ return pluginsd_register_job_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name);
+ }
+ return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]);
+}
+
+static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) {
+ int state = str2i(words[3]);
+
+ enum job_status status = str2job_state(words[2]);
+ if (unlikely(SERVING_PLUGINSD(parser) && status == JOB_STATUS_UNKNOWN))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status");
char *message = NULL;
- if (num_words == 6)
- message = strdupz(words[5]);
+ if (num_words == 5)
+ message = words[4];
+
+ const DICTIONARY_ITEM *plugin_item;
+ DICTIONARY *job_dict;
+ const DICTIONARY_ITEM *job_item = report_job_status_acq_lock(parser->user.host->configurable_plugins, &plugin_item, &job_dict, plugin_name, words[0], words[1], status, state, message);
+
+ if (job_item != NULL) {
+ struct job *job = dictionary_acquired_item_value(job_item);
+ rrdpush_send_job_status_update(parser->user.host, plugin_name, words[0], job);
+
+ pthread_mutex_unlock(&job->lock);
+ dictionary_acquired_item_release(job_dict, job_item);
+ dictionary_acquired_item_release(parser->user.host->configurable_plugins, plugin_item);
+ }
+
+ return PARSER_RC_OK;
+}
- report_job_status(parser->user.cd->configuration, words[1], words[2], job_status, state, message);
+// job_status [plugin_name if streaming] <module_name> <job_name> <status_code> <state> [message]
+static PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) {
+ if (SERVING_PLUGINSD(parser)) {
+ if (unlikely(num_words != 5 && num_words != 6))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]");
+ } else {
+ if (unlikely(num_words != 6 && num_words != 7))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 5 or 6 parameters: plugin_name, module_name, job_name, status_code, state, [optional: message]");
+ }
+
+ if (SERVING_PLUGINSD(parser)) {
+ return pluginsd_job_status_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name);
+ }
+ return pluginsd_job_status_common(&words[2], num_words - 2, parser, words[1]);
+}
+
+static PARSER_RC pluginsd_delete_job(char **words, size_t num_words, PARSER *parser) {
+ // this can confuse a bit but there is a diference between KEYWORD_DELETE_JOB and actual delete_job function
+ // they are of opossite direction
+ if (num_words != 4)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DELETE_JOB, "expected 2 parameters: plugin_name, module_name, job_name");
+
+ const char *plugin_name = get_word(words, num_words, 1);
+ const char *module_name = get_word(words, num_words, 2);
+ const char *job_name = get_word(words, num_words, 3);
+
+ if (SERVING_STREAMING(parser))
+ delete_job_pname(parser->user.host->configurable_plugins, plugin_name, module_name, job_name);
+
+ // forward to parent if any
+ rrdpush_send_job_deleted(parser->user.host, plugin_name, module_name, job_name);
return PARSER_RC_OK;
}
@@ -2309,15 +2698,22 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
buffered_reader_init(&parser->reader);
- char buffer[PLUGINSD_LINE_MAX + 2];
+ BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
while(likely(service_running(SERVICE_COLLECTORS))) {
- if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) {
+ if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
break;
+
+ continue;
}
- else if(unlikely(parser_action(parser, buffer)))
+
+ if(unlikely(parser_action(parser, buffer->buffer)))
break;
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
}
+ buffer_free(buffer);
cd->unsafe.enabled = parser->user.enabled;
count = parser->user.data_collections_count;
@@ -2452,10 +2848,19 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words,
case 101:
return pluginsd_register_plugin(words, num_words, parser);
-
+
case 102:
return pluginsd_register_module(words, num_words, parser);
+ case 103:
+ return pluginsd_register_job(words, num_words, parser);
+
+ case 110:
+ return pluginsd_job_status(words, num_words, parser);
+
+ case 111:
+ return pluginsd_delete_job(words, num_words, parser);
+
default:
fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
}
@@ -2472,14 +2877,20 @@ void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
}
}
+static void parser_destroy_dyncfg(PARSER *parser) {
+ if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) {
+ unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item);
+ parser->user.cd->configuration = NULL;
+ } else if (parser->user.host != NULL && SERVING_STREAMING(parser) && parser->user.host != localhost){
+ dictionary_flush(parser->user.host->configurable_plugins);
+ }
+}
+
void parser_destroy(PARSER *parser) {
if (unlikely(!parser))
return;
- if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) {
- unregister_plugin(parser->user.cd->cfg_dict_item);
- parser->user.cd->configuration = NULL;
- }
+ parser_destroy_dyncfg(parser);
dictionary_destroy(parser->inflight.functions);
freez(parser);