diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000 |
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /libnetdata/dyn_conf | |
parent | Initial commit. (diff) | |
download | netdata-upstream/1.44.3.tar.xz netdata-upstream/1.44.3.zip |
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'libnetdata/dyn_conf')
-rw-r--r-- | libnetdata/dyn_conf/README.md | 188 | ||||
-rw-r--r-- | libnetdata/dyn_conf/dyn_conf.c | 1140 | ||||
-rw-r--r-- | libnetdata/dyn_conf/dyn_conf.h | 237 | ||||
-rw-r--r-- | libnetdata/dyn_conf/tests/sample_test_config.json | 22 | ||||
-rw-r--r-- | libnetdata/dyn_conf/tests/sub_tests/test_parent_child.rb | 192 | ||||
-rwxr-xr-x | libnetdata/dyn_conf/tests/test_dyncfg.rb | 266 | ||||
-rwxr-xr-x | libnetdata/dyn_conf/tests/test_plugin/test.plugin | 250 |
7 files changed, 2295 insertions, 0 deletions
diff --git a/libnetdata/dyn_conf/README.md b/libnetdata/dyn_conf/README.md new file mode 100644 index 00000000..17d059b0 --- /dev/null +++ b/libnetdata/dyn_conf/README.md @@ -0,0 +1,188 @@ +# Netdata Dynamic Configuration + +Purpose of Netdata Dynamic Configuration is to allow configuration of select Netdata plugins and options through the +Netdata API and by extension by UI. + +## HTTP API documentation + +### Summary API + +For summary of all jobs and their statuses (for all children that stream to parent) use the following URL: + +| Method | Endpoint | Description | +|:-------:|-------------------------------|------------------------------------------------------------| +| **GET** | `api/v2/job_statuses` | list of Jobs | +| **GET** | `api/v2/job_statuses?grouped` | list of Jobs (hierarchical, grouped by host/plugin/module) | + +### Dyncfg API + +### Top level + +| Method | Endpoint | Description | +|:-------:|------------------|-----------------------------------------| +| **GET** | `/api/v2/config` | registered Plugins (sent DYNCFG_ENABLE) | + +### Plugin level + +| Method | Endpoint | Description | +|:-------:|-----------------------------------|------------------------------| +| **GET** | `/api/v2/config/[plugin]` | Plugin config | +| **PUT** | `/api/v2/config/[plugin]` | update Plugin config | +| **GET** | `/api/v2/config/[plugin]/modules` | Modules registered by Plugin | +| **GET** | `/api/v2/config/[plugin]/schema` | Plugin config schema | + +### Module level + +| Method | Endpoint | Description | +|:-------:|-----------------------------------------------|---------------------------| +| **GET** | `/api/v2/config/<plugin>/[module]` | Module config | +| **PUT** | `/api/v2/config/[plugin]/[module]` | update Module config | +| **GET** | `/api/v2/config/[plugin]/[module]/jobs` | Jobs registered by Module | +| **GET** | `/api/v2/config/[plugin]/[module]/job_schema` | Job config schema | +| **GET** | `/api/v2/config/[plugin]/[module]/schema` | Module config schema | + +### Job level - only for modules where `module_type == job_array` + +| Method | Endpoint | Description | +|:----------:|------------------------------------------|--------------------------------| +| **GET** | `/api/v2/config/[plugin]/[module]/[job]` | Job config | +| **PUT** | `/api/v2/config/[plugin]/[module]/[job]` | update Job config | +| **POST** | `/api/v2/config/[plugin]/[module]/[job]` | create Job | +| **DELETE** | `/api/v2/config/[plugin]/[module]/[job]` | delete Job (created by Dyncfg) | + +## Internal Plugins API + +TBD + +## External Plugins API + +### Commands plugins can use + +#### DYNCFG_ENABLE + +Plugin signifies to agent its ability to use new dynamic config and the name it wishes to use by sending + +``` +DYNCFG_ENABLE [{PLUGIN_NAME}] +``` + +This can be sent only once per lifetime of the plugin (at startup or later) sending it multiple times is considered a +protocol violation and plugin might get terminated. + +After this command is sent the plugin has to be ready to accept all the new commands/keywords related to dynamic +configuration (this command lets agent know this plugin is dyncfg capable and wishes to use dyncfg functionality). + +#### DYNCFG_RESET + +Sending this, will reset the internal state of the agent, considering this a `DYNCFG_ENABLE`. + +``` +DYNCFG_RESET +``` + + +#### DYNCFG_REGISTER_MODULE + +``` +DYNCFG_REGISTER_MODULE {MODULE_NAME} {MODULE_TYPE} +``` + +Module has to choose one of following types at registration: + +- `single` - module itself has configuration but does not accept any jobs *(this is useful mainly for internal netdata + configurable things like webserver etc.)* + +- `job_array` - module itself **can** *(not must)* have configuration and it has an array of jobs which can be added, + modified and deleted. **this is what plugin developer needs in most cases** + +After a module has been registered agent can call `set_module_config`, `get_module_config` and `get_module_config_schema`. + +When `MODULE_TYPE` is `job_array` the agent may also send `set_job_config`, `get_job_config` and `get_job_config_schema`. + +#### DYNCFG_REGISTER_JOB + +The plugin can use `DYNCFG_REGISTER_JOB` to register its own configuration jobs. It should not register jobs configured +via DYNCFG (doing so, the agent will shutdown the plugin). + + +``` +DYNCFG_REGISTER_JOB {MODULE_NAME} {JOB_NAME} {JOB_TYPE} {FLAGS} +``` + +Where: + +- `MODULE_NAME` is the name of the module. +- `JOB_NAME` is the name of the job. +- `JOB_TYPE` is either `stock` or `autodiscovered`. +- `FLAGS`, just send zero. + +#### REPORT_JOB_STATUS + +``` +REPORT_JOB_STATUS {MODULE_NAME} {JOB_NAME} {STATUS} {STATE} ["REASON"] +``` + +Note the REASON parameter is optional and can be entirelly ommited (for example when state is OK there is no need to send any reason). + +Where: + +- `MODULE_NAME` is the name of the module. +- `JOB_NAME` is the name of the job. +- `STATUS` is one of `stopped`, `running`, or `error`. +- `STATE`, just send zero. +- `REASON` is a message describing the status. In case you don't want to send any reason string it is preferable to omit this parameter altogether (as opposed to sending empty string `""`). + + +### Commands plugins must serve + +Once a plugin calls `DYNCFG_ENABLE`, the must be able to handle these calls. + +function|parameters|prerequisites|request payload|response payload| +:---:|:---:|:---:|:---:|:---:| +`set_plugin_config`|none|`DYNCFG_ENABLE`|plugin configuration|none| +`get_plugin_config`|none|`DYNCFG_ENABLE`|none|plugin configuration| +`get_plugin_config_schema`|none|`DYNCFG_ENABLE`|none|plugin configuration schema| +`set_module_config`|`module_name`|`DYNCFG_REGISTER_MODULE`|module configuration|none| +`get_module_config`|`module_name`|`DYNCFG_REGISTER_MODULE`|none|module configuration| +`get_module_config_schema`|`module_name`|`DYNCFG_REGISTER_MODULE`|none|module configuration schema| +`set_job_config`|`module_name`, `job_name`|`DYNCFG_REGISTER_MODULE`|job configuration|none| +`get_job_config`|`module_name`, `job_name`|`DYNCFG_REGISTER_MODULE`|none|job configuration| +`get_job_config_schema`|`module_name`, `job_name`|`DYNCFG_REGISTER_MODULE`|none|job configuration schema| + +All of them work like this: + +If the request payload is `none`, then the request looks like this: + +```bash +FUNCTION {TRANSACTION_UUID} {TIMEOUT_SECONDS} "{function} {parameters}" +``` + +When there is payload, the request looks like this: + +```bash +FUNCTION_PAYLOAD {TRANSACTION_UUID} {TIMEOUT_SECONDS} "{function} {parameters}" +<payload> +FUNCTION_PAYLOAD_END +``` + +In all cases, the response is like this: + +```bash +FUNCTION_RESULT_BEGIN {TRANSACTION_UUID} {HTTP_RESPONSE_CODE} "{CONTENT_TYPE}" {EXPIRATION_TIMESTAMP} +<payload> +FUNCTION_RESULT_END +``` +Where: +- `TRANSACTION_UUID` is the same UUID received with the request. +- `HTTP_RESPONSE_CODE` is either `0` (rejected) or `1` (accepted). +- `CONTENT_TYPE` should reflect the `payload` returned. +- `EXPIRATION_TIMESTAMP` can be zero. + + +## DYNCFG with streaming + +When above commands are transferred trough streaming additionally `plugin_name` is prefixed as first parameter. This is +done to allow routing to appropriate plugin @child. + +As a plugin developer you don't need to concern yourself with this detail as that parameter is stripped when sent to the +plugin *(and added when sent trough streaming)* automagically. diff --git a/libnetdata/dyn_conf/dyn_conf.c b/libnetdata/dyn_conf/dyn_conf.c new file mode 100644 index 00000000..ee4b4733 --- /dev/null +++ b/libnetdata/dyn_conf/dyn_conf.c @@ -0,0 +1,1140 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "dyn_conf.h" + +#define DYN_CONF_PATH_MAX (4096) +#define DYN_CONF_DIR VARLIB_DIR "/dynconf" + +#define DYN_CONF_JOB_SCHEMA "job_schema" +#define DYN_CONF_SCHEMA "schema" +#define DYN_CONF_MODULE_LIST "modules" +#define DYN_CONF_JOB_LIST "jobs" +#define DYN_CONF_CFG_EXT ".cfg" + +void job_flags_wallkthrough(dyncfg_job_flg_t flags, void (*cb)(const char *str, void *data), void *data) +{ + if (flags & JOB_FLG_PS_LOADED) + cb("JOB_FLG_PS_LOADED", data); + if (flags & JOB_FLG_PLUGIN_PUSHED) + cb("JOB_FLG_PLUGIN_PUSHED", data); + if (flags & JOB_FLG_STREAMING_PUSHED) + cb("JOB_FLG_STREAMING_PUSHED", data); + if (flags & JOB_FLG_USER_CREATED) + cb("JOB_FLG_USER_CREATED", data); +} + +struct deferred_cfg_send { + DICTIONARY *plugins_dict; + char *plugin_name; + char *module_name; + char *job_name; + struct deferred_cfg_send *next; +}; + +bool dyncfg_shutdown = false; +struct deferred_cfg_send *deferred_configs = NULL; +pthread_mutex_t deferred_configs_lock = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t deferred_configs_cond = PTHREAD_COND_INITIALIZER; + +static void deferred_config_free(struct deferred_cfg_send *dcs) +{ + freez(dcs->plugin_name); + freez(dcs->module_name); + freez(dcs->job_name); + freez(dcs); +} + +static void deferred_config_push_back(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name) +{ + struct deferred_cfg_send *deferred = callocz(1, sizeof(struct deferred_cfg_send)); + deferred->plugin_name = strdupz(plugin_name); + if (module_name != NULL) { + deferred->module_name = strdupz(module_name); + if (job_name != NULL) + deferred->job_name = strdupz(job_name); + } + deferred->plugins_dict = plugins_dict; + pthread_mutex_lock(&deferred_configs_lock); + if (dyncfg_shutdown) { + pthread_mutex_unlock(&deferred_configs_lock); + deferred_config_free(deferred); + return; + } + struct deferred_cfg_send *last = deferred_configs; + if (last == NULL) + deferred_configs = deferred; + else { + while (last->next != NULL) + last = last->next; + last->next = deferred; + } + pthread_cond_signal(&deferred_configs_cond); + pthread_mutex_unlock(&deferred_configs_lock); +} + +static void deferred_configs_unlock() +{ + dyncfg_shutdown = true; + // if we get cancelled in pthread_cond_wait + // we will arrive at cancelled cleanup handler + // with mutex locked we need to unlock it + pthread_mutex_unlock(&deferred_configs_lock); +} + +static struct deferred_cfg_send *deferred_config_pop(void *ptr) +{ + pthread_mutex_lock(&deferred_configs_lock); + while (deferred_configs == NULL) { + netdata_thread_cleanup_push(deferred_configs_unlock, ptr); + pthread_cond_wait(&deferred_configs_cond, &deferred_configs_lock); + netdata_thread_cleanup_pop(0); + } + struct deferred_cfg_send *deferred = deferred_configs; + deferred_configs = deferred_configs->next; + pthread_mutex_unlock(&deferred_configs_lock); + return deferred; +} + +static int _get_list_of_plugins_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data) +{ + UNUSED(item); + json_object *obj = (json_object *)data; + struct configurable_plugin *plugin = (struct configurable_plugin *)entry; + + json_object *plugin_name = json_object_new_string(plugin->name); + json_object_array_add(obj, plugin_name); + + return 0; +} + +json_object *get_list_of_plugins_json(DICTIONARY *plugins_dict) +{ + json_object *obj = json_object_new_array(); + + dictionary_walkthrough_read(plugins_dict, _get_list_of_plugins_json_cb, obj); + + return obj; +} + +static int _get_list_of_modules_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data) +{ + UNUSED(item); + json_object *obj = (json_object *)data; + struct module *module = (struct module *)entry; + + json_object *json_module = json_object_new_object(); + + json_object *json_item = json_object_new_string(module->name); + json_object_object_add(json_module, "name", json_item); + const char *module_type = module_type2str(module->type); + json_item = json_object_new_string(module_type); + json_object_object_add(json_module, "type", json_item); + + json_object_array_add(obj, json_module); + + return 0; +} + +json_object *get_list_of_modules_json(struct configurable_plugin *plugin) +{ + json_object *obj = json_object_new_array(); + + pthread_mutex_lock(&plugin->lock); + + dictionary_walkthrough_read(plugin->modules, _get_list_of_modules_json_cb, obj); + + pthread_mutex_unlock(&plugin->lock); + + return obj; +} + +const char *job_status2str(enum job_status status) +{ + switch (status) { + case JOB_STATUS_UNKNOWN: + return "unknown"; + case JOB_STATUS_STOPPED: + return "stopped"; + case JOB_STATUS_RUNNING: + return "running"; + case JOB_STATUS_ERROR: + return "error"; + default: + return "unknown"; + } +} + +static void _job_flags2str_cb(const char *str, void *data) +{ + json_object *json_item = json_object_new_string(str); + json_object_array_add((json_object *)data, json_item); +} + +json_object *job2json(struct job *job) { + json_object *json_job = json_object_new_object(); + + json_object *json_item = json_object_new_string(job->name); + json_object_object_add(json_job, "name", json_item); + + json_item = json_object_new_string(job_type2str(job->type)); + json_object_object_add(json_job, "type", json_item); + + netdata_mutex_lock(&job->lock); + json_item = json_object_new_string(job_status2str(job->status)); + json_object_object_add(json_job, "status", json_item); + + json_item = json_object_new_int(job->state); + json_object_object_add(json_job, "state", json_item); + + json_item = job->reason == NULL ? NULL : json_object_new_string(job->reason); + json_object_object_add(json_job, "reason", json_item); + + int64_t last_state_update_s = job->last_state_update / USEC_PER_SEC; + int64_t last_state_update_us = job->last_state_update % USEC_PER_SEC; + + json_item = json_object_new_int64(last_state_update_s); + json_object_object_add(json_job, "last_state_update_s", json_item); + + json_item = json_object_new_int64(last_state_update_us); + json_object_object_add(json_job, "last_state_update_us", json_item); + + json_item = json_object_new_array(); + job_flags_wallkthrough(job->flags, _job_flags2str_cb, json_item); + json_object_object_add(json_job, "flags", json_item); + + netdata_mutex_unlock(&job->lock); + + return json_job; +} + +static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data) +{ + UNUSED(item); + json_object *obj = (json_object *)data; + + json_object *json_job = job2json((struct job *)entry); + + json_object_array_add(obj, json_job); + + return 0; +} + +json_object *get_list_of_jobs_json(struct module *module) +{ + json_object *obj = json_object_new_array(); + + pthread_mutex_lock(&module->lock); + + dictionary_walkthrough_read(module->jobs, _get_list_of_jobs_json_cb, obj); + + pthread_mutex_unlock(&module->lock); + + return obj; +} + +struct job *get_job_by_name(struct module *module, const char *job_name) +{ + return dictionary_get(module->jobs, job_name); +} + +void unlink_job(const char *plugin_name, const char *module_name, const char *job_name) +{ + // as we are going to do unlink here we better make sure we have all to build proper path + if (unlikely(job_name == NULL || module_name == NULL || plugin_name == NULL)) + return; + BUFFER *buffer = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(buffer, DYN_CONF_DIR "/%s/%s/%s" DYN_CONF_CFG_EXT, plugin_name, module_name, job_name); + if (unlink(buffer_tostring(buffer))) + netdata_log_error("Cannot remove file %s", buffer_tostring(buffer)); + + buffer_free(buffer); +} + +void delete_job(struct configurable_plugin *plugin, const char *module_name, const char *job_name) +{ + struct module *module = get_module_by_name(plugin, module_name); + if (module == NULL) { + error_report("DYNCFG module \"%s\" not found", module_name); + return; + } + + struct job *job_item = get_job_by_name(module, job_name); + if (job_item == NULL) { + error_report("DYNCFG job \"%s\" not found", job_name); + return; + } + + dictionary_del(module->jobs, job_name); +} + +void delete_job_pname(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name) +{ + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name); + if (plugin_item == NULL) { + error_report("DYNCFG plugin \"%s\" not found", plugin_name); + return; + } + struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item); + + delete_job(plugin, module_name, job_name); + + dictionary_acquired_item_release(plugins_dict, plugin_item); +} + +int remove_job(struct module *module, struct job *job) +{ + enum set_config_result rc = module->delete_job_cb(module->job_config_cb_usr_ctx, module->plugin->name, module->name, job->name); + + if (rc != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG module \"%s\" rejected delete job for \"%s\"", module->name, job->name); + return 0; + } + return 1; +} + +struct module *get_module_by_name(struct configurable_plugin *plugin, const char *module_name) +{ + return dictionary_get(plugin->modules, module_name); +} + +inline struct configurable_plugin *get_plugin_by_name(DICTIONARY *plugins_dict, const char *name) +{ + return dictionary_get(plugins_dict, name); +} + +static int store_config(const char *module_name, const char *submodule_name, const char *cfg_idx, dyncfg_config_t cfg) +{ + BUFFER *filename = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(filename, DYN_CONF_DIR "/%s", module_name); + if (mkdir(buffer_tostring(filename), 0755) == -1) { + if (errno != EEXIST) { + netdata_log_error("DYNCFG store_config: failed to create module directory %s", buffer_tostring(filename)); + buffer_free(filename); + return 1; + } + } + + if (submodule_name != NULL) { + buffer_sprintf(filename, "/%s", submodule_name); + if (mkdir(buffer_tostring(filename), 0755) == -1) { + if (errno != EEXIST) { + netdata_log_error("DYNCFG store_config: failed to create submodule directory %s", buffer_tostring(filename)); + buffer_free(filename); + return 1; + } + } + } + + if (cfg_idx != NULL) + buffer_sprintf(filename, "/%s", cfg_idx); + + buffer_strcat(filename, DYN_CONF_CFG_EXT); + + + error_report("DYNCFG store_config: %s", buffer_tostring(filename)); + + //write to file + FILE *f = fopen(buffer_tostring(filename), "w"); + if (f == NULL) { + error_report("DYNCFG store_config: failed to open %s for writing", buffer_tostring(filename)); + buffer_free(filename); + return 1; + } + + fwrite(cfg.data, cfg.data_size, 1, f); + fclose(f); + + buffer_free(filename); + return 0; +} + +#ifdef NETDATA_DEV_MODE +#define netdata_dev_fatal(...) fatal(__VA_ARGS__) +#else +#define netdata_dev_fatal(...) error_report(__VA_ARGS__) +#endif + +void dyn_conf_store_config(const char *function, const char *payload, struct configurable_plugin *plugin) { + dyncfg_config_t config = { + .data = (char*)payload, + .data_size = strlen(payload) + }; + + char *fnc = strdupz(function); + // split fnc to words + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(fnc, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + const char *fnc_name = get_word(words, words_c, 0); + if (fnc_name == NULL) { + error_report("Function name expected \"%s\"", function); + goto CLEANUP; + } + if (strncmp(fnc_name, FUNCTION_NAME_SET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_SET_PLUGIN_CONFIG)) == 0) { + store_config(plugin->name, NULL, NULL, config); + goto CLEANUP; + } + + if (words_c < 2) { + error_report("Module name expected \"%s\"", function); + goto CLEANUP; + } + const char *module_name = get_word(words, words_c, 1); + if (strncmp(fnc_name, FUNCTION_NAME_SET_MODULE_CONFIG, strlen(FUNCTION_NAME_SET_MODULE_CONFIG)) == 0) { + store_config(plugin->name, module_name, NULL, config); + goto CLEANUP; + } + + if (words_c < 3) { + error_report("Job name expected \"%s\"", function); + goto CLEANUP; + } + const char *job_name = get_word(words, words_c, 2); + if (strncmp(fnc_name, FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) { + store_config(plugin->name, module_name, job_name, config); + goto CLEANUP; + } + + netdata_dev_fatal("Unknown function \"%s\"", function); + +CLEANUP: + freez(fnc); +} + +dyncfg_config_t load_config(const char *plugin_name, const char *module_name, const char *job_id) +{ + BUFFER *filename = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(filename, DYN_CONF_DIR "/%s", plugin_name); + if (module_name != NULL) + buffer_sprintf(filename, "/%s", module_name); + + if (job_id != NULL) + buffer_sprintf(filename, "/%s", job_id); + + buffer_strcat(filename, DYN_CONF_CFG_EXT); + + dyncfg_config_t config; + long bytes; + config.data = read_by_filename(buffer_tostring(filename), &bytes); + + if (config.data == NULL) + error_report("DYNCFG load_config: failed to load config from %s", buffer_tostring(filename)); + + config.data_size = bytes; + + buffer_free(filename); + + return config; +} + +char *set_plugin_config(struct configurable_plugin *plugin, dyncfg_config_t cfg) +{ + enum set_config_result rc = plugin->set_config_cb(plugin->cb_usr_ctx, plugin->name, &cfg); + if (rc != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG plugin \"%s\" rejected config", plugin->name); + return "plugin rejected config"; + } + + return NULL; +} + +static char *set_module_config(struct module *mod, dyncfg_config_t cfg) +{ + struct configurable_plugin *plugin = mod->plugin; + + enum set_config_result rc = mod->set_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name, &cfg); + if (rc != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG module \"%s\" rejected config", plugin->name); + return "module rejected config"; + } + + return NULL; +} + +struct job *job_new(const char *job_id) +{ + struct job *job = callocz(1, sizeof(struct job)); + job->state = JOB_STATUS_UNKNOWN; + job->last_state_update = now_realtime_usec(); + job->name = strdupz(job_id); + netdata_mutex_init(&job->lock); + return job; +} + +static inline void job_del(struct job *job) +{ + netdata_mutex_destroy(&job->lock); + freez(job->reason); + freez((void*)job->name); + freez(job); +} + +void job_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) +{ + UNUSED(item); + UNUSED(data); + job_del((struct job *)value); +} + +void module_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) +{ + UNUSED(item); + UNUSED(data); + struct module *mod = (struct module *)value; + dictionary_destroy(mod->jobs); + freez(mod->name); + freez(mod); +} + +const DICTIONARY_ITEM *register_plugin(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, bool localhost) +{ + if (get_plugin_by_name(plugins_dict, plugin->name) != NULL) { + error_report("DYNCFG plugin \"%s\" already registered", plugin->name); + return NULL; + } + + if (plugin->set_config_cb == NULL) { + error_report("DYNCFG plugin \"%s\" has no set_config_cb", plugin->name); + return NULL; + } + + pthread_mutex_init(&plugin->lock, NULL); + + plugin->modules = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE); + dictionary_register_delete_callback(plugin->modules, module_del_cb, NULL); + + if (localhost) + deferred_config_push_back(plugins_dict, plugin->name, NULL, NULL); + + dictionary_set(plugins_dict, plugin->name, plugin, sizeof(plugin)); + + // the plugin keeps the pointer to the dictionary item, so we need to acquire it + return dictionary_get_and_acquire_item(plugins_dict, plugin->name); +} + +void unregister_plugin(DICTIONARY *plugins_dict, const DICTIONARY_ITEM *plugin) +{ + struct configurable_plugin *plug = dictionary_acquired_item_value(plugin); + dictionary_acquired_item_release(plugins_dict, plugin); + dictionary_del(plugins_dict, plug->name); +} + +int register_module(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, struct module *module, bool localhost) +{ + if (get_module_by_name(plugin, module->name) != NULL) { + error_report("DYNCFG module \"%s\" already registered", module->name); + return 1; + } + + pthread_mutex_init(&module->lock, NULL); + + if (localhost) + deferred_config_push_back(plugins_dict, plugin->name, module->name, NULL); + + module->plugin = plugin; + + if (module->type == MOD_TYPE_ARRAY) { + module->jobs = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE); + dictionary_register_delete_callback(module->jobs, job_del_cb, NULL); + + if (localhost) { + // load all jobs from disk + BUFFER *path = buffer_create(DYN_CONF_PATH_MAX, NULL); + buffer_sprintf(path, "%s/%s/%s", DYN_CONF_DIR, plugin->name, module->name); + DIR *dir = opendir(buffer_tostring(path)); + if (dir != NULL) { + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + if (ent->d_name[0] == '.') + continue; + if (ent->d_type != DT_REG) + continue; + size_t len = strnlen(ent->d_name, NAME_MAX); + if (len <= strlen(DYN_CONF_CFG_EXT)) + continue; + if (strcmp(ent->d_name + len - strlen(DYN_CONF_CFG_EXT), DYN_CONF_CFG_EXT) != 0) + continue; + ent->d_name[len - strlen(DYN_CONF_CFG_EXT)] = '\0'; + + struct job *job = job_new(ent->d_name); + job->module = module; + job->flags = JOB_FLG_PS_LOADED; + job->type = JOB_TYPE_USER; + + dictionary_set(module->jobs, job->name, job, sizeof(job)); + + deferred_config_push_back(plugins_dict, plugin->name, module->name, ent->d_name); + } + closedir(dir); + } + buffer_free(path); + } + } + + dictionary_set(plugin->modules, module->name, module, sizeof(module)); + + return 0; +} + +int register_job(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_type job_type, dyncfg_job_flg_t flags, int ignore_existing) +{ + int rc = 1; + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name); + if (plugin_item == NULL) { + error_report("plugin \"%s\" not registered", plugin_name); + return rc; + } + struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item); + struct module *mod = get_module_by_name(plugin, module_name); + if (mod == NULL) { + error_report("module \"%s\" not registered", module_name); + goto ERR_EXIT; + } + if (mod->type != MOD_TYPE_ARRAY) { + error_report("module \"%s\" is not an array", module_name); + goto ERR_EXIT; + } + if (get_job_by_name(mod, job_name) != NULL) { + if (!ignore_existing) + error_report("job \"%s\" already registered", job_name); + goto ERR_EXIT; + } + + struct job *job = job_new(job_name); + job->module = mod; + job->flags = flags; + job->type = job_type; + + dictionary_set(mod->jobs, job->name, job, sizeof(job)); + + rc = 0; +ERR_EXIT: + dictionary_acquired_item_release(plugins_dict, plugin_item); + return rc; +} + +void freez_dyncfg(void *ptr) { + freez(ptr); +} + +#ifdef NETDATA_TEST_DYNCFG +static void handle_dyncfg_root(DICTIONARY *plugins_dict, struct uni_http_response *resp, int method) +{ + if (method != HTTP_METHOD_GET) { + resp->content = "method not allowed"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } + json_object *obj = get_list_of_plugins_json(plugins_dict); + json_object *wrapper = json_object_new_object(); + json_object_object_add(wrapper, "configurable_plugins", obj); + resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY)); + json_object_put(wrapper); + resp->status = HTTP_RESP_OK; + resp->content_type = CT_APPLICATION_JSON; + resp->content_free = freez_dyncfg; + resp->content_length = strlen(resp->content); +} + +static void handle_plugin_root(struct uni_http_response *resp, int method, struct configurable_plugin *plugin, void *post_payload, size_t post_payload_size) +{ + switch(method) { + case HTTP_METHOD_GET: + { + dyncfg_config_t cfg = plugin->get_config_cb(plugin->cb_usr_ctx, plugin->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + case HTTP_METHOD_PUT: + { + char *response; + if (post_payload == NULL) { + resp->content = "no payload"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + dyncfg_config_t cont = { + .data = post_payload, + .data_size = post_payload_size + }; + response = set_plugin_config(plugin, cont); + if (response == NULL) { + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + } else { + resp->status = HTTP_RESP_BAD_REQUEST; + resp->content = response; + resp->content_length = strlen(resp->content); + } + return; + } + default: + resp->content = "method not allowed"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } +} +#endif + +void handle_module_root(struct uni_http_response *resp, int method, struct configurable_plugin *plugin, const char *module, void *post_payload, size_t post_payload_size) +{ + if (strncmp(module, DYN_CONF_SCHEMA, sizeof(DYN_CONF_SCHEMA)) == 0) { + dyncfg_config_t cfg = plugin->get_config_schema_cb(plugin->cb_usr_ctx, plugin->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + if (strncmp(module, DYN_CONF_MODULE_LIST, sizeof(DYN_CONF_MODULE_LIST)) == 0) { + if (method != HTTP_METHOD_GET) { + resp->content = "method not allowed (only GET)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } + json_object *obj = get_list_of_modules_json(plugin); + json_object *wrapper = json_object_new_object(); + json_object_object_add(wrapper, "modules", obj); + resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY)); + json_object_put(wrapper); + resp->status = HTTP_RESP_OK; + resp->content_type = CT_APPLICATION_JSON; + resp->content_free = freez_dyncfg; + resp->content_length = strlen(resp->content); + return; + } + struct module *mod = get_module_by_name(plugin, module); + if (mod == NULL) { + resp->content = "module not found"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_NOT_FOUND; + return; + } + if (method == HTTP_METHOD_GET) { + dyncfg_config_t cfg = mod->get_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } else if (method == HTTP_METHOD_PUT) { + char *response; + if (post_payload == NULL) { + resp->content = "no payload"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + dyncfg_config_t cont = { + .data = post_payload, + .data_size = post_payload_size + }; + response = set_module_config(mod, cont); + if (response == NULL) { + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + } else { + resp->status = HTTP_RESP_BAD_REQUEST; + resp->content = response; + resp->content_length = strlen(resp->content); + } + return; + } + resp->content = "method not allowed"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; +} + +static inline void _handle_job_root(struct uni_http_response *resp, int method, struct module *mod, const char *job_id, void *post_payload, size_t post_payload_size, struct job *job) +{ + if (method == HTTP_METHOD_POST) { + if (job != NULL) { + resp->content = "can't POST, job already exists (use PUT to update?)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + if (post_payload == NULL) { + resp->content = "no payload"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + dyncfg_config_t cont = { + .data = post_payload, + .data_size = post_payload_size + }; + if (mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job_id, &cont)) { + resp->content = "failed to add job"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR; + return; + } + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + return; + } + if (job == NULL) { + resp->content = "job not found"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_NOT_FOUND; + return; + } + switch (method) { + case HTTP_METHOD_GET: + { + dyncfg_config_t cfg = mod->get_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + case HTTP_METHOD_PUT: + { + if (post_payload == NULL) { + resp->content = "missing payload"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_BAD_REQUEST; + return; + } + dyncfg_config_t cont = { + .data = post_payload, + .data_size = post_payload_size + }; + if (mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job->name, &cont) != SET_CONFIG_ACCEPTED) { + error_report("DYNCFG module \"%s\" rejected config for job \"%s\"", mod->name, job->name); + resp->content = "failed to set job config"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR; + return; + } + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + return; + } + case HTTP_METHOD_DELETE: + { + if (!remove_job(mod, job)) { + resp->content = "failed to remove job"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR; + return; + } + resp->status = HTTP_RESP_OK; + resp->content = "OK"; + resp->content_length = strlen(resp->content); + return; + } + default: + resp->content = "method not allowed (only GET, PUT, DELETE)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } +} + +void handle_job_root(struct uni_http_response *resp, int method, struct module *mod, const char *job_id, void *post_payload, size_t post_payload_size) +{ + if (strncmp(job_id, DYN_CONF_SCHEMA, sizeof(DYN_CONF_SCHEMA)) == 0) { + dyncfg_config_t cfg = mod->get_config_schema_cb(mod->config_cb_usr_ctx, mod->plugin->name, mod->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + if (strncmp(job_id, DYN_CONF_JOB_SCHEMA, sizeof(DYN_CONF_JOB_SCHEMA)) == 0) { + dyncfg_config_t cfg = mod->get_job_config_schema_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name); + resp->content = mallocz(cfg.data_size); + memcpy(resp->content, cfg.data, cfg.data_size); + resp->status = HTTP_RESP_OK; + resp->content_free = freez_dyncfg; + resp->content_length = cfg.data_size; + return; + } + if (strncmp(job_id, DYN_CONF_JOB_LIST, sizeof(DYN_CONF_JOB_LIST)) == 0) { + if (mod->type != MOD_TYPE_ARRAY) { + resp->content = "module type is not job_array (can't get the list of jobs)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_NOT_FOUND; + return; + } + if (method != HTTP_METHOD_GET) { + resp->content = "method not allowed (only GET)"; + resp->content_length = strlen(resp->content); + resp->status = HTTP_RESP_METHOD_NOT_ALLOWED; + return; + } + json_object *obj = get_list_of_jobs_json(mod); + json_object *wrapper = json_object_new_object(); + json_object_object_add(wrapper, "jobs", obj); + resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY)); + json_object_put(wrapper); + resp->status = HTTP_RESP_OK; + resp->content_type = CT_APPLICATION_JSON; + resp->content_free = freez_dyncfg; + resp->content_length = strlen(resp->content); + return; + } + const DICTIONARY_ITEM *job_item = dictionary_get_and_acquire_item(mod->jobs, job_id); + struct job *job = dictionary_acquired_item_value(job_item); + + _handle_job_root(resp, method, mod, job_id, post_payload, post_payload_size, job); + + dictionary_acquired_item_release(mod->jobs, job_item); +} + +struct uni_http_response dyn_conf_process_http_request( + DICTIONARY *plugins_dict __maybe_unused, + int method __maybe_unused, + const char *plugin __maybe_unused, + const char *module __maybe_unused, + const char *job_id __maybe_unused, + void *post_payload __maybe_unused, + size_t post_payload_size __maybe_unused) +{ + struct uni_http_response resp = { + .status = HTTP_RESP_INTERNAL_SERVER_ERROR, + .content_type = CT_TEXT_PLAIN, + .content = HTTP_RESP_INTERNAL_SERVER_ERROR_STR, + .content_free = NULL, + .content_length = 0 + }; +#ifndef NETDATA_TEST_DYNCFG + resp.content = "DYNCFG is disabled (as it is for now developer only feature). This will be enabled by default when ready for technical preview."; + resp.content_length = strlen(resp.content); + resp.status = HTTP_RESP_PRECOND_FAIL; + return resp; +#else + if (plugin == NULL) { + handle_dyncfg_root(plugins_dict, &resp, method); + return resp; + } + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin); + if (plugin_item == NULL) { + resp.content = "plugin not found"; + resp.content_length = strlen(resp.content); + resp.status = HTTP_RESP_NOT_FOUND; + return resp; + } + struct configurable_plugin *plug = dictionary_acquired_item_value(plugin_item); + if (module == NULL) { + handle_plugin_root(&resp, method, plug, post_payload, post_payload_size); + goto EXIT_PLUGIN; + } + if (job_id == NULL) { + handle_module_root(&resp, method, plug, module, post_payload, post_payload_size); + goto EXIT_PLUGIN; + } + // for modules we do not do get_and_acquire as modules are never removed (only together with the plugin) + struct module *mod = get_module_by_name(plug, module); + if (mod == NULL) { + resp.content = "module not found"; + resp.content_length = strlen(resp.content); + resp.status = HTTP_RESP_NOT_FOUND; + goto EXIT_PLUGIN; + } + if (mod->type != MOD_TYPE_ARRAY) { + resp.content = "400 - this module is not array type"; + resp.content_length = strlen(resp.content); + resp.status = HTTP_RESP_BAD_REQUEST; + goto EXIT_PLUGIN; + } + handle_job_root(&resp, method, mod, job_id, post_payload, post_payload_size); + +EXIT_PLUGIN: + dictionary_acquired_item_release(plugins_dict, plugin_item); + return resp; +#endif +} + +void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data) +{ + UNUSED(item); + UNUSED(data); + struct configurable_plugin *plugin = (struct configurable_plugin *)value; + dictionary_destroy(plugin->modules); + freez(plugin->name); + freez(plugin); +} + +// on failure - return NULL - all unlocked, nothing acquired +// on success - return pointer to job item - keep job and plugin acquired and locked!!! +// for caller convenience (to prevent another lock and races) +// caller is responsible to unlock the job and release it when not needed anymore +// this also avoids dependency creep +const DICTIONARY_ITEM *report_job_status_acq_lock(DICTIONARY *plugins_dict, const DICTIONARY_ITEM **plugin_acq_item, DICTIONARY **job_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason) +{ + *plugin_acq_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name); + if (*plugin_acq_item == NULL) { + netdata_log_error("plugin %s not found", plugin_name); + return NULL; + } + + struct configurable_plugin *plug = dictionary_acquired_item_value(*plugin_acq_item); + struct module *mod = get_module_by_name(plug, module_name); + if (mod == NULL) { + netdata_log_error("module %s not found", module_name); + dictionary_acquired_item_release(plugins_dict, *plugin_acq_item); + return NULL; + } + if (mod->type != MOD_TYPE_ARRAY) { + netdata_log_error("module %s is not array", module_name); + dictionary_acquired_item_release(plugins_dict, *plugin_acq_item); + return NULL; + } + *job_dict = mod->jobs; + const DICTIONARY_ITEM *job_item = dictionary_get_and_acquire_item(mod->jobs, job_name); + if (job_item == NULL) { + netdata_log_error("job %s not found", job_name); + dictionary_acquired_item_release(plugins_dict, *plugin_acq_item); + return NULL; + } + struct job *job = dictionary_acquired_item_value(job_item); + + pthread_mutex_lock(&job->lock); + job->status = status; + job->state = status_code; + if (job->reason != NULL) { + freez(job->reason); + } + job->reason = reason != NULL ? strdupz(reason) : NULL; // reason is optional + job->last_state_update = now_realtime_usec(); + + job->dirty = true; + + // no unlock and acquired_item_release on success on purpose + return job_item; +} + +int dyn_conf_init(void) +{ + if (mkdir(DYN_CONF_DIR, 0755) == -1) { + if (errno != EEXIST) { + netdata_log_error("failed to create directory for dynamic configuration"); + return 1; + } + } + + return 0; +} + +static void dyncfg_cleanup(void *ptr) { + struct netdata_static_thread *static_thread = (struct netdata_static_thread *) ptr; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + netdata_log_info("cleaning up..."); + + pthread_mutex_lock(&deferred_configs_lock); + dyncfg_shutdown = true; + while (deferred_configs != NULL) { + struct deferred_cfg_send *dcs = deferred_configs; + deferred_configs = dcs->next; + deferred_config_free(dcs); + } + pthread_mutex_unlock(&deferred_configs_lock); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +void *dyncfg_main(void *ptr) +{ + netdata_thread_cleanup_push(dyncfg_cleanup, ptr); + + while (!netdata_exit) { + struct deferred_cfg_send *dcs = deferred_config_pop(ptr); + DICTIONARY *plugins_dict = dcs->plugins_dict; +#ifdef NETDATA_INTERNAL_CHECKS + if (plugins_dict == NULL) { + fatal("DYNCFG, plugins_dict is NULL"); + deferred_config_free(dcs); + continue; + } +#endif + + const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, dcs->plugin_name); + if (plugin_item == NULL) { + error_report("DYNCFG, plugin %s not found", dcs->plugin_name); + deferred_config_free(dcs); + continue; + } + struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item); + if (dcs->module_name == NULL) { + dyncfg_config_t cfg = load_config(dcs->plugin_name, NULL, NULL); + if (cfg.data != NULL) { + plugin->set_config_cb(plugin->cb_usr_ctx, plugin->name, &cfg); + freez(cfg.data); + } + } else if (dcs->job_name == NULL) { + dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, NULL); + if (cfg.data != NULL) { + struct module *mod = get_module_by_name(plugin, dcs->module_name); + mod->set_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name, &cfg); + freez(cfg.data); + } + } else { + dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, dcs->job_name); + if (cfg.data != NULL) { + struct module *mod = get_module_by_name(plugin, dcs->module_name); + mod->set_job_config_cb(mod->job_config_cb_usr_ctx, plugin->name, mod->name, dcs->job_name, &cfg); + freez(cfg.data); + } + } + deferred_config_free(dcs); + dictionary_acquired_item_release(plugins_dict, plugin_item); + } + + netdata_thread_cleanup_pop(1); + return NULL; +} + +bool is_dyncfg_function(const char *function_name, uint8_t type) { + // TODO add hash to speed things up + if (type & (DYNCFG_FUNCTION_TYPE_GET | DYNCFG_FUNCTION_TYPE_REGULAR)) { + if (strncmp(function_name, FUNCTION_NAME_GET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_GET_PLUGIN_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_MODULE_CONFIG, strlen(FUNCTION_NAME_GET_MODULE_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_JOB_CONFIG, strlen(FUNCTION_NAME_GET_JOB_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA)) == 0) + return true; + } + + if (type & (DYNCFG_FUNCTION_TYPE_SET | DYNCFG_FUNCTION_TYPE_PAYLOAD)) { + if (strncmp(function_name, FUNCTION_NAME_SET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_SET_PLUGIN_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_SET_MODULE_CONFIG, strlen(FUNCTION_NAME_SET_MODULE_CONFIG)) == 0) + return true; + if (strncmp(function_name, FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) + return true; + } + + if (type & (DYNCFG_FUNCTION_TYPE_DELETE | DYNCFG_FUNCTION_TYPE_REGULAR)) { + if (strncmp(function_name, FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) + return true; + } + + return false; +} diff --git a/libnetdata/dyn_conf/dyn_conf.h b/libnetdata/dyn_conf/dyn_conf.h new file mode 100644 index 00000000..d584343b --- /dev/null +++ b/libnetdata/dyn_conf/dyn_conf.h @@ -0,0 +1,237 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef DYN_CONF_H +#define DYN_CONF_H + +#include "../libnetdata.h" + +#define FUNCTION_NAME_GET_PLUGIN_CONFIG "get_plugin_config" +#define FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA "get_plugin_config_schema" +#define FUNCTION_NAME_GET_MODULE_CONFIG "get_module_config" +#define FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA "get_module_config_schema" +#define FUNCTION_NAME_GET_JOB_CONFIG "get_job_config" +#define FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA "get_job_config_schema" +#define FUNCTION_NAME_SET_PLUGIN_CONFIG "set_plugin_config" +#define FUNCTION_NAME_SET_MODULE_CONFIG "set_module_config" +#define FUNCTION_NAME_SET_JOB_CONFIG "set_job_config" +#define FUNCTION_NAME_DELETE_JOB "delete_job" + +#define DYNCFG_MAX_WORDS 5 + +#define DYNCFG_VFNC_RET_CFG_ACCEPTED (1) + +enum module_type { + MOD_TYPE_UNKNOWN = 0, + MOD_TYPE_ARRAY, + MOD_TYPE_SINGLE +}; + +static inline enum module_type str2_module_type(const char *type_name) +{ + if (strcmp(type_name, "job_array") == 0) + return MOD_TYPE_ARRAY; + else if (strcmp(type_name, "single") == 0) + return MOD_TYPE_SINGLE; + return MOD_TYPE_UNKNOWN; +} + +static inline const char *module_type2str(enum module_type type) +{ + switch (type) { + case MOD_TYPE_ARRAY: + return "job_array"; + case MOD_TYPE_SINGLE: + return "single"; + default: + return "unknown"; + } +} + +struct dyncfg_config { + void *data; + size_t data_size; +}; + +typedef struct dyncfg_config dyncfg_config_t; + +struct configurable_plugin; +struct module; + +enum job_status { + JOB_STATUS_UNKNOWN = 0, // State used until plugin reports first status + JOB_STATUS_STOPPED, + JOB_STATUS_RUNNING, + JOB_STATUS_ERROR +}; + +static inline enum job_status str2job_state(const char *state_name) { + if (strcmp(state_name, "stopped") == 0) + return JOB_STATUS_STOPPED; + else if (strcmp(state_name, "running") == 0) + return JOB_STATUS_RUNNING; + else if (strcmp(state_name, "error") == 0) + return JOB_STATUS_ERROR; + return JOB_STATUS_UNKNOWN; +} + +const char *job_status2str(enum job_status status); + +enum set_config_result { + SET_CONFIG_ACCEPTED = 0, + SET_CONFIG_REJECTED, + SET_CONFIG_DEFFER +}; + +typedef uint32_t dyncfg_job_flg_t; +enum job_flags { + JOB_FLG_PS_LOADED = 1 << 0, // PS abbr. Persistent Storage + JOB_FLG_PLUGIN_PUSHED = 1 << 1, // got it from plugin (e.g. autodiscovered job) + JOB_FLG_STREAMING_PUSHED = 1 << 2, // got it through streaming + JOB_FLG_USER_CREATED = 1 << 3, // user created this job during agent runtime +}; + +enum job_type { + JOB_TYPE_UNKNOWN = 0, + JOB_TYPE_STOCK = 1, + JOB_TYPE_USER = 2, + JOB_TYPE_AUTODISCOVERED = 3, +}; + +static inline const char* job_type2str(enum job_type type) +{ + switch (type) { + case JOB_TYPE_STOCK: + return "stock"; + case JOB_TYPE_USER: + return "user"; + case JOB_TYPE_AUTODISCOVERED: + return "autodiscovered"; + case JOB_TYPE_UNKNOWN: + default: + return "unknown"; + } +} + +static inline enum job_type dyncfg_str2job_type(const char *type_name) +{ + if (strcmp(type_name, "stock") == 0) + return JOB_TYPE_STOCK; + else if (strcmp(type_name, "user") == 0) + return JOB_TYPE_USER; + else if (strcmp(type_name, "autodiscovered") == 0) + return JOB_TYPE_AUTODISCOVERED; + error_report("Unknown job type: %s", type_name); + return JOB_TYPE_UNKNOWN; +} + +struct job +{ + const char *name; + enum job_type type; + struct module *module; + + pthread_mutex_t lock; + // lock protexts only fields below (which are modified during job existence) + // others are static during lifetime of job + + int dirty; // this relates to rrdpush, true if parent has different data than us + + // state reported by plugin + usec_t last_state_update; + enum job_status status; // reported by plugin, enum as this has to be interpreted by UI + int state; // code reported by plugin which can mean anything plugin wants + char *reason; // reported by plugin, can be NULL (optional) + + dyncfg_job_flg_t flags; +}; + +struct module +{ + pthread_mutex_t lock; + char *name; + enum module_type type; + + struct configurable_plugin *plugin; + + // module config + enum set_config_result (*set_config_cb)(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg); + dyncfg_config_t (*get_config_cb)(void *usr_ctx, const char *plugin_name, const char *module_name); + dyncfg_config_t (*get_config_schema_cb)(void *usr_ctx, const char *plugin_name, const char *module_name); + void *config_cb_usr_ctx; + + DICTIONARY *jobs; + + // jobs config + dyncfg_config_t (*get_job_config_cb)(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name); + dyncfg_config_t (*get_job_config_schema_cb)(void *usr_ctx, const char *plugin_name, const char *module_name); + enum set_config_result (*set_job_config_cb)(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg); + enum set_config_result (*delete_job_cb)(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name); + void *job_config_cb_usr_ctx; +}; + +struct configurable_plugin { + pthread_mutex_t lock; + char *name; + DICTIONARY *modules; + const char *schema; + + dyncfg_config_t (*get_config_cb)(void *usr_ctx, const char *plugin_name); + dyncfg_config_t (*get_config_schema_cb)(void *usr_ctx, const char *plugin_name); + enum set_config_result (*set_config_cb)(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg); + void *cb_usr_ctx; // context for all callbacks (split if needed in future) +}; + +// API to be used by plugins +const DICTIONARY_ITEM *register_plugin(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, bool localhost); +void unregister_plugin(DICTIONARY *plugins_dict, const DICTIONARY_ITEM *plugin); +int register_module(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, struct module *module, bool localhost); +int register_job(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_type job_type, dyncfg_job_flg_t flags, int ignore_existing); + +const DICTIONARY_ITEM *report_job_status_acq_lock(DICTIONARY *plugins_dict, const DICTIONARY_ITEM **plugin_acq_item, DICTIONARY **job_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason); + +void dyn_conf_store_config(const char *function, const char *payload, struct configurable_plugin *plugin); +void unlink_job(const char *plugin_name, const char *module_name, const char *job_name); +void delete_job(struct configurable_plugin *plugin, const char *module_name, const char *job_name); +void delete_job_pname(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name); + +// API to be used by the web server(s) +json_object *get_list_of_plugins_json(DICTIONARY *plugins_dict); +struct configurable_plugin *get_plugin_by_name(DICTIONARY *plugins_dict, const char *name); + +json_object *get_list_of_modules_json(struct configurable_plugin *plugin); +struct module *get_module_by_name(struct configurable_plugin *plugin, const char *module_name); + +json_object *job2json(struct job *job); + +// helper struct to make interface between internal webserver and h2o same +struct uni_http_response { + int status; + char *content; + size_t content_length; + HTTP_CONTENT_TYPE content_type; + void (*content_free)(void *); +}; + +struct uni_http_response dyn_conf_process_http_request(DICTIONARY *plugins_dict, int method, const char *plugin, const char *module, const char *job_id, void *payload, size_t payload_size); + +// API to be used by main netdata process, initialization and destruction etc. +int dyn_conf_init(void); +void freez_dyncfg(void *ptr); + +#define dyncfg_dictionary_create() dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE) + +void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data); + +void *dyncfg_main(void *in); + +#define DYNCFG_FUNCTION_TYPE_REGULAR (1 << 0) +#define DYNCFG_FUNCTION_TYPE_PAYLOAD (1 << 1) +#define DYNCFG_FUNCTION_TYPE_GET (1 << 2) +#define DYNCFG_FUNCTION_TYPE_SET (1 << 3) +#define DYNCFG_FUNCTION_TYPE_DELETE (1 << 4) +#define DYNCFG_FUNCTION_TYPE_ALL \ + (DYNCFG_FUNCTION_TYPE_REGULAR | DYNCFG_FUNCTION_TYPE_PAYLOAD | DYNCFG_FUNCTION_TYPE_GET | DYNCFG_FUNCTION_TYPE_SET | DYNCFG_FUNCTION_TYPE_DELETE) + +bool is_dyncfg_function(const char *function_name, uint8_t type); + +#endif //DYN_CONF_H diff --git a/libnetdata/dyn_conf/tests/sample_test_config.json b/libnetdata/dyn_conf/tests/sample_test_config.json new file mode 100644 index 00000000..a6595f12 --- /dev/null +++ b/libnetdata/dyn_conf/tests/sample_test_config.json @@ -0,0 +1,22 @@ +{ + "http_endpoints": { + "parent": { + "host": "127.0.0.1", + "mguid": null, + "port": 20001, + "ssl": false + }, + "child": { + "host": "127.0.0.1", + "mguid": "3bc2f7de-1445-11ee-9ed7-3c7c3f21784c", + "port": 19999, + "ssl": false + } + }, + "global": { + "test_plugin_name": "external_plugin", + "test_array_module_name": "module_of_the_future", + "test_single_module_name": "module_of_the_future_single_type", + "test_job_name": "fixed_job" + } +} diff --git a/libnetdata/dyn_conf/tests/sub_tests/test_parent_child.rb b/libnetdata/dyn_conf/tests/sub_tests/test_parent_child.rb new file mode 100644 index 00000000..820db77f --- /dev/null +++ b/libnetdata/dyn_conf/tests/sub_tests/test_parent_child.rb @@ -0,0 +1,192 @@ +class ParentChildTest + @@plugin_cfg = <<~HEREDOC +{ "test" : "true" } +HEREDOC + @@plugin_cfg2 = <<~HEREDOC +{ "asdfgh" : "asdfgh" } +HEREDOC + + @@job_cfg = <<~HEREDOC +{ "i am newly created job" : "true" } +HEREDOC + + def initialize + @parent = $config[:http_endpoints][:parent] + @child = $config[:http_endpoints][:child] + @plugin = $config[:global][:test_plugin_name] + @arry_mod = $config[:global][:test_array_module_name] + @single_mod = $config[:global][:test_single_module_name] + @test_job = $config[:global][:test_job_name] + end + def check_test_plugin_modules_list(host, child = nil) + rc = DynCfgHttpClient.get_plugin_module_list(host, @plugin, child) + assert_eq(rc.code, 200, "as HTTP code for get_module_list request on plugin \"#{@plugin}\"") + modules = nil + assert_nothing_raised do + modules = JSON.parse(rc.parsed_response, symbolize_names: true) + end + assert_has_key?(modules, :modules) + assert_eq(modules[:modules].count, 2, "as number of modules in plugin \"#{@plugin}\"") + modules[:modules].each do |m| + assert_has_key?(m, :name) + assert_has_key?(m, :type) + assert_is_one_of(m[:type], "job_array", "single") + end + assert_eq_str(modules[:modules][0][:name], @arry_mod, "name of first module in plugin \"#{@plugin}\"") + assert_eq_str(modules[:modules][1][:name], @single_mod, "name of second module in plugin \"#{@plugin}\"") + end + def run + TEST_SUITE("Parent/Child plugin config") + + TEST("parent/child/get_plugin_list", "Get child (hops:1) plugin list trough parent") + plugins = DynCfgHttpClient.get_plugin_list(@parent, @child) + assert_eq(plugins.code, 200, "as HTTP code for get_plugin_list request") + assert_nothing_raised do + plugins = JSON.parse(plugins.parsed_response, symbolize_names: true) + end + assert_has_key?(plugins, :configurable_plugins) + assert_array_include?(plugins[:configurable_plugins], @plugin) + PASS() + + TEST("parent/child/(set/get)plugin_config", "Set then get and compare child (hops:1) plugin config trough parent") + rc = DynCfgHttpClient.set_plugin_config(@parent, @plugin, @@plugin_cfg, @child) + assert_eq(rc.code, 200, "as HTTP code for set_plugin_config request") + + rc = DynCfgHttpClient.get_plugin_config(@parent, @plugin, @child) + assert_eq(rc.code, 200, "as HTTP code for get_plugin_config request") + assert_eq_str(rc.parsed_response.chomp!, @@plugin_cfg, "as plugin config") + + # We do this twice with different configs to ensure first config was not loaded from persistent storage (from previous tests) + rc = DynCfgHttpClient.set_plugin_config(@parent, @plugin, @@plugin_cfg2, @child) + assert_eq(rc.code, 200, "as HTTP code for set_plugin_config request 2") + + rc = DynCfgHttpClient.get_plugin_config(@parent, @plugin, @child) + assert_eq(rc.code, 200, "as HTTP code for get_plugin_config request 2") + assert_eq_str(rc.parsed_response.chomp!, @@plugin_cfg2, "set/get plugin config 2") + PASS() + + TEST("child/get_plugin_config", "Get child (hops:0) plugin config and compare with what we got trough parent (set_plugin_config from previous test)") + rc = DynCfgHttpClient.get_plugin_config(@child, @plugin, nil) + assert_eq(rc.code, 200, "as HTTP code for get_plugin_config request") + assert_eq_str(rc.parsed_response.chomp!, @@plugin_cfg2.chomp, "as plugin config") + PASS() + + TEST("parent/child/plugin_module_list", "Get child (hops:1) plugin module list trough parent and check its contents") + check_test_plugin_modules_list(@parent, @child) + PASS() + + TEST("child/plugin_module_list", "Get child (hops:0) plugin module list directly and check its contents") + check_test_plugin_modules_list(@child, nil) + PASS() + + TEST("parent/child/module/jobs", "Get list of jobs from child (hops:1) trough parent and check its contents, check job updates") + rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @arry_mod, @child) + assert_eq(rc.code, 200, "as HTTP code for get_jobs request") + jobs = nil + assert_nothing_raised do + jobs = JSON.parse(rc.parsed_response, symbolize_names: true) + end + assert_has_key?(jobs, :jobs) + new_job = jobs[:jobs].find {|i| i[:name] == @test_job} + assert_not_nil(new_job) + assert_has_key?(new_job, :status) + assert_not_eq_str(new_job[:status], "unknown", "job status is other than unknown") + assert_has_key?(new_job, :flags) + assert_array_include?(new_job[:flags], "JOB_FLG_STREAMING_PUSHED") + PASS() + + TEST("child/module/jobs", "Get list of jobs direct from child (hops:0) and check its contents, check job updates") + rc = DynCfgHttpClient.get_job_list(@child, @plugin, @arry_mod, nil) + assert_eq(rc.code, 200, "as HTTP code for get_jobs request") + jobs = nil + assert_nothing_raised do + jobs = JSON.parse(rc.parsed_response, symbolize_names: true) + end + assert_has_key?(jobs, :jobs) + new_job = jobs[:jobs].find {|i| i[:name] == @test_job} + assert_not_nil(new_job) + assert_has_key?(new_job, :status) + assert_not_eq_str(new_job[:status], "unknown", "job status is other than unknown") + assert_has_key?(new_job, :flags) + + assert_array_not_include?(new_job[:flags], "JOB_FLG_STREAMING_PUSHED") # this is plugin directly at child so it should not show this flag + PASS() + + TEST("parent/child/single_module/jobs", "Attempt getting list of jobs from child (hops:1) trough parent on single module. Check it fails properly") + rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @single_mod, @child) + assert_eq(rc.code, 400, "as HTTP code for get_jobs request") + assert_eq_str(rc.parsed_response, '400 - this module is not array type', "as HTTP code for get_jobs request on single module") + PASS() + + created_job = SecureRandom.uuid + TEST("parent/child/module/cr_del_job", "Create and delete job on child (hops:1) trough parent") + # create new job + rc = DynCfgHttpClient.create_job(@parent, @plugin, @arry_mod, created_job, @@job_cfg, @child) + assert_eq_http_code(rc, 200, "as HTTP code for create_job request") + # check this job is in job list @parent + rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @arry_mod, @child) + assert_eq(rc.code, 200, "as HTTP code for get_jobs request") + jobs = nil + assert_nothing_raised do + jobs = JSON.parse(rc.parsed_response, symbolize_names: true) + end + assert_has_key?(jobs, :jobs) + new_job = jobs[:jobs].find {|i| i[:name] == created_job} + assert_not_nil(new_job) + # check this job is in job list @child + rc = DynCfgHttpClient.get_job_list(@child, @plugin, @arry_mod, nil) + assert_eq(rc.code, 200, "as HTTP code for get_jobs request") + jobs = nil + assert_nothing_raised do + jobs = JSON.parse(rc.parsed_response, symbolize_names: true) + end + assert_has_key?(jobs, :jobs) + new_job = jobs[:jobs].find {|i| i[:name] == created_job} + assert_not_nil(new_job) + # check we can get job config back + rc = DynCfgHttpClient.get_job_config(@parent, @plugin, @arry_mod, created_job, @child) + assert_eq(rc.code, 200, "as HTTP code for get_job_config request") + assert_eq_str(rc.parsed_response.chomp!, @@job_cfg, "as job config") + # delete job + rc = DynCfgHttpClient.delete_job(@parent, @plugin, @arry_mod, created_job, @child) + assert_eq(rc.code, 200, "as HTTP code for delete_job request") + # Check it is not in parents job list anymore + rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @arry_mod, @child) + assert_eq(rc.code, 200, "as HTTP code for get_jobs request") + jobs = nil + assert_nothing_raised do + jobs = JSON.parse(rc.parsed_response, symbolize_names: true) + end + assert_has_key?(jobs, :jobs) + new_job = jobs[:jobs].find {|i| i[:name] == created_job} + assert_nil(new_job) + # Check it is not in childs job list anymore + rc = DynCfgHttpClient.get_job_list(@child, @plugin, @arry_mod, nil) + assert_eq(rc.code, 200, "as HTTP code for get_jobs request") + jobs = nil + assert_nothing_raised do + jobs = JSON.parse(rc.parsed_response, symbolize_names: true) + end + assert_has_key?(jobs, :jobs) + new_job = jobs[:jobs].find {|i| i[:name] == created_job} + assert_nil(new_job) + PASS() + + TEST("parent/child/module/del_undeletable_job", "Try delete job on child (child rejects), check failure case works (hops:1)") + # test if plugin rejects job deletion the job still remains in list as it should + rc = DynCfgHttpClient.delete_job(@parent, @plugin, @arry_mod, @test_job, @child) + assert_eq(rc.code, 500, "as HTTP code for delete_job request") + rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @arry_mod, @child) + assert_eq(rc.code, 200, "as HTTP code for get_jobs request") + jobs = nil + assert_nothing_raised do + jobs = JSON.parse(rc.parsed_response, symbolize_names: true) + end + assert_has_key?(jobs, :jobs) + job = jobs[:jobs].find {|i| i[:name] == @test_job} + assert_not_nil(job) + PASS() + end +end + +ParentChildTest.new.run() diff --git a/libnetdata/dyn_conf/tests/test_dyncfg.rb b/libnetdata/dyn_conf/tests/test_dyncfg.rb new file mode 100755 index 00000000..1b4b3a06 --- /dev/null +++ b/libnetdata/dyn_conf/tests/test_dyncfg.rb @@ -0,0 +1,266 @@ +#!/usr/bin/env ruby + +require 'json' +require 'httparty' +require 'pastel' +require 'securerandom' + +ARGV.length == 1 or raise "Usage: #{$0} <config file>" +config_file = ARGV[0] + +File.exist?(config_file) or raise "File not found: #{config_file}" + +$config = JSON.parse(File.read(config_file), symbolize_names: true) + +$plugin_name = $config[:global][:test_plugin_name] +$pastel = Pastel.new + +class TestRunner + attr_reader :stats + def initialize + @stats = { + :suites => 0, + :tests => 0, + :assertions => 0 + } + @test = nil + end + def add_assertion() + @stats[:assertions] += 1 + end + def FAIL(msg, exception = nil, loc = nil) + puts $pastel.red.bold(" ✕ FAIL") + STDERR.print " " + if loc + STDERR.print $pastel.yellow("@#{loc.path}:#{loc.lineno}: ") + else + STDERR.print $pastel.yellow("@#{caller_locations(1, 1).first.path}:#{caller_locations(1, 1).first.lineno}: ") + end + STDERR.puts msg + STDERR.puts exception.full_message(:highlight => true) if exception + STDERR.puts $pastel.yellow(" Backtrace:") + caller.each do |line| + STDERR.puts " #{line}" + end + exit 1 + end + def PASS() + STDERR.puts $pastel.green.bold(" ✓ PASS") + @stats[:tests] += 1 + @test = nil + end + def TEST_SUITE(name) + puts $pastel.bold("• TEST SUITE: \"#{name}\"") + @stats[:suites] += 1 + end + def assert_no_test_running() + unless @test.nil? + STDERR.puts $pastel.red("\nFATAL: Test \"#{@test}\" did not call PASS() or FAIL()!") + exit 1 + end + end + def TEST(name, description = nil) + assert_no_test_running() + @test = name + col = 0 + txt = " ├─ T: #{name} " + col += txt.length + print $pastel.bold(txt) + + tab = 50 + rem = tab - (col % tab) + rem.times do putc ' ' end + col += rem + + if (description) + txt = " - #{description} " + col += txt.length + print txt + + tab = 180 + rem = tab - (col % tab) + rem.times do putc '.' end + end + end + def FINALIZE() + assert_no_test_running() + end +end + +$test_runner = TestRunner.new +def FAIL(msg, exception = nil, loc = nil) + $test_runner.FAIL(msg, exception, loc) +end +def PASS() + $test_runner.PASS() +end +def TEST_SUITE(name) + $test_runner.TEST_SUITE(name) +end +def TEST(name, description = nil) + $test_runner.TEST(name, description) +end + +def assert_eq(got, expected, msg = nil) + unless got == expected + FAIL("Expected #{expected}, got #{got} #{msg ? "(#{msg})" : ""}", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_eq_http_code(got, expected, msg = nil) + unless got.code == expected + FAIL("Expected #{expected}, got #{got}. Server \"#{got.parsed_response}\" #{msg ? "(#{msg})" : ""}", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_eq_str(got, expected, msg = nil) + unless got == expected + FAIL("Strings do not match #{msg ? "(#{msg})" : ""}", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_not_eq_str(got, expected, msg = nil) + unless got != expected + FAIL("Strings shoud not match #{msg ? "(#{msg})" : ""}", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_nothing_raised() + begin + yield + rescue Exception => e + FAIL("Unexpected exception of type #{e.class} raised. Msg: \"#{e.message}\"", e, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_has_key?(hash, key) + unless hash.has_key?(key) + FAIL("Expected key \"#{key}\" in hash", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_array_include?(array, value) + unless array.include?(value) + FAIL("Expected array to include \"#{value}\"", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_array_not_include?(array, value) + if array.include?(value) + FAIL("Expected array to not include \"#{value}\"", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_is_one_of(value, *values) + unless values.include?(value) + FAIL("Expected value to be one of #{values.join(", ")}", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_not_nil(value) + if value.nil? + FAIL("Expected value to not be nil", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end +def assert_nil(value) + unless value.nil? + FAIL("Expected value to be nil", nil, caller_locations(1, 1).first) + end + $test_runner.add_assertion() +end + + +class DynCfgHttpClient + def self.protocol(cfg) + return cfg[:ssl] ? 'https://' : 'http://' + end + def self.url_base(host) + return "#{protocol(host)}#{host[:host]}:#{host[:port]}" + end + def self.get_url_cfg_base(host, child = nil) + url = url_base(host) + url += "/host/#{child[:mguid]}" if child + url += "/api/v2/config" + return url + end + def self.get_url_cfg_plugin(host, plugin, child = nil) + return get_url_cfg_base(host, child) + '/' + plugin + end + def self.get_url_cfg_module(host, plugin, mod, child = nil) + return get_url_cfg_plugin(host, plugin, child) + '/' + mod + end + def self.get_url_cfg_job(host, plugin, mod, job_id, child = nil) + return get_url_cfg_module(host, plugin, mod, child) + "/#{job_id}" + end + def self.get_plugin_list(host, child = nil) + begin + return HTTParty.get(get_url_cfg_base(host, child), verify: false, format: :plain) + rescue => e + FAIL(e.message, e) + end + end + def self.get_plugin_config(host, plugin, child = nil) + begin + return HTTParty.get(get_url_cfg_plugin(host, plugin, child), verify: false) + rescue => e + FAIL(e.message, e) + end + end + def self.set_plugin_config(host, plugin, cfg, child = nil) + begin + return HTTParty.put(get_url_cfg_plugin(host, plugin, child), verify: false, body: cfg) + rescue => e + FAIL(e.message, e) + end + end + def self.get_plugin_module_list(host, plugin, child = nil) + begin + return HTTParty.get(get_url_cfg_plugin(host, plugin, child) + "/modules", verify: false, format: :plain) + rescue => e + FAIL(e.message, e) + end + end + def self.get_job_list(host, plugin, mod, child = nil) + begin + return HTTParty.get(get_url_cfg_module(host, plugin, mod, child) + "/jobs", verify: false, format: :plain) + rescue => e + FAIL(e.message, e) + end + end + def self.create_job(host, plugin, mod, job_id, job_cfg, child = nil) + begin + return HTTParty.post(get_url_cfg_job(host, plugin, mod, job_id, child), verify: false, body: job_cfg) + rescue => e + FAIL(e.message, e) + end + end + def self.delete_job(host, plugin, mod, job_id, child = nil) + begin + return HTTParty.delete(get_url_cfg_job(host, plugin, mod, job_id, child), verify: false) + rescue => e + FAIL(e.message, e) + end + end + def self.get_job_config(host, plugin, mod, job_id, child = nil) + begin + return HTTParty.get(get_url_cfg_job(host, plugin, mod, job_id, child), verify: false, format: :plain) + rescue => e + FAIL(e.message, e) + end + end + def self.set_job_config(host, plugin, mod, job_id, job_cfg, child = nil) + begin + return HTTParty.put(get_url_cfg_job(host, plugin, mod, job_id, child), verify: false, body: job_cfg) + rescue => e + FAIL(e.message, e) + end + end +end + +require_relative 'sub_tests/test_parent_child.rb' + +$test_runner.FINALIZE() +puts $pastel.green.bold("All tests passed!") +puts ("Total #{$test_runner.stats[:assertions]} assertions, #{$test_runner.stats[:tests]} tests in #{$test_runner.stats[:suites]} suites") +exit 0 diff --git a/libnetdata/dyn_conf/tests/test_plugin/test.plugin b/libnetdata/dyn_conf/tests/test_plugin/test.plugin new file mode 100755 index 00000000..b890ab31 --- /dev/null +++ b/libnetdata/dyn_conf/tests/test_plugin/test.plugin @@ -0,0 +1,250 @@ +#!/usr/bin/env ruby + +# bogus chart that we create just so there is at least one chart +CHART_TYPE = 'lines' +UPDATE_EVERY = 1 +PRIORITY = 100000 +CHART_NAME = 'number_of_processes' +DIMENSION_NAME = 'running' + +$plugin_name = "external_plugin" +$plugin_version = "0.0.1" +$plugin_config = <<-HEREDOC +test_plugin_config +hableba hableba hableba +HEREDOC + +$array_module_name = 'module_of_the_future' +$fixed_job_name = 'fixed_job' + +$modules = { + $array_module_name => { + :type => :job_array, + :jobs => { + $fixed_job_name => { + :type => :fixed, + :config => <<-HEREDOC +fixed_job_config +HEREDOC + }, + }, + :config => <<-HEREDOC +module_of_the_future_config +HEREDOC + }, + "module_of_the_future_single_type" => { + :type => :single, + :jobs => {}, + :config => <<-HEREDOC +module_of_the_future_single_type_config +HEREDOC + } +} + +def out(str) + $log.puts "2 NETDATA> #{str}" + $stdout.puts str + $stdout.flush + $log.flush +end + +def log(str) + $log.puts "LOG > #{str}" + $log.flush +end + +#TODO this is AI code, verify +def split_with_quotes(str) + result = [] + current_word = "" + in_quotes = false + escaped = false + + str.each_char do |char| + if char == '\\' && !escaped + escaped = true + next + end + + if char == '"' && !escaped + in_quotes = !in_quotes + current_word << char + elsif char == ' ' && !in_quotes + result << current_word unless current_word.empty? + current_word = "" + else + current_word << char + end + + escaped = false + end + + result << current_word unless current_word.empty? + + result +end + + +def print_startup_messages + out "DYNCFG_ENABLE #{$plugin_name}" + $modules.each do |name, module_config| + out "DYNCFG_REGISTER_MODULE #{name} #{module_config[:type]}" + end + out "CHART system.#{CHART_NAME} '' 'Number of running processes' 'processes' processes processes.#{CHART_NAME} #{CHART_TYPE} #{PRIORITY} #{UPDATE_EVERY}" + out "DIMENSION #{DIMENSION_NAME} '' absolute 1 1" + + $modules.each do |mod_name, mod| + next unless mod[:type] == :job_array + mod[:jobs].each do |job_name, job| + next unless job[:type] == :fixed + out "DYNCFG_REGISTER_JOB #{mod_name} #{job_name} stock 0" + out "REPORT_JOB_STATUS #{$array_module_name} #{$fixed_job_name} running 0" + end + end +end + +def function_result(txid, msg, result) + out "FUNCTION_RESULT_BEGIN #{txid} #{result} text/plain 5" + out msg + out "FUNCTION_RESULT_END" +end + +def process_payload_function(params) + log "payload function #{params[:fncname]}, #{params[:fncparams]}" + fnc_name, mod_name, job_name = params[:fncparams] + case fnc_name + when 'set_plugin_config' + $plugin_config = params[:payload] + function_result(params[:txid], "plugin config set", 1) + when 'set_module_config' + mod = $modules[mod_name] + return function_result(params[:txid], "no such module", 0) if mod.nil? + mod[:config] = params[:payload] + function_result(params[:txid], "module config set", 1) + when 'set_job_config' + mod = $modules[mod_name] + return function_result(params[:txid], "no such module", 0) if mod.nil? + job = mod[:jobs][job_name] + if job.nil? + job = Hash.new if job.nil? + job[:type] = :dynamic + mod[:jobs][job_name] = job + end + job[:config] = params[:payload] + function_result(params[:txid], "job config set", 1) + end +end + +def process_function(params) + log "normal function #{params[:fncname]}, #{params[:fncparams]}" + fnc_name, mod_name, job_name = params[:fncparams] + case fnc_name + when 'get_plugin_config' + function_result(params[:txid], $plugin_config, 1) + when 'get_module_config' + return function_result(params[:txid], "no such module", 0) unless $modules.has_key?(mod_name) + function_result(params[:txid], $modules[mod_name][:config], 1) + when 'get_job_config' + mod = $modules[mod_name] + return function_result(params[:txid], "no such module", 0) if mod.nil? + job = mod[:jobs][job_name] + return function_result(params[:txid], "no such job", 0) if job.nil? + function_result(params[:txid], job[:config], 1) + when 'delete_job' + mod = $modules[mod_name] + return function_result(params[:txid], "no such module", 0) if mod.nil? + job = mod[:jobs][job_name] + return function_result(params[:txid], "no such job", 0) if job.nil? + if job[:type] == :fixed + return function_result(params[:txid], "this job can't be deleted", 0) + else + mod[:jobs].delete(job_name) + function_result(params[:txid], "job deleted", 1) + end + end +end + +$inflight_incoming = nil +def process_input(input) + words = split_with_quotes(input) + + unless $inflight_incoming.nil? + if input == "FUNCTION_PAYLOAD_END" + log $inflight_incoming[:payload] + process_payload_function($inflight_incoming) + $inflight_incoming = nil + else + $inflight_incoming[:payload] << input + $inflight_incoming[:payload] << "\n" + end + return + end + + case words[0] + when "FUNCTION", "FUNCTION_PAYLOAD" + params = {} + params[:command] = words[0] + params[:txid] = words[1] + params[:timeout] = words[2].to_i + params[:fncname] = words[3] + params[:fncname] = params[:fncname][1..-2] if params[:fncname].start_with?('"') && params[:fncname].end_with?('"') + if params[:command] == "FUNCTION_PAYLOAD" + $inflight_incoming = Hash.new + params[:fncparams] = split_with_quotes(params[:fncname]) + params[:fncname] = params[:fncparams][0] + $inflight_incoming[:txid] = params[:txid] + $inflight_incoming[:fncname] = params[:fncname] + $inflight_incoming[:params] = params + $inflight_incoming[:fncparams] = params[:fncparams] + $inflight_incoming[:payload] = "" + else + params[:fncparams] = split_with_quotes(params[:fncname]) + params[:fncname] = params[:fncparams][0] + process_function(params) + end + end +end + +def read_and_output_metric + processes = `ps -e | wc -l`.to_i - 1 # -1 to exclude the header line + timestamp = Time.now.to_i + + puts "BEGIN system.#{CHART_NAME}" + puts "SET #{DIMENSION_NAME} = #{processes}" + puts "END" +end + +def the_main + $stderr.reopen("/tmp/test_plugin_err.log", "w") + $log = File.open("/tmp/test_plugin.log", "w") + $log.puts "Starting plugin" + print_startup_messages + $log.puts "init done" + $log.flush + + last_metric_time = Time.now + + loop do + time_since_last_metric = Time.now - last_metric_time + + # If it's been more than 1 second since we collected metrics, collect them now + if time_since_last_metric >= 1 + read_and_output_metric + last_metric_time = Time.now + end + + # Use select to wait for input, but only wait up to the time remaining until we need to collect metrics again + remaining_time = [1 - time_since_last_metric, 0].max + if select([$stdin], nil, nil, remaining_time) + input = $stdin.gets + next if input.class != String + input.chomp! + $log.puts "RAW INPUT< #{input}" + $log.flush + process_input(input) + end + end +end + + +the_main if __FILE__ == $PROGRAM_NAME |