diff options
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 651 |
1 files changed, 531 insertions, 120 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index bc265a3af..68667c785 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,6 +4,9 @@ #define LOG_FUNCTIONS false +#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING) +#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD) + static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; @@ -353,7 +356,7 @@ static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *par static void pluginsd_host_define_cleanup(PARSER *parser) { string_freez(parser->user.host_define.hostname); - dictionary_destroy(parser->user.host_define.rrdlabels); + rrdlabels_destroy(parser->user.host_define.rrdlabels); parser->user.host_define.hostname = NULL; parser->user.host_define.rrdlabels = NULL; @@ -390,17 +393,17 @@ static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PAR return PARSER_RC_OK; } -static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) { +static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) { char *name = get_word(words, num_words, 1); char *value = get_word(words, num_words, 2); if(!name || !*name || !value) return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters"); - if(!parser->user.host_define.parsing_host || !dict) + if(!parser->user.host_define.parsing_host || !labels) return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); - rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG); + rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG); return PARSER_RC_OK; } @@ -733,14 +736,16 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE struct inflight_function { int code; int timeout; - BUFFER *destination_wb; STRING *function; - void (*callback)(BUFFER *wb, int code, void *callback_data); - void *callback_data; + BUFFER *result_body_wb; + rrd_function_result_callback_t result_cb; + void *result_cb_data; usec_t timeout_ut; usec_t started_ut; usec_t sent_ut; const char *payload; + PARSER *parser; + bool virtual; }; static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { @@ -751,42 +756,44 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller pf->code = HTTP_RESP_GATEWAY_TIMEOUT; + const char *transaction = dictionary_acquired_item_name(item); + char buffer[2048 + 1]; snprintfz(buffer, 2048, "%s %s %d \"%s\"\n", pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION", - dictionary_acquired_item_name(item), + transaction, pf->timeout, string2str(pf->function)); // send the command to the plugin - int ret = send_to_plugin(buffer, parser); + ssize_t ret = send_to_plugin(buffer, parser); pf->sent_ut = now_realtime_usec(); if(ret < 0) { - netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret); - rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); + netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret); + rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); } else { internal_error(LOG_FUNCTIONS, - "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)", + "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)", string2str(pf->function), dictionary_acquired_item_name(item), ret, pf->sent_ut - pf->started_ut); } if (!pf->payload) return; - + // send the payload to the plugin ret = send_to_plugin(pf->payload, parser); if(ret < 0) { - netdata_log_error("FUNCTION_PAYLOAD: failed to send function to plugin, error %d", ret); - rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); + netdata_log_error("FUNCTION_PAYLOAD '%s': failed to send function to plugin, error %zd", string2str(pf->function), ret); + rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); } else { internal_error(LOG_FUNCTIONS, - "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)", + "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)", string2str(pf->function), dictionary_acquired_item_name(item), ret, pf->sent_ut - pf->started_ut); } @@ -798,23 +805,90 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m struct inflight_function *pf = new_func; netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); - pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); - pf->callback(pf->destination_wb, pf->code, pf->callback_data); + pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); + pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); string_freez(pf->function); return false; } -static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) { +void delete_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug, const char *fnc_sig, int code) { + if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) + return; + + char *params_local = strdupz(fnc_sig); + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + if (words_c != 3) { + netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job"); + freez(params_local); + return; + } + + const char *module = words[1]; + const char *job = words[2]; + + delete_job(plug, module, job); + + unlink_job(plug->name, module, job); + + rrdpush_send_job_deleted(localhost, plug->name, module, job); + + freez(params_local); +} + +void set_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug __maybe_unused, const char *fnc_sig, int code) { + if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) + return; + + char *params_local = strdupz(fnc_sig); + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + if (words_c != 3) { + netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config"); + freez(params_local); + return; + } + + const char *module_name = get_word(words, words_c, 1); + const char *job_name = get_word(words, words_c, 2); + + if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) { + freez(params_local); + return; + } + + // only send this if it is not existing already (register_job cares for that) + rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED); + + freez(params_local); +} + +static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) { struct inflight_function *pf = func; + struct parser *parser = (struct parser *)parser_ptr; internal_error(LOG_FUNCTIONS, - "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)", + "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)", string2str(pf->function), dictionary_acquired_item_name(item), - buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + + if (pf->virtual && SERVING_PLUGINSD(parser)) { + if (pf->payload) { + if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) + set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); + dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration); + } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) { + delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); + } + } + + pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); - pf->callback(pf->destination_wb, pf->code, pf->callback_data); string_freez(pf->function); + freez((void *)pf->payload); } void inflight_functions_init(PARSER *parser) { @@ -830,11 +904,11 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { dfe_start_write(parser->inflight.functions, pf) { if (pf->timeout_ut < now) { internal_error(true, - "FUNCTION '%s' removing expired transaction '%s', after %llu usec.", + "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.", string2str(pf->function), pf_dfe.name, now - pf->started_ut); - if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK) - pf->code = rrd_call_function_error(pf->destination_wb, + if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK) + pf->code = rrd_call_function_error(pf->result_body_wb, "Timeout waiting for collector response.", HTTP_RESP_GATEWAY_TIMEOUT); @@ -847,35 +921,73 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { dfe_done(pf); } +void pluginsd_function_cancel(void *data) { + struct inflight_function *look_for = data, *t; + + bool sent = false; + dfe_start_read(look_for->parser->inflight.functions, t) { + if(look_for == t) { + const char *transaction = t_dfe.name; + + internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction); + + char buffer[2048 + 1]; + snprintfz(buffer, 2048, "%s %s\n", + PLUGINSD_KEYWORD_FUNCTION_CANCEL, + transaction); + + // send the command to the plugin + ssize_t ret = send_to_plugin(buffer, t->parser); + if(ret < 0) + sent = true; + + break; + } + } + dfe_done(t); + + if(sent <= 0) + netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d."); +} + // this is the function that is called from // rrd_call_function_and_wait() and rrd_call_function_async() -static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) { - PARSER *parser = collector_data; +static int pluginsd_function_execute_cb(BUFFER *result_body_wb, int timeout, const char *function, + void *execute_cb_data, + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb __maybe_unused, + void *is_cancelled_cb_data __maybe_unused, + rrd_function_register_canceller_cb_t register_canceller_cb, + void *register_canceller_db_data) { + PARSER *parser = execute_cb_data; usec_t now = now_realtime_usec(); struct inflight_function tmp = { .started_ut = now, - .timeout_ut = now + timeout * USEC_PER_SEC, - .destination_wb = destination_wb, + .timeout_ut = now + timeout * USEC_PER_SEC + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT, + .result_body_wb = result_body_wb, .timeout = timeout, .function = string_strdupz(function), - .callback = callback, - .callback_data = callback_data, - .payload = NULL + .result_cb = result_cb, + .result_cb_data = result_cb_data, + .payload = NULL, + .parser = parser, }; uuid_t uuid; - uuid_generate_time(uuid); + uuid_generate_random(uuid); - char key[UUID_STR_LEN]; - uuid_unparse_lower(uuid, key); + char transaction[UUID_STR_LEN]; + uuid_unparse_lower(uuid, transaction); dictionary_write_lock(parser->inflight.functions); // if there is any error, our dictionary callbacks will call the caller callback to notify // the caller about the error - no need for error handling here. - dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + void *t = dictionary_set(parser->inflight.functions, transaction, &tmp, sizeof(struct inflight_function)); + if(register_canceller_cb) + register_canceller_cb(register_canceller_db_data, pluginsd_function_cancel, t); if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) parser->inflight.smaller_timeout = tmp.timeout_ut; @@ -890,6 +1002,8 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou } static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) { + // a plugin or a child is registering a function + bool global = false; size_t i = 1; if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { @@ -926,7 +1040,7 @@ static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; } - rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser); + rrd_function_add(host, st, name, timeout, help, false, pluginsd_function_execute_cb, parser); parser->user.data_collections_count++; @@ -973,18 +1087,18 @@ static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_ } else { if(format && *format) - pf->destination_wb->content_type = functions_format_to_content_type(format); + pf->result_body_wb->content_type = functions_format_to_content_type(format); pf->code = code; - pf->destination_wb->expires = expiration; + pf->result_body_wb->expires = expiration; if(expiration <= now_realtime_sec()) - buffer_no_cacheable(pf->destination_wb); + buffer_no_cacheable(pf->result_body_wb); else - buffer_cacheable(pf->destination_wb); + buffer_cacheable(pf->result_body_wb); } - parser->defer.response = (pf) ? pf->destination_wb : NULL; + parser->defer.response = (pf) ? pf->result_body_wb : NULL; parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END; parser->defer.action = pluginsd_function_result_end; parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL @@ -1163,7 +1277,7 @@ static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER * const char *value = get_word(words, num_words, 2); const char *label_source = get_word(words, num_words, 3); - if (!name || !value || !*label_source) { + if (!name || !value || !label_source) { netdata_log_error("Ignoring malformed or empty CHART LABEL command."); return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); } @@ -1604,7 +1718,7 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED))) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) rrdset_isnot_obsolete(st); timing_step(TIMING_STEP_BEGIN2_FIND_CHART); @@ -1894,7 +2008,7 @@ struct mutex_cond { int rc; }; -static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data) +static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *callback_data) { struct mutex_cond *ctx = callback_data; pthread_mutex_lock(&ctx->lock); @@ -1904,9 +2018,81 @@ static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data) } #define VIRT_FNC_TIMEOUT 1 +#define VIRT_FNC_BUF_SIZE (4096) +void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) { + PARSER *parser = NULL; + + //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all) + char *words[PLUGINSD_MAX_WORDS]; + char *function_with_params = strdupz(name); + size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd); + + if (num_words < 2) { + netdata_log_error("PLUGINSD: virtual function name is empty."); + freez(function_with_params); + return; + } + + const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1)); + if (unlikely(cpi == NULL)) { + netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name); + freez(function_with_params); + return; + } + struct configurable_plugin *cp = dictionary_acquired_item_value(cpi); + parser = (PARSER *)cp->cb_usr_ctx; + + BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL); + // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name) + buffer_strcat(function_out, get_word(words, num_words, 0)); + for (size_t i = 1; i < num_words; i++) { + if (i == 1 && SERVING_PLUGINSD(parser)) + continue; + buffer_sprintf(function_out, " %s", get_word(words, num_words, i)); + } + freez(function_with_params); + + usec_t now = now_realtime_usec(); + + struct inflight_function tmp = { + .started_ut = now, + .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, + .result_body_wb = wb, + .timeout = VIRT_FNC_TIMEOUT * 10, + .function = string_strdupz(buffer_tostring(function_out)), + .result_cb = callback, + .result_cb_data = callback_data, + .payload = payload != NULL ? strdupz(payload) : NULL, + .virtual = true, + }; + buffer_free(function_out); + + uuid_t uuid; + uuid_generate_time(uuid); + + char key[UUID_STR_LEN]; + uuid_unparse_lower(uuid, key); + + dictionary_write_lock(parser->inflight.functions); + + // if there is any error, our dictionary callbacks will call the caller callback to notify + // the caller about the error - no need for error handling here. + dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + + if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = tmp.timeout_ut; + + // garbage collect stale inflight functions + if(parser->inflight.smaller_timeout < now) + inflight_functions_garbage_collect(parser, now); + + dictionary_write_unlock(parser->inflight.functions); +} + + dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) { usec_t now = now_realtime_usec(); - BUFFER *wb = buffer_create(4096, NULL); + BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL); struct mutex_cond cond = { .lock = PTHREAD_MUTEX_INITIALIZER, @@ -1916,12 +2102,13 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, struct inflight_function tmp = { .started_ut = now, .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, - .destination_wb = wb, + .result_body_wb = wb, .timeout = VIRT_FNC_TIMEOUT, .function = string_strdupz(name), - .callback = virt_fnc_got_data_cb, - .callback_data = &cond, - .payload = payload, + .result_cb = virt_fnc_got_data_cb, + .result_cb_data = &cond, + .payload = payload != NULL ? strdupz(payload) : NULL, + .virtual = true, }; uuid_t uuid; @@ -1968,98 +2155,188 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, return cfg; } -static dyncfg_config_t get_plugin_config_cb(void *usr_ctx) +#define CVF_MAX_LEN (1024) +static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name) { PARSER *parser = usr_ctx; - return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL); + + if (SERVING_STREAMING(parser)) { + char buf[CVF_MAX_LEN + 1]; + snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); + } + + return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL); } -static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx) +static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name) { PARSER *parser = usr_ctx; + + if (SERVING_STREAMING(parser)) { + char buf[CVF_MAX_LEN + 1]; + snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); + } + return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL); } -static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_module_config %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG); + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA); + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name) +static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg) +enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + int rc; - call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data); - if(rc != 1) + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); + + buffer_free(wb); + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg) +enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - char buf[1024]; - snprintfz(buf, sizeof(buf), "set_module_config %s", module_name); - call_virtual_function_blocking(parser, buf, &rc, cfg->data); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg) +enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - char buf[1024]; - snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name); - call_virtual_function_blocking(parser, buf, &rc, cfg->data); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name) +enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL); - char buf[1024]; - snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name); - call_virtual_function_blocking(parser, buf, &rc, NULL); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } @@ -2079,37 +2356,65 @@ static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, si cfg->get_config_schema_cb = get_plugin_config_schema_cb; cfg->cb_usr_ctx = parser; - parser->user.cd->cfg_dict_item = register_plugin(cfg); - - if (unlikely(parser->user.cd->cfg_dict_item == NULL)) { + const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser)); + if (unlikely(di == NULL)) { freez(cfg->name); freez(cfg); return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin"); } - parser->user.cd->configuration = cfg; + if (SERVING_PLUGINSD(parser)) { + // this is optimization for pluginsd to avoid extra dictionary lookup + // as we know which plugin is comunicating with us + parser->user.cd->cfg_dict_item = di; + parser->user.cd->configuration = cfg; + } else { + // register_plugin keeps the item acquired, so we need to release it + dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); + } + + rrdpush_send_dyncfg_enable(parser->user.host, cfg->name); + return PARSER_RC_OK; } +#define LOG_MSG_SIZE (1024) +#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2) +#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3) static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE"); - struct configurable_plugin *plug_cfg = parser->user.cd->configuration; - if (unlikely(plug_cfg == NULL)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); - - if (unlikely(num_words != 3)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type"); + size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4; + + if (unlikely(num_words != expected_num_words)) { + char log[LOG_MSG_SIZE + 1]; + snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log); + } + + struct configurable_plugin *plug_cfg; + const DICTIONARY_ITEM *di = NULL; + if (SERVING_PLUGINSD(parser)) { + plug_cfg = parser->user.cd->configuration; + if (unlikely(plug_cfg == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); + } else { + di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]); + if (unlikely(di == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found"); + + plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di); + } struct module *mod = callocz(1, sizeof(struct module)); - mod->type = str2_module_type(words[2]); + mod->type = str2_module_type(words[MODULE_TYPE_IDX]); if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) { freez(mod); return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)"); } - mod->name = strdupz(words[1]); + mod->name = strdupz(words[MODULE_NAME_IDX]); mod->set_config_cb = set_module_config_cb; mod->get_config_cb = get_module_config_cb; @@ -2122,27 +2427,111 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si mod->delete_job_cb = delete_job_cb; mod->job_config_cb_usr_ctx = parser; - register_module(plug_cfg, mod); + register_module(parser->user.host->configurable_plugins, plug_cfg, mod, SERVING_PLUGINSD(parser)); + + if (di != NULL) + dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); + + rrdpush_send_dyncfg_reg_module(parser->user.host, plug_cfg->name, mod->name, mod->type); + return PARSER_RC_OK; } -// job_status <module_name> <job_name> <status_code> <state> <message> -static inline PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) -{ - if (unlikely(num_words != 6 && num_words != 5)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]"); +static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) { + if (atol(words[3]) < 0) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags"); + dyncfg_job_flg_t flags = atol(words[3]); + if (SERVING_PLUGINSD(parser)) + flags |= JOB_FLG_PLUGIN_PUSHED; + else + flags |= JOB_FLG_STREAMING_PUSHED; - int state = atoi(words[4]); + enum job_type job_type = str2job_type(words[2]); + if (job_type == JOB_TYPE_UNKNOWN) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type"); + if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)"); - enum job_status job_status = str2job_state(words[3]); - if (unlikely(job_status == JOB_STATUS_UNKNOWN)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job state"); + if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job"); + + rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags); + return PARSER_RC_OK; +} + +static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + size_t expected_num_words = SERVING_PLUGINSD(parser) ? 5 : 6; + + if (unlikely(num_words != expected_num_words)) { + char log[LOG_MSG_SIZE + 1]; + snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name job_name job_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, log); + } + + if (SERVING_PLUGINSD(parser)) { + return pluginsd_register_job_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name); + } + return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]); +} + +static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) { + int state = str2i(words[3]); + + enum job_status status = str2job_state(words[2]); + if (unlikely(SERVING_PLUGINSD(parser) && status == JOB_STATUS_UNKNOWN)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status"); char *message = NULL; - if (num_words == 6) - message = strdupz(words[5]); + if (num_words == 5) + message = words[4]; + + const DICTIONARY_ITEM *plugin_item; + DICTIONARY *job_dict; + const DICTIONARY_ITEM *job_item = report_job_status_acq_lock(parser->user.host->configurable_plugins, &plugin_item, &job_dict, plugin_name, words[0], words[1], status, state, message); + + if (job_item != NULL) { + struct job *job = dictionary_acquired_item_value(job_item); + rrdpush_send_job_status_update(parser->user.host, plugin_name, words[0], job); + + pthread_mutex_unlock(&job->lock); + dictionary_acquired_item_release(job_dict, job_item); + dictionary_acquired_item_release(parser->user.host->configurable_plugins, plugin_item); + } + + return PARSER_RC_OK; +} - report_job_status(parser->user.cd->configuration, words[1], words[2], job_status, state, message); +// job_status [plugin_name if streaming] <module_name> <job_name> <status_code> <state> [message] +static PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) { + if (SERVING_PLUGINSD(parser)) { + if (unlikely(num_words != 5 && num_words != 6)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]"); + } else { + if (unlikely(num_words != 6 && num_words != 7)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 5 or 6 parameters: plugin_name, module_name, job_name, status_code, state, [optional: message]"); + } + + if (SERVING_PLUGINSD(parser)) { + return pluginsd_job_status_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name); + } + return pluginsd_job_status_common(&words[2], num_words - 2, parser, words[1]); +} + +static PARSER_RC pluginsd_delete_job(char **words, size_t num_words, PARSER *parser) { + // this can confuse a bit but there is a diference between KEYWORD_DELETE_JOB and actual delete_job function + // they are of opossite direction + if (num_words != 4) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DELETE_JOB, "expected 2 parameters: plugin_name, module_name, job_name"); + + const char *plugin_name = get_word(words, num_words, 1); + const char *module_name = get_word(words, num_words, 2); + const char *job_name = get_word(words, num_words, 3); + + if (SERVING_STREAMING(parser)) + delete_job_pname(parser->user.host->configurable_plugins, plugin_name, module_name, job_name); + + // forward to parent if any + rrdpush_send_job_deleted(parser->user.host, plugin_name, module_name, job_name); return PARSER_RC_OK; } @@ -2309,15 +2698,22 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); buffered_reader_init(&parser->reader); - char buffer[PLUGINSD_LINE_MAX + 2]; + BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL); while(likely(service_running(SERVICE_COLLECTORS))) { - if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) { + if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) { if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC))) break; + + continue; } - else if(unlikely(parser_action(parser, buffer))) + + if(unlikely(parser_action(parser, buffer->buffer))) break; + + buffer->len = 0; + buffer->buffer[0] = '\0'; } + buffer_free(buffer); cd->unsafe.enabled = parser->user.enabled; count = parser->user.data_collections_count; @@ -2452,10 +2848,19 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, case 101: return pluginsd_register_plugin(words, num_words, parser); - + case 102: return pluginsd_register_module(words, num_words, parser); + case 103: + return pluginsd_register_job(words, num_words, parser); + + case 110: + return pluginsd_job_status(words, num_words, parser); + + case 111: + return pluginsd_delete_job(words, num_words, parser); + default: fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id); } @@ -2472,14 +2877,20 @@ void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) { } } +static void parser_destroy_dyncfg(PARSER *parser) { + if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) { + unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item); + parser->user.cd->configuration = NULL; + } else if (parser->user.host != NULL && SERVING_STREAMING(parser) && parser->user.host != localhost){ + dictionary_flush(parser->user.host->configurable_plugins); + } +} + void parser_destroy(PARSER *parser) { if (unlikely(!parser)) return; - if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) { - unregister_plugin(parser->user.cd->cfg_dict_item); - parser->user.cd->configuration = NULL; - } + parser_destroy_dyncfg(parser); dictionary_destroy(parser->inflight.functions); freez(parser); |