diff options
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 293 |
1 files changed, 292 insertions, 1 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index cda17710..19aa4544 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -699,6 +699,7 @@ struct inflight_function { usec_t timeout_ut; usec_t started_ut; usec_t sent_ut; + const char *payload; }; static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { @@ -710,7 +711,8 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void pf->code = HTTP_RESP_GATEWAY_TIMEOUT; char buffer[2048 + 1]; - snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n", + snprintfz(buffer, 2048, "%s %s %d \"%s\"\n", + pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION", dictionary_acquired_item_name(item), pf->timeout, string2str(pf->function)); @@ -730,6 +732,25 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void string2str(pf->function), dictionary_acquired_item_name(item), ret, pf->sent_ut - pf->started_ut); } + + if (!pf->payload) + return; + + // send the payload to the plugin + ret = send_to_plugin(pf->payload, parser); + + if(ret < 0) { + netdata_log_error("FUNCTION_PAYLOAD: failed to send function to plugin, error %d", ret); + rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); + } + else { + internal_error(LOG_FUNCTIONS, + "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)", + string2str(pf->function), dictionary_acquired_item_name(item), ret, + pf->sent_ut - pf->started_ut); + } + + send_to_plugin("\nFUNCTION_PAYLOAD_END\n", parser); } static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) { @@ -800,6 +821,7 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou .function = string_strdupz(function), .callback = callback, .callback_data = callback_data, + .payload = NULL }; uuid_t uuid; @@ -1807,6 +1829,264 @@ static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_wo return PARSER_RC_STOP; } +struct mutex_cond { + pthread_mutex_t lock; + pthread_cond_t cond; + int rc; +}; + +static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data) +{ + struct mutex_cond *ctx = callback_data; + pthread_mutex_lock(&ctx->lock); + ctx->rc = code; + pthread_cond_broadcast(&ctx->cond); + pthread_mutex_unlock(&ctx->lock); +} + +#define VIRT_FNC_TIMEOUT 1 +dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) { + usec_t now = now_realtime_usec(); + BUFFER *wb = buffer_create(4096, NULL); + + struct mutex_cond cond = { + .lock = PTHREAD_MUTEX_INITIALIZER, + .cond = PTHREAD_COND_INITIALIZER + }; + + struct inflight_function tmp = { + .started_ut = now, + .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, + .destination_wb = wb, + .timeout = VIRT_FNC_TIMEOUT, + .function = string_strdupz(name), + .callback = virt_fnc_got_data_cb, + .callback_data = &cond, + .payload = payload, + }; + + uuid_t uuid; + uuid_generate_time(uuid); + + char key[UUID_STR_LEN]; + uuid_unparse_lower(uuid, key); + + dictionary_write_lock(parser->inflight.functions); + + // if there is any error, our dictionary callbacks will call the caller callback to notify + // the caller about the error - no need for error handling here. + dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + + if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = tmp.timeout_ut; + + // garbage collect stale inflight functions + if(parser->inflight.smaller_timeout < now) + inflight_functions_garbage_collect(parser, now); + + dictionary_write_unlock(parser->inflight.functions); + + struct timespec tp; + clock_gettime(CLOCK_REALTIME, &tp); + tp.tv_sec += (time_t)VIRT_FNC_TIMEOUT; + + pthread_mutex_lock(&cond.lock); + + int ret = pthread_cond_timedwait(&cond.cond, &cond.lock, &tp); + if (ret == ETIMEDOUT) + netdata_log_error("PLUGINSD: DYNCFG virtual function %s timed out", name); + + pthread_mutex_unlock(&cond.lock); + + dyncfg_config_t cfg; + cfg.data = strdupz(buffer_tostring(wb)); + cfg.data_size = buffer_strlen(wb); + + if (rc != NULL) + *rc = cond.rc; + + buffer_free(wb); + return cfg; +} + +static dyncfg_config_t get_plugin_config_cb(void *usr_ctx) +{ + PARSER *parser = usr_ctx; + return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL); +} + +static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx) +{ + PARSER *parser = usr_ctx; + return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL); +} + +static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name) +{ + PARSER *parser = usr_ctx; + char buf[1024]; + snprintfz(buf, sizeof(buf), "get_module_config %s", module_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); +} + +static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name) +{ + PARSER *parser = usr_ctx; + char buf[1024]; + snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); +} + +static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name) +{ + PARSER *parser = usr_ctx; + char buf[1024]; + snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); +} + +static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name) +{ + PARSER *parser = usr_ctx; + char buf[1024]; + snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); +} + +enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg) +{ + PARSER *parser = usr_ctx; + int rc; + call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data); + if(rc != 1) + return SET_CONFIG_REJECTED; + return SET_CONFIG_ACCEPTED; +} + +enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg) +{ + PARSER *parser = usr_ctx; + int rc; + + char buf[1024]; + snprintfz(buf, sizeof(buf), "set_module_config %s", module_name); + call_virtual_function_blocking(parser, buf, &rc, cfg->data); + + if(rc != 1) + return SET_CONFIG_REJECTED; + return SET_CONFIG_ACCEPTED; +} + +enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg) +{ + PARSER *parser = usr_ctx; + int rc; + + char buf[1024]; + snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name); + call_virtual_function_blocking(parser, buf, &rc, cfg->data); + + if(rc != 1) + return SET_CONFIG_REJECTED; + return SET_CONFIG_ACCEPTED; +} + +enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name) +{ + PARSER *parser = usr_ctx; + int rc; + + char buf[1024]; + snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name); + call_virtual_function_blocking(parser, buf, &rc, NULL); + + if(rc != 1) + return SET_CONFIG_REJECTED; + return SET_CONFIG_ACCEPTED; +} + + +static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + netdata_log_info("PLUGINSD: DYNCFG_ENABLE"); + + if (unlikely (num_words != 2)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "missing name parameter"); + + struct configurable_plugin *cfg = callocz(1, sizeof(struct configurable_plugin)); + + cfg->name = strdupz(words[1]); + cfg->set_config_cb = set_plugin_config_cb; + cfg->get_config_cb = get_plugin_config_cb; + cfg->get_config_schema_cb = get_plugin_config_schema_cb; + cfg->cb_usr_ctx = parser; + + parser->user.cd->cfg_dict_item = register_plugin(cfg); + + if (unlikely(parser->user.cd->cfg_dict_item == NULL)) { + freez(cfg->name); + freez(cfg); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin"); + } + + parser->user.cd->configuration = cfg; + return PARSER_RC_OK; +} + +static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE"); + + struct configurable_plugin *plug_cfg = parser->user.cd->configuration; + if (unlikely(plug_cfg == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); + + if (unlikely(num_words != 3)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type"); + + struct module *mod = callocz(1, sizeof(struct module)); + + mod->type = str2_module_type(words[2]); + if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) { + freez(mod); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)"); + } + + mod->name = strdupz(words[1]); + + mod->set_config_cb = set_module_config_cb; + mod->get_config_cb = get_module_config_cb; + mod->get_config_schema_cb = get_module_config_schema_cb; + mod->config_cb_usr_ctx = parser; + + mod->get_job_config_cb = get_job_config_cb; + mod->get_job_config_schema_cb = get_job_config_schema_cb; + mod->set_job_config_cb = set_job_config_cb; + mod->delete_job_cb = delete_job_cb; + mod->job_config_cb_usr_ctx = parser; + + register_module(plug_cfg, mod); + return PARSER_RC_OK; +} + +// job_status <module_name> <job_name> <status_code> <state> <message> +static inline PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) +{ + if (unlikely(num_words != 6 && num_words != 5)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]"); + + int state = atoi(words[4]); + + enum job_status job_status = str2job_state(words[3]); + if (unlikely(job_status == JOB_STATUS_UNKNOWN)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job state"); + + char *message = NULL; + if (num_words == 6) + message = strdupz(words[5]); + + report_job_status(parser->user.cd->configuration, words[1], words[2], job_status, state, message); + return PARSER_RC_OK; +} + static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser) { const char *host_uuid_str = get_word(words, num_words, 1); @@ -2111,6 +2391,12 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, case 99: return pluginsd_exit(words, num_words, parser); + case 101: + return pluginsd_register_plugin(words, num_words, parser); + + case 102: + return pluginsd_register_module(words, num_words, parser); + default: fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id); } @@ -2131,6 +2417,11 @@ void parser_destroy(PARSER *parser) { if (unlikely(!parser)) return; + if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) { + unregister_plugin(parser->user.cd->cfg_dict_item); + parser->user.cd->configuration = NULL; + } + dictionary_destroy(parser->inflight.functions); freez(parser); } |