From dd814a7c1a8de056a79f7238578b09236edd5506 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 10 Aug 2023 11:18:49 +0200 Subject: Adding upstream version 1.42.0. Signed-off-by: Daniel Baumann --- libnetdata/dyn_conf/dyn_conf.c | 909 +++++++++++++++++++++++++++++++++++++++++ libnetdata/dyn_conf/dyn_conf.h | 136 ++++++ 2 files changed, 1045 insertions(+) create mode 100644 libnetdata/dyn_conf/dyn_conf.c create mode 100644 libnetdata/dyn_conf/dyn_conf.h (limited to 'libnetdata/dyn_conf') diff --git a/libnetdata/dyn_conf/dyn_conf.c b/libnetdata/dyn_conf/dyn_conf.c new file mode 100644 index 000000000..3e098fb7f --- /dev/null +++ b/libnetdata/dyn_conf/dyn_conf.c @@ -0,0 +1,909 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "dyn_conf.h" + +#define DYN_CONF_PATH_MAX (4096) +#define DYN_CONF_DIR VARLIB_DIR "/etc" + +#define DYN_CONF_JOB_SCHEMA "job_schema" +#define DYN_CONF_SCHEMA "schema" +#define DYN_CONF_MODULE_LIST "modules" +#define DYN_CONF_JOB_LIST "jobs" +#define DYN_CONF_CFG_EXT ".cfg" + +DICTIONARY *plugins_dict = NULL; + +struct deferred_cfg_send { + char *plugin_name; + char *module_name; + char *job_name; + struct deferred_cfg_send *next; +}; + +struct deferred_cfg_send *deferred_configs = NULL; +pthread_mutex_t deferred_configs_lock = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t deferred_configs_cond = PTHREAD_COND_INITIALIZER; + +static void deferred_config_push_back(const char *plugin_name, const char *module_name, const char *job_name) +{ + struct deferred_cfg_send *deferred = callocz(1, sizeof(struct deferred_cfg_send)); + deferred->plugin_name = strdupz(plugin_name); + if (module_name != NULL) { + deferred->module_name = strdupz(module_name); + if (job_name != NULL) + deferred->job_name = strdupz(job_name); + } + pthread_mutex_lock(&deferred_configs_lock); + struct deferred_cfg_send *last = deferred_configs; + if (last == NULL) + deferred_configs = deferred; + else { + while (last->next != NULL) + last = last->next; + last->next = deferred; + } + pthread_cond_signal(&deferred_configs_cond); + pthread_mutex_unlock(&deferred_configs_lock); +} + +static struct deferred_cfg_send *deferred_config_pop() +{ + pthread_mutex_lock(&deferred_configs_lock); + while (deferred_configs == NULL) + pthread_cond_wait(&deferred_configs_cond, &deferred_configs_lock); + struct deferred_cfg_send *deferred = deferred_configs; + deferred_configs = deferred_configs->next; + pthread_mutex_unlock(&deferred_configs_lock); + return deferred; +} + +static void deferred_config_free(struct deferred_cfg_send *dcs) +{ + freez(dcs->plugin_name); + freez(dcs->module_name); + freez(dcs->job_name); + freez(dcs); +} + +static int _get_list_of_plugins_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data) +{ + UNUSED(item); + json_object *obj = (json_object *)data; + struct configurable_plugin *plugin = (struct configurable_plugin *)entry; + + json_object *plugin_name = json_object_new_string(plugin->name); + json_object_array_add(obj, plugin_name); + + return 0; +} + +json_object *get_list_of_plugins_json() +{ + json_object *obj = json_object_new_array(); + + dictionary_walkthrough_read(plugins_dict, _get_list_of_plugins_json_cb, obj); + + return obj; +} + +static int _get_list_of_modules_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data) +{ + UNUSED(item); + json_object *obj = (json_object *)data; + struct module *module = (struct module *)entry; + + json_object *json_module = json_object_new_object(); + + json_object *json_item = json_object_new_string(module->name); + json_object_object_add(json_module, "name", json_item); + const char *module_type; + switch (module->type) { + case MOD_TYPE_SINGLE: + module_type = "single"; + break; + case MOD_TYPE_ARRAY: + module_type = "job_array"; + break; + default: + module_type = "unknown"; + break; + } + json_item = json_object_new_string(module_type); + json_object_object_add(json_module, "type", json_item); + + json_object_array_add(obj, json_module); + + return 0; +} + +json_object *get_list_of_modules_json(struct configurable_plugin *plugin) +{ + json_object *obj = json_object_new_array(); + + pthread_mutex_lock(&plugin->lock); + + dictionary_walkthrough_read(plugin->modules, _get_list_of_modules_json_cb, obj); + + pthread_mutex_unlock(&plugin->lock); + + return obj; +} + +const char *job_status2str(enum job_status status) +{ + switch (status) { + case JOB_STATUS_UNKNOWN: + return "unknown"; + case JOB_STATUS_STOPPED: + return "stopped"; + case JOB_STATUS_RUNNING: + return "running"; + case JOB_STATUS_ERROR: + return "error"; + default: + return "unknown"; + } +} + +static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data) +{ + UNUSED(item); + json_object *obj = (json_object *)data; + struct job *job = (struct job *)entry; + + json_object *json_job = json_object_new_object(); + json_object *json_item = json_object_new_string(job->name); + json_object_object_add(json_job, "name", json_item); + json_item = json_object_new_string(job_status2str(job->status)); + json_object_object_add(json_job, "state", json_item); + int64_t last_state_update_s = job->last_state_update / USEC_PER_SEC; + int64_t last_state_update_us = job->last_state_update % USEC_PER_SEC; + + json_item = json_object_new_int64(last_state_update_s); + json_object_object_add(json_job, "last_state_update_s", json_item); + + json_item = json_object_new_int64(last_state_update_us); + json_object_object_add(json_job, "last_state_update_us", json_item); + + json_object_array_add(obj, json_job); + + return 0; +} + +json_object *get_list_of_jobs_json(struct module *module) +{ + json_object *obj = json_object_new_array(); + + pthread_mutex_lock(&module->lock); + + dictionary_walkthrough_read(module->jobs, _get_list_of_jobs_json_cb, obj); + + pthread_mutex_unlock(&module->lock); + + return obj; +} + +struct job *get_job_by_name(struct module *module, const char *job_name) +{ + return dictionary_get(module->jobs, job_name); +} + +int remove_job(struct module *module, struct job *job) +{ + // as we are going to do unlink here we better make sure we have all to build proper path + if (unlikely(job->name == NULL || module == NULL || module->name == NULL || module->plugin == NULL || module->plugin->name == NULL)) + return 0; + + enum set_config_result rc = module->delete_job_cb(module->job_config_cb_usr_ctx, module->name, job->name); + + if (rc != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG module \"%s\" rejected delete job for \"%s\"", module->name, job->name); + return 0; + } + + BUFFER *buffer = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(buffer, DYN_CONF_DIR "/%s/%s/%s" DYN_CONF_CFG_EXT, module->plugin->name, module->name, job->name); + unlink(buffer_tostring(buffer)); + buffer_free(buffer); + return dictionary_del(module->jobs, job->name); +} + +struct module *get_module_by_name(struct configurable_plugin *plugin, const char *module_name) +{ + return dictionary_get(plugin->modules, module_name); +} + +inline struct configurable_plugin *get_plugin_by_name(const char *name) +{ + return dictionary_get(plugins_dict, name); +} + +static int store_config(const char *module_name, const char *submodule_name, const char *cfg_idx, dyncfg_config_t cfg) +{ + BUFFER *filename = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(filename, DYN_CONF_DIR "/%s", module_name); + if (mkdir(buffer_tostring(filename), 0755) == -1) { + if (errno != EEXIST) { + netdata_log_error("DYNCFG store_config: failed to create module directory %s", buffer_tostring(filename)); + buffer_free(filename); + return 1; + } + } + + if (submodule_name != NULL) { + buffer_sprintf(filename, "/%s", submodule_name); + if (mkdir(buffer_tostring(filename), 0755) == -1) { + if (errno != EEXIST) { + netdata_log_error("DYNCFG store_config: failed to create submodule directory %s", buffer_tostring(filename)); + buffer_free(filename); + return 1; + } + } + } + + if (cfg_idx != NULL) + buffer_sprintf(filename, "/%s", cfg_idx); + + buffer_strcat(filename, DYN_CONF_CFG_EXT); + + + error_report("DYNCFG store_config: %s", buffer_tostring(filename)); + + //write to file + FILE *f = fopen(buffer_tostring(filename), "w"); + if (f == NULL) { + error_report("DYNCFG store_config: failed to open %s for writing", buffer_tostring(filename)); + buffer_free(filename); + return 1; + } + + fwrite(cfg.data, cfg.data_size, 1, f); + fclose(f); + + buffer_free(filename); + return 0; +} + +dyncfg_config_t load_config(const char *plugin_name, const char *module_name, const char *job_id) +{ + BUFFER *filename = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(filename, DYN_CONF_DIR "/%s", plugin_name); + if (module_name != NULL) + buffer_sprintf(filename, "/%s", module_name); + + if (job_id != NULL) + buffer_sprintf(filename, "/%s", job_id); + + buffer_strcat(filename, DYN_CONF_CFG_EXT); + + dyncfg_config_t config; + long bytes; + config.data = read_by_filename(buffer_tostring(filename), &bytes); + + if (config.data == NULL) + error_report("DYNCFG load_config: failed to load config from %s", buffer_tostring(filename)); + + config.data_size = bytes; + + buffer_free(filename); + + return config; +} + +char *set_plugin_config(struct configurable_plugin *plugin, dyncfg_config_t cfg) +{ + enum set_config_result rc = plugin->set_config_cb(plugin->cb_usr_ctx, &cfg); + if (rc != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG plugin \"%s\" rejected config", plugin->name); + return "plugin rejected config"; + } + + if (store_config(plugin->name, NULL, NULL, cfg)) { + error_report("DYNCFG could not store config for module \"%s\"", plugin->name); + return "could not store config on disk"; + } + return NULL; +} + +static char *set_module_config(struct module *mod, dyncfg_config_t cfg) +{ + struct configurable_plugin *plugin = mod->plugin; + + enum set_config_result rc = mod->set_config_cb(mod->config_cb_usr_ctx, mod->name, &cfg); + if (rc != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG module \"%s\" rejected config", plugin->name); + return "module rejected config"; + } + + if (store_config(plugin->name, mod->name, NULL, cfg)) { + error_report("DYNCFG could not store config for module \"%s\"", mod->name); + return "could not store config on disk"; + } + + return NULL; +} + +struct job *job_new() +{ + struct job *job = callocz(1, sizeof(struct job)); + job->state = JOB_STATUS_UNKNOWN; + job->last_state_update = now_realtime_usec(); + return job; +} + +static int set_job_config(struct job *job, dyncfg_config_t cfg) +{ + struct module *mod = job->module; + enum set_config_result rt = mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, job->name, &cfg); + + if (rt != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG module \"%s\" rejected config for job \"%s\"", mod->name, job->name); + return 1; + } + + if (store_config(mod->plugin->name, mod->name, job->name, cfg)) { + error_report("DYNCFG could not store config for module \"%s\"", mod->name); + return 1; + } + + return 0; +} + +struct job *add_job(struct module *mod, const char *job_id, dyncfg_config_t cfg) +{ + struct job *job = job_new(); + job->name = strdupz(job_id); + job->module = mod; + + if (set_job_config(job, cfg)) { + freez(job->name); + freez(job); + return NULL; + } + + dictionary_set(mod->jobs, job->name, job, sizeof(job)); + + return job; + +} + +void module_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) +{ + UNUSED(item); + UNUSED(data); + struct module *mod = (struct module *)value; + dictionary_destroy(mod->jobs); + freez(mod->name); + freez(mod); +} + + +const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin) +{ + if (get_plugin_by_name(plugin->name) != NULL) { + error_report("DYNCFG plugin \"%s\" already registered", plugin->name); + return NULL; + } + + if (plugin->set_config_cb == NULL) { + error_report("DYNCFG plugin \"%s\" has no set_config_cb", plugin->name); + return NULL; + } + + pthread_mutex_init(&plugin->lock, NULL); + + plugin->modules = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE); + dictionary_register_delete_callback(plugin->modules, module_del_cb, NULL); + + deferred_config_push_back(plugin->name, NULL, NULL); + + dictionary_set(plugins_dict, plugin->name, plugin, sizeof(plugin)); + + // the plugin keeps the pointer to the dictionary item, so we need to acquire it + return dictionary_get_and_acquire_item(plugins_dict, plugin->name); +} + +void unregister_plugin(const DICTIONARY_ITEM *plugin) +{ + struct configurable_plugin *plug = dictionary_acquired_item_value(plugin); + dictionary_acquired_item_release(plugins_dict, plugin); + dictionary_del(plugins_dict, plug->name); +} + +void job_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) +{ + UNUSED(item); + UNUSED(data); + struct job *job = (struct job *)value; + freez(job->reason); + freez(job->name); + freez(job); +} + +int register_module(struct configurable_plugin *plugin, struct module *module) +{ + if (get_module_by_name(plugin, module->name) != NULL) { + error_report("DYNCFG module \"%s\" already registered", module->name); + return 1; + } + + pthread_mutex_init(&module->lock, NULL); + + deferred_config_push_back(plugin->name, module->name, NULL); + + module->plugin = plugin; + + if (module->type == MOD_TYPE_ARRAY) { + module->jobs = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE); + dictionary_register_delete_callback(module->jobs, job_del_cb, NULL); + + // load all jobs from disk + BUFFER *path = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(path, "%s/%s/%s", DYN_CONF_DIR, plugin->name, module->name); + DIR *dir = opendir(buffer_tostring(path)); + if (dir != NULL) { + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + if (ent->d_name[0] == '.') + continue; + if (ent->d_type != DT_REG) + continue; + size_t len = strnlen(ent->d_name, NAME_MAX); + if (len <= strlen(DYN_CONF_CFG_EXT)) + continue; + if (strcmp(ent->d_name + len - strlen(DYN_CONF_CFG_EXT), DYN_CONF_CFG_EXT) != 0) + continue; + ent->d_name[len - strlen(DYN_CONF_CFG_EXT)] = '\0'; + + struct job *job = job_new(); + job->name = strdupz(ent->d_name); + job->module = module; + dictionary_set(module->jobs, job->name, job, sizeof(job)); + + deferred_config_push_back(plugin->name, module->name, job->name); + } + closedir(dir); + } + buffer_free(path); + } + + dictionary_set(plugin->modules, module->name, module, sizeof(module)); + + return 0; +} + +void freez_dyncfg(void *ptr) { + freez(ptr); +} + +static void handle_dyncfg_root(struct uni_http_response *resp, int method) +{ + if (method != HTTP_METHOD_GET) { + resp->content = "method not allowed"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } + json_object *obj = get_list_of_plugins_json(); + json_object *wrapper = json_object_new_object(); + json_object_object_add(wrapper, "configurable_plugins", obj); + resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY)); + json_object_put(wrapper); + resp->status = HTTP_RESP_OK; + resp->content_type = CT_APPLICATION_JSON; + resp->content_free = freez_dyncfg; + resp->content_length = strlen(resp->content); +} + +static void handle_plugin_root(struct uni_http_response *resp, int method, struct configurable_plugin *plugin, void *post_payload, size_t post_payload_size) +{ + switch(method) { + case HTTP_METHOD_GET: + { + dyncfg_config_t cfg = plugin->get_config_cb(plugin->cb_usr_ctx); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + case HTTP_METHOD_PUT: + { + char *response; + if (post_payload == NULL) { + resp->content = "no payload"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + dyncfg_config_t cont = { + .data = post_payload, + .data_size = post_payload_size + }; + response = set_plugin_config(plugin, cont); + if (response == NULL) { + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + } else { + resp->status = HTTP_RESP_BAD_REQUEST; + resp->content = response; + resp->content_length = strlen(resp->content); + } + return; + } + default: + resp->content = "method not allowed"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } +} + +void handle_module_root(struct uni_http_response *resp, int method, struct configurable_plugin *plugin, const char *module, void *post_payload, size_t post_payload_size) +{ + if (strncmp(module, DYN_CONF_SCHEMA, strlen(DYN_CONF_SCHEMA)) == 0) { + dyncfg_config_t cfg = plugin->get_config_schema_cb(plugin->cb_usr_ctx); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + if (strncmp(module, DYN_CONF_MODULE_LIST, strlen(DYN_CONF_MODULE_LIST)) == 0) { + if (method != HTTP_METHOD_GET) { + resp->content = "method not allowed (only GET)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } + json_object *obj = get_list_of_modules_json(plugin); + json_object *wrapper = json_object_new_object(); + json_object_object_add(wrapper, "modules", obj); + resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY)); + json_object_put(wrapper); + resp->status = HTTP_RESP_OK; + resp->content_type = CT_APPLICATION_JSON; + resp->content_free = freez_dyncfg; + resp->content_length = strlen(resp->content); + return; + } + struct module *mod = get_module_by_name(plugin, module); + if (mod == NULL) { + resp->content = "module not found"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_NOT_FOUND; + return; + } + if (method == HTTP_METHOD_GET) { + dyncfg_config_t cfg = mod->get_config_cb(mod->config_cb_usr_ctx, mod->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } else if (method == HTTP_METHOD_PUT) { + char *response; + if (post_payload == NULL) { + resp->content = "no payload"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + dyncfg_config_t cont = { + .data = post_payload, + .data_size = post_payload_size + }; + response = set_module_config(mod, cont); + if (response == NULL) { + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + } else { + resp->status = HTTP_RESP_BAD_REQUEST; + resp->content = response; + resp->content_length = strlen(resp->content); + } + return; + } + resp->content = "method not allowed"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; +} + +static inline void _handle_job_root(struct uni_http_response *resp, int method, struct module *mod, const char *job_id, void *post_payload, size_t post_payload_size, struct job *job) +{ + if (method == HTTP_METHOD_POST) { + if (job != NULL) { + resp->content = "can't POST, job already exists (use PUT to update?)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + if (post_payload == NULL) { + resp->content = "no payload"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + dyncfg_config_t cont = { + .data = post_payload, + .data_size = post_payload_size + }; + job = add_job(mod, job_id, cont); + if (job == NULL) { + resp->content = "failed to add job"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR; + return; + } + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + return; + } + if (job == NULL) { + resp->content = "job not found"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_NOT_FOUND; + return; + } + switch (method) { + case HTTP_METHOD_GET: + { + dyncfg_config_t cfg = mod->get_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, job->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + case HTTP_METHOD_PUT: + { + if (post_payload == NULL) { + resp->content = "missing payload"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + dyncfg_config_t cont = { + .data = post_payload, + .data_size = post_payload_size + }; + if(set_job_config(job, cont)) { + resp->status = HTTP_RESP_BAD_REQUEST; + resp->content = "failed to set job config"; + resp->content_length = strlen(resp->content); + return; + } + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + return; + } + case HTTP_METHOD_DELETE: + { + if (!remove_job(mod, job)) { + resp->content = "failed to remove job"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR; + return; + } + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + return; + } + default: + resp->content = "method not allowed (only GET, PUT, DELETE)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } +} + +void handle_job_root(struct uni_http_response *resp, int method, struct module *mod, const char *job_id, void *post_payload, size_t post_payload_size) +{ + if (strncmp(job_id, DYN_CONF_SCHEMA, strlen(DYN_CONF_SCHEMA)) == 0) { + dyncfg_config_t cfg = mod->get_config_schema_cb(mod->config_cb_usr_ctx, mod->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + if (strncmp(job_id, DYN_CONF_JOB_SCHEMA, strlen(DYN_CONF_JOB_SCHEMA)) == 0) { + dyncfg_config_t cfg = mod->get_job_config_schema_cb(mod->job_config_cb_usr_ctx, mod->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + if (strncmp(job_id, DYN_CONF_JOB_LIST, strlen(DYN_CONF_JOB_LIST)) == 0) { + if (mod->type != MOD_TYPE_ARRAY) { + resp->content = "module type is not job_array (can't get the list of jobs)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_NOT_FOUND; + return; + } + if (method != HTTP_METHOD_GET) { + resp->content = "method not allowed (only GET)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } + json_object *obj = get_list_of_jobs_json(mod); + json_object *wrapper = json_object_new_object(); + json_object_object_add(wrapper, "jobs", obj); + resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY)); + json_object_put(wrapper); + resp->status = HTTP_RESP_OK; + resp->content_type = CT_APPLICATION_JSON; + resp->content_free = freez_dyncfg; + resp->content_length = strlen(resp->content); + return; + } + const DICTIONARY_ITEM *job_item = dictionary_get_and_acquire_item(mod->jobs, job_id); + struct job *job = dictionary_acquired_item_value(job_item); + + _handle_job_root(resp, method, mod, job_id, post_payload, post_payload_size, job); + + dictionary_acquired_item_release(mod->jobs, job_item); +} + +struct uni_http_response dyn_conf_process_http_request(int method, const char *plugin, const char *module, const char *job_id, void *post_payload, size_t post_payload_size) +{ + struct uni_http_response resp = { + .status = HTTP_RESP_INTERNAL_SERVER_ERROR, + .content_type = CT_TEXT_PLAIN, + .content = HTTP_RESP_INTERNAL_SERVER_ERROR_STR, + .content_free = NULL, + .content_length = 0 + }; + if (plugin == NULL) { + handle_dyncfg_root(&resp, method); + return resp; + } + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin); + if (plugin_item == NULL) { + resp.content = "plugin not found"; + resp.content_length = strlen(resp.content); + resp.status = HTTP_RESP_NOT_FOUND; + return resp; + } + struct configurable_plugin *plug = dictionary_acquired_item_value(plugin_item); + if (module == NULL) { + handle_plugin_root(&resp, method, plug, post_payload, post_payload_size); + goto EXIT_PLUGIN; + } + if (job_id == NULL) { + handle_module_root(&resp, method, plug, module, post_payload, post_payload_size); + goto EXIT_PLUGIN; + } + // for modules we do not do get_and_acquire as modules are never removed (only together with the plugin) + struct module *mod = get_module_by_name(plug, module); + if (mod == NULL) { + resp.content = "module not found"; + resp.content_length = strlen(resp.content); + resp.status = HTTP_RESP_NOT_FOUND; + goto EXIT_PLUGIN; + } + if (mod->type != MOD_TYPE_ARRAY) { + resp.content = "module is not array"; + resp.content_length = strlen(resp.content); + resp.status = HTTP_RESP_NOT_FOUND; + goto EXIT_PLUGIN; + } + handle_job_root(&resp, method, mod, job_id, post_payload, post_payload_size); + +EXIT_PLUGIN: + dictionary_acquired_item_release(plugins_dict, plugin_item); + return resp; +} + +void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) +{ + UNUSED(item); + UNUSED(data); + struct configurable_plugin *plugin = (struct configurable_plugin *)value; + dictionary_destroy(plugin->modules); + freez(plugin->name); + freez(plugin); +} + +void report_job_status(struct configurable_plugin *plugin, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason) +{ + const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(plugins_dict, plugin->name); + if (item == NULL) { + netdata_log_error("plugin %s not found", plugin->name); + return; + } + struct configurable_plugin *plug = dictionary_acquired_item_value(item); + struct module *mod = get_module_by_name(plug, module_name); + if (mod == NULL) { + netdata_log_error("module %s not found", module_name); + goto EXIT_PLUGIN; + } + if (mod->type != MOD_TYPE_ARRAY) { + netdata_log_error("module %s is not array", module_name); + goto EXIT_PLUGIN; + } + const DICTIONARY_ITEM *job_item = dictionary_get_and_acquire_item(mod->jobs, job_name); + if (job_item == NULL) { + netdata_log_error("job %s not found", job_name); + goto EXIT_PLUGIN; + } + struct job *job = dictionary_acquired_item_value(job_item); + job->status = status; + job->state = status_code; + if (job->reason != NULL) { + freez(job->reason); + } + job->reason = reason; + job->last_state_update = now_realtime_usec(); + + dictionary_acquired_item_release(mod->jobs, job_item); + +EXIT_PLUGIN: + dictionary_acquired_item_release(plugins_dict, item); +} + +int dyn_conf_init(void) +{ + if (mkdir(DYN_CONF_DIR, 0755) == -1) { + if (errno != EEXIST) { + netdata_log_error("failed to create directory for dynamic configuration"); + return 1; + } + } + + plugins_dict = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE); + dictionary_register_delete_callback(plugins_dict, plugin_del_cb, NULL); + + return 0; +} + +void *dyncfg_main(void *in) +{ + while (!netdata_exit) { + struct deferred_cfg_send *dcs = deferred_config_pop(); + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, dcs->plugin_name); + if (plugin_item == NULL) { + error_report("DYNCFG, plugin %s not found", dcs->plugin_name); + deferred_config_free(dcs); + continue; + } + struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item); + if (dcs->module_name == NULL) { + dyncfg_config_t cfg = load_config(dcs->plugin_name, NULL, NULL); + if (cfg.data != NULL) { + plugin->set_config_cb(plugin->cb_usr_ctx, &cfg); + freez(cfg.data); + } + } else if (dcs->job_name == NULL) { + dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, NULL); + if (cfg.data != NULL) { + struct module *mod = get_module_by_name(plugin, dcs->module_name); + mod->set_config_cb(mod->config_cb_usr_ctx, mod->name, &cfg); + freez(cfg.data); + } + } else { + dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, dcs->job_name); + if (cfg.data != NULL) { + struct module *mod = get_module_by_name(plugin, dcs->module_name); + mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, dcs->job_name, &cfg); + freez(cfg.data); + } + } + deferred_config_free(dcs); + dictionary_acquired_item_release(plugins_dict, plugin_item); + } + return NULL; +} diff --git a/libnetdata/dyn_conf/dyn_conf.h b/libnetdata/dyn_conf/dyn_conf.h new file mode 100644 index 000000000..f10ae6a12 --- /dev/null +++ b/libnetdata/dyn_conf/dyn_conf.h @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef DYN_CONF_H +#define DYN_CONF_H + +#include "../libnetdata.h" + +enum module_type { + MOD_TYPE_UNKNOWN = 0, + MOD_TYPE_ARRAY, + MOD_TYPE_SINGLE +}; + +static inline enum module_type str2_module_type(const char *type_name) +{ + if (strcmp(type_name, "job_array") == 0) + return MOD_TYPE_ARRAY; + else if (strcmp(type_name, "single") == 0) + return MOD_TYPE_SINGLE; + return MOD_TYPE_UNKNOWN; +} + +struct dyncfg_config { + void *data; + size_t data_size; +}; + +typedef struct dyncfg_config dyncfg_config_t; + +struct configurable_plugin; +struct module; + +enum job_status { + JOB_STATUS_UNKNOWN = 0, // State used until plugin reports first status + JOB_STATUS_STOPPED, + JOB_STATUS_RUNNING, + JOB_STATUS_ERROR +}; + +inline enum job_status str2job_state(const char *state_name) { + if (strcmp(state_name, "stopped") == 0) + return JOB_STATUS_STOPPED; + else if (strcmp(state_name, "running") == 0) + return JOB_STATUS_RUNNING; + else if (strcmp(state_name, "error") == 0) + return JOB_STATUS_ERROR; + return JOB_STATUS_UNKNOWN; +} + +enum set_config_result { + SET_CONFIG_ACCEPTED = 0, + SET_CONFIG_REJECTED, + SET_CONFIG_DEFFER +}; + +struct job +{ + char *name; + + //state reported by config + enum job_status status; // reported by plugin, enum as this has to be interpreted by UI + int state; // code reported by plugin which can mean anything plugin wants + char *reason; // reported by plugin, can be NULL (optional) + + usec_t last_state_update; + + struct module *module; +}; + +struct module +{ + pthread_mutex_t lock; + char *name; + enum module_type type; + + struct configurable_plugin *plugin; + + // module config + enum set_config_result (*set_config_cb)(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg); + dyncfg_config_t (*get_config_cb)(void *usr_ctx, const char *name); + dyncfg_config_t (*get_config_schema_cb)(void *usr_ctx, const char *name); + void *config_cb_usr_ctx; + + DICTIONARY *jobs; + + // jobs config + dyncfg_config_t (*get_job_config_cb)(void *usr_ctx, const char *module_name, const char *job_name); + dyncfg_config_t (*get_job_config_schema_cb)(void *usr_ctx, const char *module_name); + enum set_config_result (*set_job_config_cb)(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg); + enum set_config_result (*delete_job_cb)(void *usr_ctx, const char *module_name, const char *job_name); + void *job_config_cb_usr_ctx; +}; + +struct configurable_plugin { + pthread_mutex_t lock; + char *name; + DICTIONARY *modules; + const char *schema; + + dyncfg_config_t (*get_config_cb)(void *usr_ctx); + dyncfg_config_t (*get_config_schema_cb)(void *usr_ctx); + enum set_config_result (*set_config_cb)(void *usr_ctx, dyncfg_config_t *cfg); + void *cb_usr_ctx; // context for all callbacks (split if needed in future) +}; + +// API to be used by plugins +const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin); +void unregister_plugin(const DICTIONARY_ITEM *plugin); +int register_module(struct configurable_plugin *plugin, struct module *module); + +void report_job_status(struct configurable_plugin *plugin, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason); + +// API to be used by the web server(s) +json_object *get_list_of_plugins_json(); +struct configurable_plugin *get_plugin_by_name(const char *name); + +json_object *get_list_of_modules_json(struct configurable_plugin *plugin); +struct module *get_module_by_name(struct configurable_plugin *plugin, const char *module_name); + +// helper struct to make interface between internal webserver and h2o same +struct uni_http_response { + int status; + char *content; + size_t content_length; + HTTP_CONTENT_TYPE content_type; + void (*content_free)(void *); +}; + +struct uni_http_response dyn_conf_process_http_request(int method, const char *plugin, const char *module, const char *job_id, void *payload, size_t payload_size); + +// API to be used by main netdata process, initialization and destruction etc. +int dyn_conf_init(void); +void freez_dyncfg(void *ptr); +void *dyncfg_main(void *in); + +#endif //DYN_CONF_H -- cgit v1.2.3