summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c293
1 files changed, 292 insertions, 1 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index cda17710c..19aa4544b 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -699,6 +699,7 @@ struct inflight_function {
usec_t timeout_ut;
usec_t started_ut;
usec_t sent_ut;
+ const char *payload;
};
static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
@@ -710,7 +711,8 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
char buffer[2048 + 1];
- snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n",
+ snprintfz(buffer, 2048, "%s %s %d \"%s\"\n",
+ pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION",
dictionary_acquired_item_name(item),
pf->timeout,
string2str(pf->function));
@@ -730,6 +732,25 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
string2str(pf->function), dictionary_acquired_item_name(item), ret,
pf->sent_ut - pf->started_ut);
}
+
+ if (!pf->payload)
+ return;
+
+ // send the payload to the plugin
+ ret = send_to_plugin(pf->payload, parser);
+
+ if(ret < 0) {
+ netdata_log_error("FUNCTION_PAYLOAD: failed to send function to plugin, error %d", ret);
+ rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
+ }
+ else {
+ internal_error(LOG_FUNCTIONS,
+ "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
+ string2str(pf->function), dictionary_acquired_item_name(item), ret,
+ pf->sent_ut - pf->started_ut);
+ }
+
+ send_to_plugin("\nFUNCTION_PAYLOAD_END\n", parser);
}
static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
@@ -800,6 +821,7 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou
.function = string_strdupz(function),
.callback = callback,
.callback_data = callback_data,
+ .payload = NULL
};
uuid_t uuid;
@@ -1807,6 +1829,264 @@ static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_wo
return PARSER_RC_STOP;
}
+struct mutex_cond {
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ int rc;
+};
+
+static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data)
+{
+ struct mutex_cond *ctx = callback_data;
+ pthread_mutex_lock(&ctx->lock);
+ ctx->rc = code;
+ pthread_cond_broadcast(&ctx->cond);
+ pthread_mutex_unlock(&ctx->lock);
+}
+
+#define VIRT_FNC_TIMEOUT 1
+dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) {
+ usec_t now = now_realtime_usec();
+ BUFFER *wb = buffer_create(4096, NULL);
+
+ struct mutex_cond cond = {
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .cond = PTHREAD_COND_INITIALIZER
+ };
+
+ struct inflight_function tmp = {
+ .started_ut = now,
+ .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
+ .destination_wb = wb,
+ .timeout = VIRT_FNC_TIMEOUT,
+ .function = string_strdupz(name),
+ .callback = virt_fnc_got_data_cb,
+ .callback_data = &cond,
+ .payload = payload,
+ };
+
+ uuid_t uuid;
+ uuid_generate_time(uuid);
+
+ char key[UUID_STR_LEN];
+ uuid_unparse_lower(uuid, key);
+
+ dictionary_write_lock(parser->inflight.functions);
+
+ // if there is any error, our dictionary callbacks will call the caller callback to notify
+ // the caller about the error - no need for error handling here.
+ dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
+
+ if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
+ parser->inflight.smaller_timeout = tmp.timeout_ut;
+
+ // garbage collect stale inflight functions
+ if(parser->inflight.smaller_timeout < now)
+ inflight_functions_garbage_collect(parser, now);
+
+ dictionary_write_unlock(parser->inflight.functions);
+
+ struct timespec tp;
+ clock_gettime(CLOCK_REALTIME, &tp);
+ tp.tv_sec += (time_t)VIRT_FNC_TIMEOUT;
+
+ pthread_mutex_lock(&cond.lock);
+
+ int ret = pthread_cond_timedwait(&cond.cond, &cond.lock, &tp);
+ if (ret == ETIMEDOUT)
+ netdata_log_error("PLUGINSD: DYNCFG virtual function %s timed out", name);
+
+ pthread_mutex_unlock(&cond.lock);
+
+ dyncfg_config_t cfg;
+ cfg.data = strdupz(buffer_tostring(wb));
+ cfg.data_size = buffer_strlen(wb);
+
+ if (rc != NULL)
+ *rc = cond.rc;
+
+ buffer_free(wb);
+ return cfg;
+}
+
+static dyncfg_config_t get_plugin_config_cb(void *usr_ctx)
+{
+ PARSER *parser = usr_ctx;
+ return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL);
+}
+
+static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx)
+{
+ PARSER *parser = usr_ctx;
+ return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL);
+}
+
+static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name)
+{
+ PARSER *parser = usr_ctx;
+ char buf[1024];
+ snprintfz(buf, sizeof(buf), "get_module_config %s", module_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+}
+
+static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name)
+{
+ PARSER *parser = usr_ctx;
+ char buf[1024];
+ snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+}
+
+static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name)
+{
+ PARSER *parser = usr_ctx;
+ char buf[1024];
+ snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+}
+
+static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name)
+{
+ PARSER *parser = usr_ctx;
+ char buf[1024];
+ snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+}
+
+enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg)
+{
+ PARSER *parser = usr_ctx;
+ int rc;
+ call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data);
+ if(rc != 1)
+ return SET_CONFIG_REJECTED;
+ return SET_CONFIG_ACCEPTED;
+}
+
+enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg)
+{
+ PARSER *parser = usr_ctx;
+ int rc;
+
+ char buf[1024];
+ snprintfz(buf, sizeof(buf), "set_module_config %s", module_name);
+ call_virtual_function_blocking(parser, buf, &rc, cfg->data);
+
+ if(rc != 1)
+ return SET_CONFIG_REJECTED;
+ return SET_CONFIG_ACCEPTED;
+}
+
+enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
+{
+ PARSER *parser = usr_ctx;
+ int rc;
+
+ char buf[1024];
+ snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name);
+ call_virtual_function_blocking(parser, buf, &rc, cfg->data);
+
+ if(rc != 1)
+ return SET_CONFIG_REJECTED;
+ return SET_CONFIG_ACCEPTED;
+}
+
+enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name)
+{
+ PARSER *parser = usr_ctx;
+ int rc;
+
+ char buf[1024];
+ snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name);
+ call_virtual_function_blocking(parser, buf, &rc, NULL);
+
+ if(rc != 1)
+ return SET_CONFIG_REJECTED;
+ return SET_CONFIG_ACCEPTED;
+}
+
+
+static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ netdata_log_info("PLUGINSD: DYNCFG_ENABLE");
+
+ if (unlikely (num_words != 2))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "missing name parameter");
+
+ struct configurable_plugin *cfg = callocz(1, sizeof(struct configurable_plugin));
+
+ cfg->name = strdupz(words[1]);
+ cfg->set_config_cb = set_plugin_config_cb;
+ cfg->get_config_cb = get_plugin_config_cb;
+ cfg->get_config_schema_cb = get_plugin_config_schema_cb;
+ cfg->cb_usr_ctx = parser;
+
+ parser->user.cd->cfg_dict_item = register_plugin(cfg);
+
+ if (unlikely(parser->user.cd->cfg_dict_item == NULL)) {
+ freez(cfg->name);
+ freez(cfg);
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin");
+ }
+
+ parser->user.cd->configuration = cfg;
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE");
+
+ struct configurable_plugin *plug_cfg = parser->user.cd->configuration;
+ if (unlikely(plug_cfg == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
+
+ if (unlikely(num_words != 3))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type");
+
+ struct module *mod = callocz(1, sizeof(struct module));
+
+ mod->type = str2_module_type(words[2]);
+ if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) {
+ freez(mod);
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)");
+ }
+
+ mod->name = strdupz(words[1]);
+
+ mod->set_config_cb = set_module_config_cb;
+ mod->get_config_cb = get_module_config_cb;
+ mod->get_config_schema_cb = get_module_config_schema_cb;
+ mod->config_cb_usr_ctx = parser;
+
+ mod->get_job_config_cb = get_job_config_cb;
+ mod->get_job_config_schema_cb = get_job_config_schema_cb;
+ mod->set_job_config_cb = set_job_config_cb;
+ mod->delete_job_cb = delete_job_cb;
+ mod->job_config_cb_usr_ctx = parser;
+
+ register_module(plug_cfg, mod);
+ return PARSER_RC_OK;
+}
+
+// job_status <module_name> <job_name> <status_code> <state> <message>
+static inline PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser)
+{
+ if (unlikely(num_words != 6 && num_words != 5))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]");
+
+ int state = atoi(words[4]);
+
+ enum job_status job_status = str2job_state(words[3]);
+ if (unlikely(job_status == JOB_STATUS_UNKNOWN))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job state");
+
+ char *message = NULL;
+ if (num_words == 6)
+ message = strdupz(words[5]);
+
+ report_job_status(parser->user.cd->configuration, words[1], words[2], job_status, state, message);
+ return PARSER_RC_OK;
+}
+
static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
{
const char *host_uuid_str = get_word(words, num_words, 1);
@@ -2111,6 +2391,12 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words,
case 99:
return pluginsd_exit(words, num_words, parser);
+ case 101:
+ return pluginsd_register_plugin(words, num_words, parser);
+
+ case 102:
+ return pluginsd_register_module(words, num_words, parser);
+
default:
fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
}
@@ -2131,6 +2417,11 @@ void parser_destroy(PARSER *parser) {
if (unlikely(!parser))
return;
+ if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) {
+ unregister_plugin(parser->user.cd->cfg_dict_item);
+ parser->user.cd->configuration = NULL;
+ }
+
dictionary_destroy(parser->inflight.functions);
freez(parser);
}