diff options
Diffstat (limited to 'libnetdata/dyn_conf/dyn_conf.c')
-rw-r--r-- | libnetdata/dyn_conf/dyn_conf.c | 503 |
1 files changed, 347 insertions, 156 deletions
diff --git a/libnetdata/dyn_conf/dyn_conf.c b/libnetdata/dyn_conf/dyn_conf.c index 00289fdf5..ee4b4733a 100644 --- a/libnetdata/dyn_conf/dyn_conf.c +++ b/libnetdata/dyn_conf/dyn_conf.c @@ -3,7 +3,7 @@ #include "dyn_conf.h" #define DYN_CONF_PATH_MAX (4096) -#define DYN_CONF_DIR VARLIB_DIR "/etc" +#define DYN_CONF_DIR VARLIB_DIR "/dynconf" #define DYN_CONF_JOB_SCHEMA "job_schema" #define DYN_CONF_SCHEMA "schema" @@ -11,9 +11,20 @@ #define DYN_CONF_JOB_LIST "jobs" #define DYN_CONF_CFG_EXT ".cfg" -DICTIONARY *plugins_dict = NULL; +void job_flags_wallkthrough(dyncfg_job_flg_t flags, void (*cb)(const char *str, void *data), void *data) +{ + if (flags & JOB_FLG_PS_LOADED) + cb("JOB_FLG_PS_LOADED", data); + if (flags & JOB_FLG_PLUGIN_PUSHED) + cb("JOB_FLG_PLUGIN_PUSHED", data); + if (flags & JOB_FLG_STREAMING_PUSHED) + cb("JOB_FLG_STREAMING_PUSHED", data); + if (flags & JOB_FLG_USER_CREATED) + cb("JOB_FLG_USER_CREATED", data); +} struct deferred_cfg_send { + DICTIONARY *plugins_dict; char *plugin_name; char *module_name; char *job_name; @@ -33,7 +44,7 @@ static void deferred_config_free(struct deferred_cfg_send *dcs) freez(dcs); } -static void deferred_config_push_back(const char *plugin_name, const char *module_name, const char *job_name) +static void deferred_config_push_back(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name) { struct deferred_cfg_send *deferred = callocz(1, sizeof(struct deferred_cfg_send)); deferred->plugin_name = strdupz(plugin_name); @@ -42,6 +53,7 @@ static void deferred_config_push_back(const char *plugin_name, const char *modul if (job_name != NULL) deferred->job_name = strdupz(job_name); } + deferred->plugins_dict = plugins_dict; pthread_mutex_lock(&deferred_configs_lock); if (dyncfg_shutdown) { pthread_mutex_unlock(&deferred_configs_lock); @@ -95,7 +107,7 @@ static int _get_list_of_plugins_json_cb(const DICTIONARY_ITEM *item, void *entry return 0; } -json_object *get_list_of_plugins_json() +json_object *get_list_of_plugins_json(DICTIONARY *plugins_dict) { json_object *obj = json_object_new_array(); @@ -114,18 +126,7 @@ static int _get_list_of_modules_json_cb(const DICTIONARY_ITEM *item, void *entry json_object *json_item = json_object_new_string(module->name); json_object_object_add(json_module, "name", json_item); - const char *module_type; - switch (module->type) { - case MOD_TYPE_SINGLE: - module_type = "single"; - break; - case MOD_TYPE_ARRAY: - module_type = "job_array"; - break; - default: - module_type = "unknown"; - break; - } + const char *module_type = module_type2str(module->type); json_item = json_object_new_string(module_type); json_object_object_add(json_module, "type", json_item); @@ -163,17 +164,31 @@ const char *job_status2str(enum job_status status) } } -static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data) +static void _job_flags2str_cb(const char *str, void *data) { - UNUSED(item); - json_object *obj = (json_object *)data; - struct job *job = (struct job *)entry; + json_object *json_item = json_object_new_string(str); + json_object_array_add((json_object *)data, json_item); +} +json_object *job2json(struct job *job) { json_object *json_job = json_object_new_object(); + json_object *json_item = json_object_new_string(job->name); json_object_object_add(json_job, "name", json_item); + + json_item = json_object_new_string(job_type2str(job->type)); + json_object_object_add(json_job, "type", json_item); + + netdata_mutex_lock(&job->lock); json_item = json_object_new_string(job_status2str(job->status)); + json_object_object_add(json_job, "status", json_item); + + json_item = json_object_new_int(job->state); json_object_object_add(json_job, "state", json_item); + + json_item = job->reason == NULL ? NULL : json_object_new_string(job->reason); + json_object_object_add(json_job, "reason", json_item); + int64_t last_state_update_s = job->last_state_update / USEC_PER_SEC; int64_t last_state_update_us = job->last_state_update % USEC_PER_SEC; @@ -183,6 +198,22 @@ static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, v json_item = json_object_new_int64(last_state_update_us); json_object_object_add(json_job, "last_state_update_us", json_item); + json_item = json_object_new_array(); + job_flags_wallkthrough(job->flags, _job_flags2str_cb, json_item); + json_object_object_add(json_job, "flags", json_item); + + netdata_mutex_unlock(&job->lock); + + return json_job; +} + +static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data) +{ + UNUSED(item); + json_object *obj = (json_object *)data; + + json_object *json_job = job2json((struct job *)entry); + json_object_array_add(obj, json_job); return 0; @@ -206,24 +237,59 @@ struct job *get_job_by_name(struct module *module, const char *job_name) return dictionary_get(module->jobs, job_name); } -int remove_job(struct module *module, struct job *job) +void unlink_job(const char *plugin_name, const char *module_name, const char *job_name) { // as we are going to do unlink here we better make sure we have all to build proper path - if (unlikely(job->name == NULL || module == NULL || module->name == NULL || module->plugin == NULL || module->plugin->name == NULL)) - return 0; + if (unlikely(job_name == NULL || module_name == NULL || plugin_name == NULL)) + return; + BUFFER *buffer = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(buffer, DYN_CONF_DIR "/%s/%s/%s" DYN_CONF_CFG_EXT, plugin_name, module_name, job_name); + if (unlink(buffer_tostring(buffer))) + netdata_log_error("Cannot remove file %s", buffer_tostring(buffer)); - enum set_config_result rc = module->delete_job_cb(module->job_config_cb_usr_ctx, module->name, job->name); + buffer_free(buffer); +} + +void delete_job(struct configurable_plugin *plugin, const char *module_name, const char *job_name) +{ + struct module *module = get_module_by_name(plugin, module_name); + if (module == NULL) { + error_report("DYNCFG module \"%s\" not found", module_name); + return; + } + + struct job *job_item = get_job_by_name(module, job_name); + if (job_item == NULL) { + error_report("DYNCFG job \"%s\" not found", job_name); + return; + } + + dictionary_del(module->jobs, job_name); +} + +void delete_job_pname(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name) +{ + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name); + if (plugin_item == NULL) { + error_report("DYNCFG plugin \"%s\" not found", plugin_name); + return; + } + struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item); + + delete_job(plugin, module_name, job_name); + + dictionary_acquired_item_release(plugins_dict, plugin_item); +} + +int remove_job(struct module *module, struct job *job) +{ + enum set_config_result rc = module->delete_job_cb(module->job_config_cb_usr_ctx, module->plugin->name, module->name, job->name); if (rc != SET_CONFIG_ACCEPTED) { error_report("DYNCFG module \"%s\" rejected delete job for \"%s\"", module->name, job->name); return 0; } - - BUFFER *buffer = buffer_create(DYN_CONF_PATH_MAX, NULL); - buffer_sprintf(buffer, DYN_CONF_DIR "/%s/%s/%s" DYN_CONF_CFG_EXT, module->plugin->name, module->name, job->name); - unlink(buffer_tostring(buffer)); - buffer_free(buffer); - return dictionary_del(module->jobs, job->name); + return 1; } struct module *get_module_by_name(struct configurable_plugin *plugin, const char *module_name) @@ -231,7 +297,7 @@ struct module *get_module_by_name(struct configurable_plugin *plugin, const char return dictionary_get(plugin->modules, module_name); } -inline struct configurable_plugin *get_plugin_by_name(const char *name) +inline struct configurable_plugin *get_plugin_by_name(DICTIONARY *plugins_dict, const char *name) { return dictionary_get(plugins_dict, name); } @@ -282,6 +348,59 @@ static int store_config(const char *module_name, const char *submodule_name, con return 0; } +#ifdef NETDATA_DEV_MODE +#define netdata_dev_fatal(...) fatal(__VA_ARGS__) +#else +#define netdata_dev_fatal(...) error_report(__VA_ARGS__) +#endif + +void dyn_conf_store_config(const char *function, const char *payload, struct configurable_plugin *plugin) { + dyncfg_config_t config = { + .data = (char*)payload, + .data_size = strlen(payload) + }; + + char *fnc = strdupz(function); + // split fnc to words + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(fnc, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + const char *fnc_name = get_word(words, words_c, 0); + if (fnc_name == NULL) { + error_report("Function name expected \"%s\"", function); + goto CLEANUP; + } + if (strncmp(fnc_name, FUNCTION_NAME_SET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_SET_PLUGIN_CONFIG)) == 0) { + store_config(plugin->name, NULL, NULL, config); + goto CLEANUP; + } + + if (words_c < 2) { + error_report("Module name expected \"%s\"", function); + goto CLEANUP; + } + const char *module_name = get_word(words, words_c, 1); + if (strncmp(fnc_name, FUNCTION_NAME_SET_MODULE_CONFIG, strlen(FUNCTION_NAME_SET_MODULE_CONFIG)) == 0) { + store_config(plugin->name, module_name, NULL, config); + goto CLEANUP; + } + + if (words_c < 3) { + error_report("Job name expected \"%s\"", function); + goto CLEANUP; + } + const char *job_name = get_word(words, words_c, 2); + if (strncmp(fnc_name, FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) { + store_config(plugin->name, module_name, job_name, config); + goto CLEANUP; + } + + netdata_dev_fatal("Unknown function \"%s\"", function); + +CLEANUP: + freez(fnc); +} + dyncfg_config_t load_config(const char *plugin_name, const char *module_name, const char *job_id) { BUFFER *filename = buffer_create(DYN_CONF_PATH_MAX, NULL); @@ -310,16 +429,12 @@ dyncfg_config_t load_config(const char *plugin_name, const char *module_name, co char *set_plugin_config(struct configurable_plugin *plugin, dyncfg_config_t cfg) { - enum set_config_result rc = plugin->set_config_cb(plugin->cb_usr_ctx, &cfg); + enum set_config_result rc = plugin->set_config_cb(plugin->cb_usr_ctx, plugin->name, &cfg); if (rc != SET_CONFIG_ACCEPTED) { error_report("DYNCFG plugin \"%s\" rejected config", plugin->name); return "plugin rejected config"; } - if (store_config(plugin->name, NULL, NULL, cfg)) { - error_report("DYNCFG could not store config for module \"%s\"", plugin->name); - return "could not store config on disk"; - } return NULL; } @@ -327,62 +442,38 @@ static char *set_module_config(struct module *mod, dyncfg_config_t cfg) { struct configurable_plugin *plugin = mod->plugin; - enum set_config_result rc = mod->set_config_cb(mod->config_cb_usr_ctx, mod->name, &cfg); + enum set_config_result rc = mod->set_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name, &cfg); if (rc != SET_CONFIG_ACCEPTED) { error_report("DYNCFG module \"%s\" rejected config", plugin->name); return "module rejected config"; } - if (store_config(plugin->name, mod->name, NULL, cfg)) { - error_report("DYNCFG could not store config for module \"%s\"", mod->name); - return "could not store config on disk"; - } - return NULL; } -struct job *job_new() +struct job *job_new(const char *job_id) { struct job *job = callocz(1, sizeof(struct job)); job->state = JOB_STATUS_UNKNOWN; job->last_state_update = now_realtime_usec(); + job->name = strdupz(job_id); + netdata_mutex_init(&job->lock); return job; } -static int set_job_config(struct job *job, dyncfg_config_t cfg) +static inline void job_del(struct job *job) { - struct module *mod = job->module; - enum set_config_result rt = mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, job->name, &cfg); - - if (rt != SET_CONFIG_ACCEPTED) { - error_report("DYNCFG module \"%s\" rejected config for job \"%s\"", mod->name, job->name); - return 1; - } - - if (store_config(mod->plugin->name, mod->name, job->name, cfg)) { - error_report("DYNCFG could not store config for module \"%s\"", mod->name); - return 1; - } - - return 0; + netdata_mutex_destroy(&job->lock); + freez(job->reason); + freez((void*)job->name); + freez(job); } -struct job *add_job(struct module *mod, const char *job_id, dyncfg_config_t cfg) +void job_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) { - struct job *job = job_new(); - job->name = strdupz(job_id); - job->module = mod; - - if (set_job_config(job, cfg)) { - freez(job->name); - freez(job); - return NULL; - } - - dictionary_set(mod->jobs, job->name, job, sizeof(job)); - - return job; - + UNUSED(item); + UNUSED(data); + job_del((struct job *)value); } void module_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) @@ -395,10 +486,9 @@ void module_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) freez(mod); } - -const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin) +const DICTIONARY_ITEM *register_plugin(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, bool localhost) { - if (get_plugin_by_name(plugin->name) != NULL) { + if (get_plugin_by_name(plugins_dict, plugin->name) != NULL) { error_report("DYNCFG plugin \"%s\" already registered", plugin->name); return NULL; } @@ -413,7 +503,8 @@ const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin) plugin->modules = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE); dictionary_register_delete_callback(plugin->modules, module_del_cb, NULL); - deferred_config_push_back(plugin->name, NULL, NULL); + if (localhost) + deferred_config_push_back(plugins_dict, plugin->name, NULL, NULL); dictionary_set(plugins_dict, plugin->name, plugin, sizeof(plugin)); @@ -421,24 +512,14 @@ const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin) return dictionary_get_and_acquire_item(plugins_dict, plugin->name); } -void unregister_plugin(const DICTIONARY_ITEM *plugin) +void unregister_plugin(DICTIONARY *plugins_dict, const DICTIONARY_ITEM *plugin) { struct configurable_plugin *plug = dictionary_acquired_item_value(plugin); dictionary_acquired_item_release(plugins_dict, plugin); dictionary_del(plugins_dict, plug->name); } -void job_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) -{ - UNUSED(item); - UNUSED(data); - struct job *job = (struct job *)value; - freez(job->reason); - freez(job->name); - freez(job); -} - -int register_module(struct configurable_plugin *plugin, struct module *module) +int register_module(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, struct module *module, bool localhost) { if (get_module_by_name(plugin, module->name) != NULL) { error_report("DYNCFG module \"%s\" already registered", module->name); @@ -447,7 +528,8 @@ int register_module(struct configurable_plugin *plugin, struct module *module) pthread_mutex_init(&module->lock, NULL); - deferred_config_push_back(plugin->name, module->name, NULL); + if (localhost) + deferred_config_push_back(plugins_dict, plugin->name, module->name, NULL); module->plugin = plugin; @@ -455,34 +537,38 @@ int register_module(struct configurable_plugin *plugin, struct module *module) module->jobs = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE); dictionary_register_delete_callback(module->jobs, job_del_cb, NULL); - // load all jobs from disk - BUFFER *path = buffer_create(DYN_CONF_PATH_MAX, NULL); - buffer_sprintf(path, "%s/%s/%s", DYN_CONF_DIR, plugin->name, module->name); - DIR *dir = opendir(buffer_tostring(path)); - if (dir != NULL) { - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - if (ent->d_name[0] == '.') - continue; - if (ent->d_type != DT_REG) - continue; - size_t len = strnlen(ent->d_name, NAME_MAX); - if (len <= strlen(DYN_CONF_CFG_EXT)) - continue; - if (strcmp(ent->d_name + len - strlen(DYN_CONF_CFG_EXT), DYN_CONF_CFG_EXT) != 0) - continue; - ent->d_name[len - strlen(DYN_CONF_CFG_EXT)] = '\0'; - - struct job *job = job_new(); - job->name = strdupz(ent->d_name); - job->module = module; - dictionary_set(module->jobs, job->name, job, sizeof(job)); - - deferred_config_push_back(plugin->name, module->name, job->name); + if (localhost) { + // load all jobs from disk + BUFFER *path = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(path, "%s/%s/%s", DYN_CONF_DIR, plugin->name, module->name); + DIR *dir = opendir(buffer_tostring(path)); + if (dir != NULL) { + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + if (ent->d_name[0] == '.') + continue; + if (ent->d_type != DT_REG) + continue; + size_t len = strnlen(ent->d_name, NAME_MAX); + if (len <= strlen(DYN_CONF_CFG_EXT)) + continue; + if (strcmp(ent->d_name + len - strlen(DYN_CONF_CFG_EXT), DYN_CONF_CFG_EXT) != 0) + continue; + ent->d_name[len - strlen(DYN_CONF_CFG_EXT)] = '\0'; + + struct job *job = job_new(ent->d_name); + job->module = module; + job->flags = JOB_FLG_PS_LOADED; + job->type = JOB_TYPE_USER; + + dictionary_set(module->jobs, job->name, job, sizeof(job)); + + deferred_config_push_back(plugins_dict, plugin->name, module->name, ent->d_name); + } + closedir(dir); } - closedir(dir); + buffer_free(path); } - buffer_free(path); } dictionary_set(plugin->modules, module->name, module, sizeof(module)); @@ -490,11 +576,49 @@ int register_module(struct configurable_plugin *plugin, struct module *module) return 0; } +int register_job(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_type job_type, dyncfg_job_flg_t flags, int ignore_existing) +{ + int rc = 1; + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name); + if (plugin_item == NULL) { + error_report("plugin \"%s\" not registered", plugin_name); + return rc; + } + struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item); + struct module *mod = get_module_by_name(plugin, module_name); + if (mod == NULL) { + error_report("module \"%s\" not registered", module_name); + goto ERR_EXIT; + } + if (mod->type != MOD_TYPE_ARRAY) { + error_report("module \"%s\" is not an array", module_name); + goto ERR_EXIT; + } + if (get_job_by_name(mod, job_name) != NULL) { + if (!ignore_existing) + error_report("job \"%s\" already registered", job_name); + goto ERR_EXIT; + } + + struct job *job = job_new(job_name); + job->module = mod; + job->flags = flags; + job->type = job_type; + + dictionary_set(mod->jobs, job->name, job, sizeof(job)); + + rc = 0; +ERR_EXIT: + dictionary_acquired_item_release(plugins_dict, plugin_item); + return rc; +} + void freez_dyncfg(void *ptr) { freez(ptr); } -static void handle_dyncfg_root(struct uni_http_response *resp, int method) +#ifdef NETDATA_TEST_DYNCFG +static void handle_dyncfg_root(DICTIONARY *plugins_dict, struct uni_http_response *resp, int method) { if (method != HTTP_METHOD_GET) { resp->content = "method not allowed"; @@ -502,7 +626,7 @@ static void handle_dyncfg_root(struct uni_http_response *resp, int method) resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; return; } - json_object *obj = get_list_of_plugins_json(); + json_object *obj = get_list_of_plugins_json(plugins_dict); json_object *wrapper = json_object_new_object(); json_object_object_add(wrapper, "configurable_plugins", obj); resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY)); @@ -518,7 +642,7 @@ static void handle_plugin_root(struct uni_http_response *resp, int method, struc switch(method) { case HTTP_METHOD_GET: { - dyncfg_config_t cfg = plugin->get_config_cb(plugin->cb_usr_ctx); + dyncfg_config_t cfg = plugin->get_config_cb(plugin->cb_usr_ctx, plugin->name); resp->content = mallocz(cfg.data_size); memcpy(resp->content, cfg.data, cfg.data_size); resp->status = HTTP_RESP_OK; @@ -558,11 +682,12 @@ static void handle_plugin_root(struct uni_http_response *resp, int method, struc return; } } +#endif void handle_module_root(struct uni_http_response *resp, int method, struct configurable_plugin *plugin, const char *module, void *post_payload, size_t post_payload_size) { - if (strncmp(module, DYN_CONF_SCHEMA, strlen(DYN_CONF_SCHEMA)) == 0) { - dyncfg_config_t cfg = plugin->get_config_schema_cb(plugin->cb_usr_ctx); + if (strncmp(module, DYN_CONF_SCHEMA, sizeof(DYN_CONF_SCHEMA)) == 0) { + dyncfg_config_t cfg = plugin->get_config_schema_cb(plugin->cb_usr_ctx, plugin->name); resp->content = mallocz(cfg.data_size); memcpy(resp->content, cfg.data, cfg.data_size); resp->status = HTTP_RESP_OK; @@ -570,7 +695,7 @@ void handle_module_root(struct uni_http_response *resp, int method, struct confi resp->content_length = cfg.data_size; return; } - if (strncmp(module, DYN_CONF_MODULE_LIST, strlen(DYN_CONF_MODULE_LIST)) == 0) { + if (strncmp(module, DYN_CONF_MODULE_LIST, sizeof(DYN_CONF_MODULE_LIST)) == 0) { if (method != HTTP_METHOD_GET) { resp->content = "method not allowed (only GET)"; resp->content_length = strlen(resp->content); @@ -596,7 +721,7 @@ void handle_module_root(struct uni_http_response *resp, int method, struct confi return; } if (method == HTTP_METHOD_GET) { - dyncfg_config_t cfg = mod->get_config_cb(mod->config_cb_usr_ctx, mod->name); + dyncfg_config_t cfg = mod->get_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name); resp->content = mallocz(cfg.data_size); memcpy(resp->content, cfg.data, cfg.data_size); resp->status = HTTP_RESP_OK; @@ -651,8 +776,7 @@ static inline void _handle_job_root(struct uni_http_response *resp, int method, .data = post_payload, .data_size = post_payload_size }; - job = add_job(mod, job_id, cont); - if (job == NULL) { + if (mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job_id, &cont)) { resp->content = "failed to add job"; resp->content_length = strlen(resp->content); resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR; @@ -672,7 +796,7 @@ static inline void _handle_job_root(struct uni_http_response *resp, int method, switch (method) { case HTTP_METHOD_GET: { - dyncfg_config_t cfg = mod->get_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, job->name); + dyncfg_config_t cfg = mod->get_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job->name); resp->content = mallocz(cfg.data_size); memcpy(resp->content, cfg.data, cfg.data_size); resp->status = HTTP_RESP_OK; @@ -692,10 +816,11 @@ static inline void _handle_job_root(struct uni_http_response *resp, int method, .data = post_payload, .data_size = post_payload_size }; - if(set_job_config(job, cont)) { - resp->status = HTTP_RESP_BAD_REQUEST; + if (mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job->name, &cont) != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG module \"%s\" rejected config for job \"%s\"", mod->name, job->name); resp->content = "failed to set job config"; resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR; return; } resp->status = HTTP_RESP_OK; @@ -726,8 +851,8 @@ static inline void _handle_job_root(struct uni_http_response *resp, int method, void handle_job_root(struct uni_http_response *resp, int method, struct module *mod, const char *job_id, void *post_payload, size_t post_payload_size) { - if (strncmp(job_id, DYN_CONF_SCHEMA, strlen(DYN_CONF_SCHEMA)) == 0) { - dyncfg_config_t cfg = mod->get_config_schema_cb(mod->config_cb_usr_ctx, mod->name); + if (strncmp(job_id, DYN_CONF_SCHEMA, sizeof(DYN_CONF_SCHEMA)) == 0) { + dyncfg_config_t cfg = mod->get_config_schema_cb(mod->config_cb_usr_ctx, mod->plugin->name, mod->name); resp->content = mallocz(cfg.data_size); memcpy(resp->content, cfg.data, cfg.data_size); resp->status = HTTP_RESP_OK; @@ -735,8 +860,8 @@ void handle_job_root(struct uni_http_response *resp, int method, struct module * resp->content_length = cfg.data_size; return; } - if (strncmp(job_id, DYN_CONF_JOB_SCHEMA, strlen(DYN_CONF_JOB_SCHEMA)) == 0) { - dyncfg_config_t cfg = mod->get_job_config_schema_cb(mod->job_config_cb_usr_ctx, mod->name); + if (strncmp(job_id, DYN_CONF_JOB_SCHEMA, sizeof(DYN_CONF_JOB_SCHEMA)) == 0) { + dyncfg_config_t cfg = mod->get_job_config_schema_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name); resp->content = mallocz(cfg.data_size); memcpy(resp->content, cfg.data, cfg.data_size); resp->status = HTTP_RESP_OK; @@ -744,7 +869,7 @@ void handle_job_root(struct uni_http_response *resp, int method, struct module * resp->content_length = cfg.data_size; return; } - if (strncmp(job_id, DYN_CONF_JOB_LIST, strlen(DYN_CONF_JOB_LIST)) == 0) { + if (strncmp(job_id, DYN_CONF_JOB_LIST, sizeof(DYN_CONF_JOB_LIST)) == 0) { if (mod->type != MOD_TYPE_ARRAY) { resp->content = "module type is not job_array (can't get the list of jobs)"; resp->content_length = strlen(resp->content); @@ -776,7 +901,14 @@ void handle_job_root(struct uni_http_response *resp, int method, struct module * dictionary_acquired_item_release(mod->jobs, job_item); } -struct uni_http_response dyn_conf_process_http_request(int method, const char *plugin, const char *module, const char *job_id, void *post_payload, size_t post_payload_size) +struct uni_http_response dyn_conf_process_http_request( + DICTIONARY *plugins_dict __maybe_unused, + int method __maybe_unused, + const char *plugin __maybe_unused, + const char *module __maybe_unused, + const char *job_id __maybe_unused, + void *post_payload __maybe_unused, + size_t post_payload_size __maybe_unused) { struct uni_http_response resp = { .status = HTTP_RESP_INTERNAL_SERVER_ERROR, @@ -785,8 +917,14 @@ struct uni_http_response dyn_conf_process_http_request(int method, const char *p .content_free = NULL, .content_length = 0 }; +#ifndef NETDATA_TEST_DYNCFG + resp.content = "DYNCFG is disabled (as it is for now developer only feature). This will be enabled by default when ready for technical preview."; + resp.content_length = strlen(resp.content); + resp.status = HTTP_RESP_PRECOND_FAIL; + return resp; +#else if (plugin == NULL) { - handle_dyncfg_root(&resp, method); + handle_dyncfg_root(plugins_dict, &resp, method); return resp; } const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin); @@ -814,9 +952,9 @@ struct uni_http_response dyn_conf_process_http_request(int method, const char *p goto EXIT_PLUGIN; } if (mod->type != MOD_TYPE_ARRAY) { - resp.content = "module is not array"; + resp.content = "400 - this module is not array type"; resp.content_length = strlen(resp.content); - resp.status = HTTP_RESP_NOT_FOUND; + resp.status = HTTP_RESP_BAD_REQUEST; goto EXIT_PLUGIN; } handle_job_root(&resp, method, mod, job_id, post_payload, post_payload_size); @@ -824,6 +962,7 @@ struct uni_http_response dyn_conf_process_http_request(int method, const char *p EXIT_PLUGIN: dictionary_acquired_item_release(plugins_dict, plugin_item); return resp; +#endif } void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) @@ -836,41 +975,53 @@ void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) freez(plugin); } -void report_job_status(struct configurable_plugin *plugin, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason) +// on failure - return NULL - all unlocked, nothing acquired +// on success - return pointer to job item - keep job and plugin acquired and locked!!! +// for caller convenience (to prevent another lock and races) +// caller is responsible to unlock the job and release it when not needed anymore +// this also avoids dependency creep +const DICTIONARY_ITEM *report_job_status_acq_lock(DICTIONARY *plugins_dict, const DICTIONARY_ITEM **plugin_acq_item, DICTIONARY **job_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason) { - const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(plugins_dict, plugin->name); - if (item == NULL) { - netdata_log_error("plugin %s not found", plugin->name); - return; + *plugin_acq_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name); + if (*plugin_acq_item == NULL) { + netdata_log_error("plugin %s not found", plugin_name); + return NULL; } - struct configurable_plugin *plug = dictionary_acquired_item_value(item); + + struct configurable_plugin *plug = dictionary_acquired_item_value(*plugin_acq_item); struct module *mod = get_module_by_name(plug, module_name); if (mod == NULL) { netdata_log_error("module %s not found", module_name); - goto EXIT_PLUGIN; + dictionary_acquired_item_release(plugins_dict, *plugin_acq_item); + return NULL; } if (mod->type != MOD_TYPE_ARRAY) { netdata_log_error("module %s is not array", module_name); - goto EXIT_PLUGIN; + dictionary_acquired_item_release(plugins_dict, *plugin_acq_item); + return NULL; } + *job_dict = mod->jobs; const DICTIONARY_ITEM *job_item = dictionary_get_and_acquire_item(mod->jobs, job_name); if (job_item == NULL) { netdata_log_error("job %s not found", job_name); - goto EXIT_PLUGIN; + dictionary_acquired_item_release(plugins_dict, *plugin_acq_item); + return NULL; } struct job *job = dictionary_acquired_item_value(job_item); + + pthread_mutex_lock(&job->lock); job->status = status; job->state = status_code; if (job->reason != NULL) { freez(job->reason); } - job->reason = reason; + job->reason = reason != NULL ? strdupz(reason) : NULL; // reason is optional job->last_state_update = now_realtime_usec(); - dictionary_acquired_item_release(mod->jobs, job_item); + job->dirty = true; -EXIT_PLUGIN: - dictionary_acquired_item_release(plugins_dict, item); + // no unlock and acquired_item_release on success on purpose + return job_item; } int dyn_conf_init(void) @@ -882,9 +1033,6 @@ int dyn_conf_init(void) } } - plugins_dict = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE); - dictionary_register_delete_callback(plugins_dict, plugin_del_cb, NULL); - return 0; } @@ -912,6 +1060,15 @@ void *dyncfg_main(void *ptr) while (!netdata_exit) { struct deferred_cfg_send *dcs = deferred_config_pop(ptr); + DICTIONARY *plugins_dict = dcs->plugins_dict; +#ifdef NETDATA_INTERNAL_CHECKS + if (plugins_dict == NULL) { + fatal("DYNCFG, plugins_dict is NULL"); + deferred_config_free(dcs); + continue; + } +#endif + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, dcs->plugin_name); if (plugin_item == NULL) { error_report("DYNCFG, plugin %s not found", dcs->plugin_name); @@ -922,21 +1079,21 @@ void *dyncfg_main(void *ptr) if (dcs->module_name == NULL) { dyncfg_config_t cfg = load_config(dcs->plugin_name, NULL, NULL); if (cfg.data != NULL) { - plugin->set_config_cb(plugin->cb_usr_ctx, &cfg); + plugin->set_config_cb(plugin->cb_usr_ctx, plugin->name, &cfg); freez(cfg.data); } } else if (dcs->job_name == NULL) { dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, NULL); if (cfg.data != NULL) { struct module *mod = get_module_by_name(plugin, dcs->module_name); - mod->set_config_cb(mod->config_cb_usr_ctx, mod->name, &cfg); + mod->set_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name, &cfg); freez(cfg.data); } } else { dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, dcs->job_name); if (cfg.data != NULL) { struct module *mod = get_module_by_name(plugin, dcs->module_name); - mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, dcs->job_name, &cfg); + mod->set_job_config_cb(mod->job_config_cb_usr_ctx, plugin->name, mod->name, dcs->job_name, &cfg); freez(cfg.data); } } @@ -947,3 +1104,37 @@ void *dyncfg_main(void *ptr) netdata_thread_cleanup_pop(1); return NULL; } + +bool is_dyncfg_function(const char *function_name, uint8_t type) { + // TODO add hash to speed things up + if (type & (DYNCFG_FUNCTION_TYPE_GET | DYNCFG_FUNCTION_TYPE_REGULAR)) { + if (strncmp(function_name, FUNCTION_NAME_GET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_GET_PLUGIN_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_MODULE_CONFIG, strlen(FUNCTION_NAME_GET_MODULE_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_JOB_CONFIG, strlen(FUNCTION_NAME_GET_JOB_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA)) == 0) + return true; + } + + if (type & (DYNCFG_FUNCTION_TYPE_SET | DYNCFG_FUNCTION_TYPE_PAYLOAD)) { + if (strncmp(function_name, FUNCTION_NAME_SET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_SET_PLUGIN_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_SET_MODULE_CONFIG, strlen(FUNCTION_NAME_SET_MODULE_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) + return true; + } + + if (type & (DYNCFG_FUNCTION_TYPE_DELETE | DYNCFG_FUNCTION_TYPE_REGULAR)) { + if (strncmp(function_name, FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) + return true; + } + + return false; +} |