summaryrefslogtreecommitdiffstats
path: root/libnetdata/dyn_conf
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--libnetdata/dyn_conf/README.md167
-rw-r--r--libnetdata/dyn_conf/dyn_conf.c503
-rw-r--r--libnetdata/dyn_conf/dyn_conf.h147
-rw-r--r--libnetdata/dyn_conf/tests/sample_test_config.json22
-rw-r--r--libnetdata/dyn_conf/tests/sub_tests/test_parent_child.rb192
-rwxr-xr-xlibnetdata/dyn_conf/tests/test_dyncfg.rb266
-rwxr-xr-xlibnetdata/dyn_conf/tests/test_plugin/test.plugin250
7 files changed, 1368 insertions, 179 deletions
diff --git a/libnetdata/dyn_conf/README.md b/libnetdata/dyn_conf/README.md
new file mode 100644
index 000000000..6c8127400
--- /dev/null
+++ b/libnetdata/dyn_conf/README.md
@@ -0,0 +1,167 @@
+# Netdata Dynamic Configuration
+
+Purpose of Netdata Dynamic Configuration is to allow configuration of select Netdata plugins and options through the
+Netdata API and by extension by UI.
+
+## HTTP API documentation
+
+### Summary API
+
+For summary of all jobs and their statuses (for all children that stream to parent) use the following URL:
+
+| Method | Endpoint | Description |
+|:-------:|-------------------------------|------------------------------------------------------------|
+| **GET** | `api/v2/job_statuses` | list of Jobs |
+| **GET** | `api/v2/job_statuses?grouped` | list of Jobs (hierarchical, grouped by host/plugin/module) |
+
+### Dyncfg API
+
+### Top level
+
+| Method | Endpoint | Description |
+|:-------:|------------------|-----------------------------------------|
+| **GET** | `/api/v2/config` | registered Plugins (sent DYNCFG_ENABLE) |
+
+### Plugin level
+
+| Method | Endpoint | Description |
+|:-------:|-----------------------------------|------------------------------|
+| **GET** | `/api/v2/config/[plugin]` | Plugin config |
+| **PUT** | `/api/v2/config/[plugin]` | update Plugin config |
+| **GET** | `/api/v2/config/[plugin]/modules` | Modules registered by Plugin |
+| **GET** | `/api/v2/config/[plugin]/schema` | Plugin config schema |
+
+### Module level
+
+| Method | Endpoint | Description |
+|:-------:|-----------------------------------------------|---------------------------|
+| **GET** | `/api/v2/config/<plugin>/[module]` | Module config |
+| **PUT** | `/api/v2/config/[plugin]/[module]` | update Module config |
+| **GET** | `/api/v2/config/[plugin]/[module]/jobs` | Jobs registered by Module |
+| **GET** | `/api/v2/config/[plugin]/[module]/job_schema` | Job config schema |
+| **GET** | `/api/v2/config/[plugin]/[module]/schema` | Module config schema |
+
+### Job level - only for modules where `module_type == job_array`
+
+| Method | Endpoint | Description |
+|:----------:|------------------------------------------|--------------------------------|
+| **GET** | `/api/v2/config/[plugin]/[module]/[job]` | Job config |
+| **PUT** | `/api/v2/config/[plugin]/[module]/[job]` | update Job config |
+| **POST** | `/api/v2/config/[plugin]/[module]/[job]` | create Job |
+| **DELETE** | `/api/v2/config/[plugin]/[module]/[job]` | delete Job (created by Dyncfg) |
+
+## AGENT<->PLUGIN interface documentation
+
+### 1. DYNCFG_ENABLE
+
+Plugin signifies to agent its ability to use new dynamic config and the name it wishes to use by sending
+
+```
+plugin->agent:
+=============
+DYNCFG_ENABLE [plugin_url_name]
+```
+
+This can be sent only once per lifetime of the plugin (at startup or later) sending it multiple times is considered a
+protocol violation and plugin might get terminated.
+After this command is sent the plugin has to be ready to accept all the new commands/keywords related to dynamic
+configuration (this command lets agent know this plugin is dyncfg capable and wishes to use dyncfg functionality).
+
+After this command agent can call
+
+```
+agent->plugin:
+=============
+FUNCTION_PAYLOAD [UUID] 1 "set_plugin_config"
+the new configuration
+blah blah blah
+FUNCTION_PAYLOAD_END
+
+plugin->agent:
+=============
+FUNCTION_RESULT_BEGIN [UUID] [(1/0)(accept/reject)] [text/plain] 5
+FUNCTION_RESULT_END
+```
+
+to set the new config which can be accepted/rejected by plugin by sending answer for this FUNCTION as it would with any
+other regular function.
+
+The new `FUNCTION_PAYLOAD` command differs from regular `FUNCTION` command exclusively in its ability to send bigger
+payloads (configuration file contents) to the plugin (not just parameters list).
+
+Agent can also call (after `DYNCFG_ENABLE`)
+
+```
+Agent->plugin:
+=============
+FUNCTION [UID] 1 "get_plugin_config"
+
+Plugin->agent:
+=============
+FUNCTION_RESULT_BEGIN [UID] 1 text/plain 5
+{
+ "the currently used config from plugin" : "nice"
+}
+FUNCTION_RESULT_END
+```
+
+and
+
+```
+Agent->plugin:
+=============
+FUNCTION [UID] 1 "get_plugin_config_schema"
+
+Plugin->agent:
+=============
+FUNCTION_RESULT_BEGIN [UID] 1 text/plain 5
+{
+ "the schema of plugin configuration" : "splendid"
+}
+FUNCTION_RESULT_END
+```
+
+Plugin can also register zero, one or more configurable modules using:
+
+```
+plugin->agent:
+=============
+DYNCFG_REGISTER_MODULE [module_url_name] (job_array|single)
+```
+
+modules can be added any time during plugins lifetime (you are not required to add them all at startup).
+
+### 2. DYNCFG_REGISTER_MODULE
+
+Module has to choose one of following types at registration:
+
+- `single` - module itself has configuration but does not accept any jobs *(this is useful mainly for internal netdata
+ configurable things like webserver etc.)*
+- `job_array` - module itself **can** *(not must)* have configuration and it has an array of jobs which can be added,
+ modified and deleted. **this is what plugin developer needs in most cases**
+
+After module has been registered agent can call
+
+- `set_module_config [module]` FUNCTION_PAYLOAD
+- `get_module_config [module]` FUNCTION
+- `get_module_config_schema [module]` FUNCTION
+
+with same syntax as `set_plugin_config` and `get_plugin_config`. In case of `set` command the plugin has ability to
+reject the new configuration pushed to it.
+
+In a case the module was registered as `job_array` type following commands can be used to manage jobs:
+
+### 3. Job interface for job_array modules
+
+- `get_job_config_schema [module]` - FUNCTION
+- `get_job_config [module] [job]` - FUNCTION
+- `set_job_config [module] [job]` - FUNCTION_PAYLOAD
+- `delete_job_name [module] [job]` - FUNCTION
+
+### 4. Streaming
+
+When above commands are transferred trough streaming additionally `plugin_name` is prefixed as first parameter. This is
+done to allow routing to appropriate plugin @child.
+
+As a plugin developer you don't need to concern yourself with this detail as that parameter is stripped when sent to the
+plugin *(and added when sent trough streaming)* automagically.
diff --git a/libnetdata/dyn_conf/dyn_conf.c b/libnetdata/dyn_conf/dyn_conf.c
index 00289fdf5..ee4b4733a 100644
--- a/libnetdata/dyn_conf/dyn_conf.c
+++ b/libnetdata/dyn_conf/dyn_conf.c
@@ -3,7 +3,7 @@
#include "dyn_conf.h"
#define DYN_CONF_PATH_MAX (4096)
-#define DYN_CONF_DIR VARLIB_DIR "/etc"
+#define DYN_CONF_DIR VARLIB_DIR "/dynconf"
#define DYN_CONF_JOB_SCHEMA "job_schema"
#define DYN_CONF_SCHEMA "schema"
@@ -11,9 +11,20 @@
#define DYN_CONF_JOB_LIST "jobs"
#define DYN_CONF_CFG_EXT ".cfg"
-DICTIONARY *plugins_dict = NULL;
+void job_flags_wallkthrough(dyncfg_job_flg_t flags, void (*cb)(const char *str, void *data), void *data)
+{
+ if (flags & JOB_FLG_PS_LOADED)
+ cb("JOB_FLG_PS_LOADED", data);
+ if (flags & JOB_FLG_PLUGIN_PUSHED)
+ cb("JOB_FLG_PLUGIN_PUSHED", data);
+ if (flags & JOB_FLG_STREAMING_PUSHED)
+ cb("JOB_FLG_STREAMING_PUSHED", data);
+ if (flags & JOB_FLG_USER_CREATED)
+ cb("JOB_FLG_USER_CREATED", data);
+}
struct deferred_cfg_send {
+ DICTIONARY *plugins_dict;
char *plugin_name;
char *module_name;
char *job_name;
@@ -33,7 +44,7 @@ static void deferred_config_free(struct deferred_cfg_send *dcs)
freez(dcs);
}
-static void deferred_config_push_back(const char *plugin_name, const char *module_name, const char *job_name)
+static void deferred_config_push_back(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name)
{
struct deferred_cfg_send *deferred = callocz(1, sizeof(struct deferred_cfg_send));
deferred->plugin_name = strdupz(plugin_name);
@@ -42,6 +53,7 @@ static void deferred_config_push_back(const char *plugin_name, const char *modul
if (job_name != NULL)
deferred->job_name = strdupz(job_name);
}
+ deferred->plugins_dict = plugins_dict;
pthread_mutex_lock(&deferred_configs_lock);
if (dyncfg_shutdown) {
pthread_mutex_unlock(&deferred_configs_lock);
@@ -95,7 +107,7 @@ static int _get_list_of_plugins_json_cb(const DICTIONARY_ITEM *item, void *entry
return 0;
}
-json_object *get_list_of_plugins_json()
+json_object *get_list_of_plugins_json(DICTIONARY *plugins_dict)
{
json_object *obj = json_object_new_array();
@@ -114,18 +126,7 @@ static int _get_list_of_modules_json_cb(const DICTIONARY_ITEM *item, void *entry
json_object *json_item = json_object_new_string(module->name);
json_object_object_add(json_module, "name", json_item);
- const char *module_type;
- switch (module->type) {
- case MOD_TYPE_SINGLE:
- module_type = "single";
- break;
- case MOD_TYPE_ARRAY:
- module_type = "job_array";
- break;
- default:
- module_type = "unknown";
- break;
- }
+ const char *module_type = module_type2str(module->type);
json_item = json_object_new_string(module_type);
json_object_object_add(json_module, "type", json_item);
@@ -163,17 +164,31 @@ const char *job_status2str(enum job_status status)
}
}
-static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data)
+static void _job_flags2str_cb(const char *str, void *data)
{
- UNUSED(item);
- json_object *obj = (json_object *)data;
- struct job *job = (struct job *)entry;
+ json_object *json_item = json_object_new_string(str);
+ json_object_array_add((json_object *)data, json_item);
+}
+json_object *job2json(struct job *job) {
json_object *json_job = json_object_new_object();
+
json_object *json_item = json_object_new_string(job->name);
json_object_object_add(json_job, "name", json_item);
+
+ json_item = json_object_new_string(job_type2str(job->type));
+ json_object_object_add(json_job, "type", json_item);
+
+ netdata_mutex_lock(&job->lock);
json_item = json_object_new_string(job_status2str(job->status));
+ json_object_object_add(json_job, "status", json_item);
+
+ json_item = json_object_new_int(job->state);
json_object_object_add(json_job, "state", json_item);
+
+ json_item = job->reason == NULL ? NULL : json_object_new_string(job->reason);
+ json_object_object_add(json_job, "reason", json_item);
+
int64_t last_state_update_s = job->last_state_update / USEC_PER_SEC;
int64_t last_state_update_us = job->last_state_update % USEC_PER_SEC;
@@ -183,6 +198,22 @@ static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, v
json_item = json_object_new_int64(last_state_update_us);
json_object_object_add(json_job, "last_state_update_us", json_item);
+ json_item = json_object_new_array();
+ job_flags_wallkthrough(job->flags, _job_flags2str_cb, json_item);
+ json_object_object_add(json_job, "flags", json_item);
+
+ netdata_mutex_unlock(&job->lock);
+
+ return json_job;
+}
+
+static int _get_list_of_jobs_json_cb(const DICTIONARY_ITEM *item, void *entry, void *data)
+{
+ UNUSED(item);
+ json_object *obj = (json_object *)data;
+
+ json_object *json_job = job2json((struct job *)entry);
+
json_object_array_add(obj, json_job);
return 0;
@@ -206,24 +237,59 @@ struct job *get_job_by_name(struct module *module, const char *job_name)
return dictionary_get(module->jobs, job_name);
}
-int remove_job(struct module *module, struct job *job)
+void unlink_job(const char *plugin_name, const char *module_name, const char *job_name)
{
// as we are going to do unlink here we better make sure we have all to build proper path
- if (unlikely(job->name == NULL || module == NULL || module->name == NULL || module->plugin == NULL || module->plugin->name == NULL))
- return 0;
+ if (unlikely(job_name == NULL || module_name == NULL || plugin_name == NULL))
+ return;
+ BUFFER *buffer = buffer_create(DYN_CONF_PATH_MAX, NULL);
+ buffer_sprintf(buffer, DYN_CONF_DIR "/%s/%s/%s" DYN_CONF_CFG_EXT, plugin_name, module_name, job_name);
+ if (unlink(buffer_tostring(buffer)))
+ netdata_log_error("Cannot remove file %s", buffer_tostring(buffer));
- enum set_config_result rc = module->delete_job_cb(module->job_config_cb_usr_ctx, module->name, job->name);
+ buffer_free(buffer);
+}
+
+void delete_job(struct configurable_plugin *plugin, const char *module_name, const char *job_name)
+{
+ struct module *module = get_module_by_name(plugin, module_name);
+ if (module == NULL) {
+ error_report("DYNCFG module \"%s\" not found", module_name);
+ return;
+ }
+
+ struct job *job_item = get_job_by_name(module, job_name);
+ if (job_item == NULL) {
+ error_report("DYNCFG job \"%s\" not found", job_name);
+ return;
+ }
+
+ dictionary_del(module->jobs, job_name);
+}
+
+void delete_job_pname(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name)
+{
+ const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name);
+ if (plugin_item == NULL) {
+ error_report("DYNCFG plugin \"%s\" not found", plugin_name);
+ return;
+ }
+ struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item);
+
+ delete_job(plugin, module_name, job_name);
+
+ dictionary_acquired_item_release(plugins_dict, plugin_item);
+}
+
+int remove_job(struct module *module, struct job *job)
+{
+ enum set_config_result rc = module->delete_job_cb(module->job_config_cb_usr_ctx, module->plugin->name, module->name, job->name);
if (rc != SET_CONFIG_ACCEPTED) {
error_report("DYNCFG module \"%s\" rejected delete job for \"%s\"", module->name, job->name);
return 0;
}
-
- BUFFER *buffer = buffer_create(DYN_CONF_PATH_MAX, NULL);
- buffer_sprintf(buffer, DYN_CONF_DIR "/%s/%s/%s" DYN_CONF_CFG_EXT, module->plugin->name, module->name, job->name);
- unlink(buffer_tostring(buffer));
- buffer_free(buffer);
- return dictionary_del(module->jobs, job->name);
+ return 1;
}
struct module *get_module_by_name(struct configurable_plugin *plugin, const char *module_name)
@@ -231,7 +297,7 @@ struct module *get_module_by_name(struct configurable_plugin *plugin, const char
return dictionary_get(plugin->modules, module_name);
}
-inline struct configurable_plugin *get_plugin_by_name(const char *name)
+inline struct configurable_plugin *get_plugin_by_name(DICTIONARY *plugins_dict, const char *name)
{
return dictionary_get(plugins_dict, name);
}
@@ -282,6 +348,59 @@ static int store_config(const char *module_name, const char *submodule_name, con
return 0;
}
+#ifdef NETDATA_DEV_MODE
+#define netdata_dev_fatal(...) fatal(__VA_ARGS__)
+#else
+#define netdata_dev_fatal(...) error_report(__VA_ARGS__)
+#endif
+
+void dyn_conf_store_config(const char *function, const char *payload, struct configurable_plugin *plugin) {
+ dyncfg_config_t config = {
+ .data = (char*)payload,
+ .data_size = strlen(payload)
+ };
+
+ char *fnc = strdupz(function);
+ // split fnc to words
+ char *words[DYNCFG_MAX_WORDS];
+ size_t words_c = quoted_strings_splitter(fnc, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
+
+ const char *fnc_name = get_word(words, words_c, 0);
+ if (fnc_name == NULL) {
+ error_report("Function name expected \"%s\"", function);
+ goto CLEANUP;
+ }
+ if (strncmp(fnc_name, FUNCTION_NAME_SET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_SET_PLUGIN_CONFIG)) == 0) {
+ store_config(plugin->name, NULL, NULL, config);
+ goto CLEANUP;
+ }
+
+ if (words_c < 2) {
+ error_report("Module name expected \"%s\"", function);
+ goto CLEANUP;
+ }
+ const char *module_name = get_word(words, words_c, 1);
+ if (strncmp(fnc_name, FUNCTION_NAME_SET_MODULE_CONFIG, strlen(FUNCTION_NAME_SET_MODULE_CONFIG)) == 0) {
+ store_config(plugin->name, module_name, NULL, config);
+ goto CLEANUP;
+ }
+
+ if (words_c < 3) {
+ error_report("Job name expected \"%s\"", function);
+ goto CLEANUP;
+ }
+ const char *job_name = get_word(words, words_c, 2);
+ if (strncmp(fnc_name, FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) {
+ store_config(plugin->name, module_name, job_name, config);
+ goto CLEANUP;
+ }
+
+ netdata_dev_fatal("Unknown function \"%s\"", function);
+
+CLEANUP:
+ freez(fnc);
+}
+
dyncfg_config_t load_config(const char *plugin_name, const char *module_name, const char *job_id)
{
BUFFER *filename = buffer_create(DYN_CONF_PATH_MAX, NULL);
@@ -310,16 +429,12 @@ dyncfg_config_t load_config(const char *plugin_name, const char *module_name, co
char *set_plugin_config(struct configurable_plugin *plugin, dyncfg_config_t cfg)
{
- enum set_config_result rc = plugin->set_config_cb(plugin->cb_usr_ctx, &cfg);
+ enum set_config_result rc = plugin->set_config_cb(plugin->cb_usr_ctx, plugin->name, &cfg);
if (rc != SET_CONFIG_ACCEPTED) {
error_report("DYNCFG plugin \"%s\" rejected config", plugin->name);
return "plugin rejected config";
}
- if (store_config(plugin->name, NULL, NULL, cfg)) {
- error_report("DYNCFG could not store config for module \"%s\"", plugin->name);
- return "could not store config on disk";
- }
return NULL;
}
@@ -327,62 +442,38 @@ static char *set_module_config(struct module *mod, dyncfg_config_t cfg)
{
struct configurable_plugin *plugin = mod->plugin;
- enum set_config_result rc = mod->set_config_cb(mod->config_cb_usr_ctx, mod->name, &cfg);
+ enum set_config_result rc = mod->set_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name, &cfg);
if (rc != SET_CONFIG_ACCEPTED) {
error_report("DYNCFG module \"%s\" rejected config", plugin->name);
return "module rejected config";
}
- if (store_config(plugin->name, mod->name, NULL, cfg)) {
- error_report("DYNCFG could not store config for module \"%s\"", mod->name);
- return "could not store config on disk";
- }
-
return NULL;
}
-struct job *job_new()
+struct job *job_new(const char *job_id)
{
struct job *job = callocz(1, sizeof(struct job));
job->state = JOB_STATUS_UNKNOWN;
job->last_state_update = now_realtime_usec();
+ job->name = strdupz(job_id);
+ netdata_mutex_init(&job->lock);
return job;
}
-static int set_job_config(struct job *job, dyncfg_config_t cfg)
+static inline void job_del(struct job *job)
{
- struct module *mod = job->module;
- enum set_config_result rt = mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, job->name, &cfg);
-
- if (rt != SET_CONFIG_ACCEPTED) {
- error_report("DYNCFG module \"%s\" rejected config for job \"%s\"", mod->name, job->name);
- return 1;
- }
-
- if (store_config(mod->plugin->name, mod->name, job->name, cfg)) {
- error_report("DYNCFG could not store config for module \"%s\"", mod->name);
- return 1;
- }
-
- return 0;
+ netdata_mutex_destroy(&job->lock);
+ freez(job->reason);
+ freez((void*)job->name);
+ freez(job);
}
-struct job *add_job(struct module *mod, const char *job_id, dyncfg_config_t cfg)
+void job_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
{
- struct job *job = job_new();
- job->name = strdupz(job_id);
- job->module = mod;
-
- if (set_job_config(job, cfg)) {
- freez(job->name);
- freez(job);
- return NULL;
- }
-
- dictionary_set(mod->jobs, job->name, job, sizeof(job));
-
- return job;
-
+ UNUSED(item);
+ UNUSED(data);
+ job_del((struct job *)value);
}
void module_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
@@ -395,10 +486,9 @@ void module_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
freez(mod);
}
-
-const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin)
+const DICTIONARY_ITEM *register_plugin(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, bool localhost)
{
- if (get_plugin_by_name(plugin->name) != NULL) {
+ if (get_plugin_by_name(plugins_dict, plugin->name) != NULL) {
error_report("DYNCFG plugin \"%s\" already registered", plugin->name);
return NULL;
}
@@ -413,7 +503,8 @@ const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin)
plugin->modules = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE);
dictionary_register_delete_callback(plugin->modules, module_del_cb, NULL);
- deferred_config_push_back(plugin->name, NULL, NULL);
+ if (localhost)
+ deferred_config_push_back(plugins_dict, plugin->name, NULL, NULL);
dictionary_set(plugins_dict, plugin->name, plugin, sizeof(plugin));
@@ -421,24 +512,14 @@ const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin)
return dictionary_get_and_acquire_item(plugins_dict, plugin->name);
}
-void unregister_plugin(const DICTIONARY_ITEM *plugin)
+void unregister_plugin(DICTIONARY *plugins_dict, const DICTIONARY_ITEM *plugin)
{
struct configurable_plugin *plug = dictionary_acquired_item_value(plugin);
dictionary_acquired_item_release(plugins_dict, plugin);
dictionary_del(plugins_dict, plug->name);
}
-void job_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
-{
- UNUSED(item);
- UNUSED(data);
- struct job *job = (struct job *)value;
- freez(job->reason);
- freez(job->name);
- freez(job);
-}
-
-int register_module(struct configurable_plugin *plugin, struct module *module)
+int register_module(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, struct module *module, bool localhost)
{
if (get_module_by_name(plugin, module->name) != NULL) {
error_report("DYNCFG module \"%s\" already registered", module->name);
@@ -447,7 +528,8 @@ int register_module(struct configurable_plugin *plugin, struct module *module)
pthread_mutex_init(&module->lock, NULL);
- deferred_config_push_back(plugin->name, module->name, NULL);
+ if (localhost)
+ deferred_config_push_back(plugins_dict, plugin->name, module->name, NULL);
module->plugin = plugin;
@@ -455,34 +537,38 @@ int register_module(struct configurable_plugin *plugin, struct module *module)
module->jobs = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE);
dictionary_register_delete_callback(module->jobs, job_del_cb, NULL);
- // load all jobs from disk
- BUFFER *path = buffer_create(DYN_CONF_PATH_MAX, NULL);
- buffer_sprintf(path, "%s/%s/%s", DYN_CONF_DIR, plugin->name, module->name);
- DIR *dir = opendir(buffer_tostring(path));
- if (dir != NULL) {
- struct dirent *ent;
- while ((ent = readdir(dir)) != NULL) {
- if (ent->d_name[0] == '.')
- continue;
- if (ent->d_type != DT_REG)
- continue;
- size_t len = strnlen(ent->d_name, NAME_MAX);
- if (len <= strlen(DYN_CONF_CFG_EXT))
- continue;
- if (strcmp(ent->d_name + len - strlen(DYN_CONF_CFG_EXT), DYN_CONF_CFG_EXT) != 0)
- continue;
- ent->d_name[len - strlen(DYN_CONF_CFG_EXT)] = '\0';
-
- struct job *job = job_new();
- job->name = strdupz(ent->d_name);
- job->module = module;
- dictionary_set(module->jobs, job->name, job, sizeof(job));
-
- deferred_config_push_back(plugin->name, module->name, job->name);
+ if (localhost) {
+ // load all jobs from disk
+ BUFFER *path = buffer_create(DYN_CONF_PATH_MAX, NULL);
+ buffer_sprintf(path, "%s/%s/%s", DYN_CONF_DIR, plugin->name, module->name);
+ DIR *dir = opendir(buffer_tostring(path));
+ if (dir != NULL) {
+ struct dirent *ent;
+ while ((ent = readdir(dir)) != NULL) {
+ if (ent->d_name[0] == '.')
+ continue;
+ if (ent->d_type != DT_REG)
+ continue;
+ size_t len = strnlen(ent->d_name, NAME_MAX);
+ if (len <= strlen(DYN_CONF_CFG_EXT))
+ continue;
+ if (strcmp(ent->d_name + len - strlen(DYN_CONF_CFG_EXT), DYN_CONF_CFG_EXT) != 0)
+ continue;
+ ent->d_name[len - strlen(DYN_CONF_CFG_EXT)] = '\0';
+
+ struct job *job = job_new(ent->d_name);
+ job->module = module;
+ job->flags = JOB_FLG_PS_LOADED;
+ job->type = JOB_TYPE_USER;
+
+ dictionary_set(module->jobs, job->name, job, sizeof(job));
+
+ deferred_config_push_back(plugins_dict, plugin->name, module->name, ent->d_name);
+ }
+ closedir(dir);
}
- closedir(dir);
+ buffer_free(path);
}
- buffer_free(path);
}
dictionary_set(plugin->modules, module->name, module, sizeof(module));
@@ -490,11 +576,49 @@ int register_module(struct configurable_plugin *plugin, struct module *module)
return 0;
}
+int register_job(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_type job_type, dyncfg_job_flg_t flags, int ignore_existing)
+{
+ int rc = 1;
+ const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name);
+ if (plugin_item == NULL) {
+ error_report("plugin \"%s\" not registered", plugin_name);
+ return rc;
+ }
+ struct configurable_plugin *plugin = dictionary_acquired_item_value(plugin_item);
+ struct module *mod = get_module_by_name(plugin, module_name);
+ if (mod == NULL) {
+ error_report("module \"%s\" not registered", module_name);
+ goto ERR_EXIT;
+ }
+ if (mod->type != MOD_TYPE_ARRAY) {
+ error_report("module \"%s\" is not an array", module_name);
+ goto ERR_EXIT;
+ }
+ if (get_job_by_name(mod, job_name) != NULL) {
+ if (!ignore_existing)
+ error_report("job \"%s\" already registered", job_name);
+ goto ERR_EXIT;
+ }
+
+ struct job *job = job_new(job_name);
+ job->module = mod;
+ job->flags = flags;
+ job->type = job_type;
+
+ dictionary_set(mod->jobs, job->name, job, sizeof(job));
+
+ rc = 0;
+ERR_EXIT:
+ dictionary_acquired_item_release(plugins_dict, plugin_item);
+ return rc;
+}
+
void freez_dyncfg(void *ptr) {
freez(ptr);
}
-static void handle_dyncfg_root(struct uni_http_response *resp, int method)
+#ifdef NETDATA_TEST_DYNCFG
+static void handle_dyncfg_root(DICTIONARY *plugins_dict, struct uni_http_response *resp, int method)
{
if (method != HTTP_METHOD_GET) {
resp->content = "method not allowed";
@@ -502,7 +626,7 @@ static void handle_dyncfg_root(struct uni_http_response *resp, int method)
resp->status = HTTP_RESP_METHOD_NOT_ALLOWED;
return;
}
- json_object *obj = get_list_of_plugins_json();
+ json_object *obj = get_list_of_plugins_json(plugins_dict);
json_object *wrapper = json_object_new_object();
json_object_object_add(wrapper, "configurable_plugins", obj);
resp->content = strdupz(json_object_to_json_string_ext(wrapper, JSON_C_TO_STRING_PRETTY));
@@ -518,7 +642,7 @@ static void handle_plugin_root(struct uni_http_response *resp, int method, struc
switch(method) {
case HTTP_METHOD_GET:
{
- dyncfg_config_t cfg = plugin->get_config_cb(plugin->cb_usr_ctx);
+ dyncfg_config_t cfg = plugin->get_config_cb(plugin->cb_usr_ctx, plugin->name);
resp->content = mallocz(cfg.data_size);
memcpy(resp->content, cfg.data, cfg.data_size);
resp->status = HTTP_RESP_OK;
@@ -558,11 +682,12 @@ static void handle_plugin_root(struct uni_http_response *resp, int method, struc
return;
}
}
+#endif
void handle_module_root(struct uni_http_response *resp, int method, struct configurable_plugin *plugin, const char *module, void *post_payload, size_t post_payload_size)
{
- if (strncmp(module, DYN_CONF_SCHEMA, strlen(DYN_CONF_SCHEMA)) == 0) {
- dyncfg_config_t cfg = plugin->get_config_schema_cb(plugin->cb_usr_ctx);
+ if (strncmp(module, DYN_CONF_SCHEMA, sizeof(DYN_CONF_SCHEMA)) == 0) {
+ dyncfg_config_t cfg = plugin->get_config_schema_cb(plugin->cb_usr_ctx, plugin->name);
resp->content = mallocz(cfg.data_size);
memcpy(resp->content, cfg.data, cfg.data_size);
resp->status = HTTP_RESP_OK;
@@ -570,7 +695,7 @@ void handle_module_root(struct uni_http_response *resp, int method, struct confi
resp->content_length = cfg.data_size;
return;
}
- if (strncmp(module, DYN_CONF_MODULE_LIST, strlen(DYN_CONF_MODULE_LIST)) == 0) {
+ if (strncmp(module, DYN_CONF_MODULE_LIST, sizeof(DYN_CONF_MODULE_LIST)) == 0) {
if (method != HTTP_METHOD_GET) {
resp->content = "method not allowed (only GET)";
resp->content_length = strlen(resp->content);
@@ -596,7 +721,7 @@ void handle_module_root(struct uni_http_response *resp, int method, struct confi
return;
}
if (method == HTTP_METHOD_GET) {
- dyncfg_config_t cfg = mod->get_config_cb(mod->config_cb_usr_ctx, mod->name);
+ dyncfg_config_t cfg = mod->get_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name);
resp->content = mallocz(cfg.data_size);
memcpy(resp->content, cfg.data, cfg.data_size);
resp->status = HTTP_RESP_OK;
@@ -651,8 +776,7 @@ static inline void _handle_job_root(struct uni_http_response *resp, int method,
.data = post_payload,
.data_size = post_payload_size
};
- job = add_job(mod, job_id, cont);
- if (job == NULL) {
+ if (mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job_id, &cont)) {
resp->content = "failed to add job";
resp->content_length = strlen(resp->content);
resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR;
@@ -672,7 +796,7 @@ static inline void _handle_job_root(struct uni_http_response *resp, int method,
switch (method) {
case HTTP_METHOD_GET:
{
- dyncfg_config_t cfg = mod->get_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, job->name);
+ dyncfg_config_t cfg = mod->get_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job->name);
resp->content = mallocz(cfg.data_size);
memcpy(resp->content, cfg.data, cfg.data_size);
resp->status = HTTP_RESP_OK;
@@ -692,10 +816,11 @@ static inline void _handle_job_root(struct uni_http_response *resp, int method,
.data = post_payload,
.data_size = post_payload_size
};
- if(set_job_config(job, cont)) {
- resp->status = HTTP_RESP_BAD_REQUEST;
+ if (mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name, job->name, &cont) != SET_CONFIG_ACCEPTED) {
+ error_report("DYNCFG module \"%s\" rejected config for job \"%s\"", mod->name, job->name);
resp->content = "failed to set job config";
resp->content_length = strlen(resp->content);
+ resp->status = HTTP_RESP_INTERNAL_SERVER_ERROR;
return;
}
resp->status = HTTP_RESP_OK;
@@ -726,8 +851,8 @@ static inline void _handle_job_root(struct uni_http_response *resp, int method,
void handle_job_root(struct uni_http_response *resp, int method, struct module *mod, const char *job_id, void *post_payload, size_t post_payload_size)
{
- if (strncmp(job_id, DYN_CONF_SCHEMA, strlen(DYN_CONF_SCHEMA)) == 0) {
- dyncfg_config_t cfg = mod->get_config_schema_cb(mod->config_cb_usr_ctx, mod->name);
+ if (strncmp(job_id, DYN_CONF_SCHEMA, sizeof(DYN_CONF_SCHEMA)) == 0) {
+ dyncfg_config_t cfg = mod->get_config_schema_cb(mod->config_cb_usr_ctx, mod->plugin->name, mod->name);
resp->content = mallocz(cfg.data_size);
memcpy(resp->content, cfg.data, cfg.data_size);
resp->status = HTTP_RESP_OK;
@@ -735,8 +860,8 @@ void handle_job_root(struct uni_http_response *resp, int method, struct module *
resp->content_length = cfg.data_size;
return;
}
- if (strncmp(job_id, DYN_CONF_JOB_SCHEMA, strlen(DYN_CONF_JOB_SCHEMA)) == 0) {
- dyncfg_config_t cfg = mod->get_job_config_schema_cb(mod->job_config_cb_usr_ctx, mod->name);
+ if (strncmp(job_id, DYN_CONF_JOB_SCHEMA, sizeof(DYN_CONF_JOB_SCHEMA)) == 0) {
+ dyncfg_config_t cfg = mod->get_job_config_schema_cb(mod->job_config_cb_usr_ctx, mod->plugin->name, mod->name);
resp->content = mallocz(cfg.data_size);
memcpy(resp->content, cfg.data, cfg.data_size);
resp->status = HTTP_RESP_OK;
@@ -744,7 +869,7 @@ void handle_job_root(struct uni_http_response *resp, int method, struct module *
resp->content_length = cfg.data_size;
return;
}
- if (strncmp(job_id, DYN_CONF_JOB_LIST, strlen(DYN_CONF_JOB_LIST)) == 0) {
+ if (strncmp(job_id, DYN_CONF_JOB_LIST, sizeof(DYN_CONF_JOB_LIST)) == 0) {
if (mod->type != MOD_TYPE_ARRAY) {
resp->content = "module type is not job_array (can't get the list of jobs)";
resp->content_length = strlen(resp->content);
@@ -776,7 +901,14 @@ void handle_job_root(struct uni_http_response *resp, int method, struct module *
dictionary_acquired_item_release(mod->jobs, job_item);
}
-struct uni_http_response dyn_conf_process_http_request(int method, const char *plugin, const char *module, const char *job_id, void *post_payload, size_t post_payload_size)
+struct uni_http_response dyn_conf_process_http_request(
+ DICTIONARY *plugins_dict __maybe_unused,
+ int method __maybe_unused,
+ const char *plugin __maybe_unused,
+ const char *module __maybe_unused,
+ const char *job_id __maybe_unused,
+ void *post_payload __maybe_unused,
+ size_t post_payload_size __maybe_unused)
{
struct uni_http_response resp = {
.status = HTTP_RESP_INTERNAL_SERVER_ERROR,
@@ -785,8 +917,14 @@ struct uni_http_response dyn_conf_process_http_request(int method, const char *p
.content_free = NULL,
.content_length = 0
};
+#ifndef NETDATA_TEST_DYNCFG
+ resp.content = "DYNCFG is disabled (as it is for now developer only feature). This will be enabled by default when ready for technical preview.";
+ resp.content_length = strlen(resp.content);
+ resp.status = HTTP_RESP_PRECOND_FAIL;
+ return resp;
+#else
if (plugin == NULL) {
- handle_dyncfg_root(&resp, method);
+ handle_dyncfg_root(plugins_dict, &resp, method);
return resp;
}
const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, plugin);
@@ -814,9 +952,9 @@ struct uni_http_response dyn_conf_process_http_request(int method, const char *p
goto EXIT_PLUGIN;
}
if (mod->type != MOD_TYPE_ARRAY) {
- resp.content = "module is not array";
+ resp.content = "400 - this module is not array type";
resp.content_length = strlen(resp.content);
- resp.status = HTTP_RESP_NOT_FOUND;
+ resp.status = HTTP_RESP_BAD_REQUEST;
goto EXIT_PLUGIN;
}
handle_job_root(&resp, method, mod, job_id, post_payload, post_payload_size);
@@ -824,6 +962,7 @@ struct uni_http_response dyn_conf_process_http_request(int method, const char *p
EXIT_PLUGIN:
dictionary_acquired_item_release(plugins_dict, plugin_item);
return resp;
+#endif
}
void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
@@ -836,41 +975,53 @@ void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data)
freez(plugin);
}
-void report_job_status(struct configurable_plugin *plugin, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason)
+// on failure - return NULL - all unlocked, nothing acquired
+// on success - return pointer to job item - keep job and plugin acquired and locked!!!
+// for caller convenience (to prevent another lock and races)
+// caller is responsible to unlock the job and release it when not needed anymore
+// this also avoids dependency creep
+const DICTIONARY_ITEM *report_job_status_acq_lock(DICTIONARY *plugins_dict, const DICTIONARY_ITEM **plugin_acq_item, DICTIONARY **job_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason)
{
- const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(plugins_dict, plugin->name);
- if (item == NULL) {
- netdata_log_error("plugin %s not found", plugin->name);
- return;
+ *plugin_acq_item = dictionary_get_and_acquire_item(plugins_dict, plugin_name);
+ if (*plugin_acq_item == NULL) {
+ netdata_log_error("plugin %s not found", plugin_name);
+ return NULL;
}
- struct configurable_plugin *plug = dictionary_acquired_item_value(item);
+
+ struct configurable_plugin *plug = dictionary_acquired_item_value(*plugin_acq_item);
struct module *mod = get_module_by_name(plug, module_name);
if (mod == NULL) {
netdata_log_error("module %s not found", module_name);
- goto EXIT_PLUGIN;
+ dictionary_acquired_item_release(plugins_dict, *plugin_acq_item);
+ return NULL;
}
if (mod->type != MOD_TYPE_ARRAY) {
netdata_log_error("module %s is not array", module_name);
- goto EXIT_PLUGIN;
+ dictionary_acquired_item_release(plugins_dict, *plugin_acq_item);
+ return NULL;
}
+ *job_dict = mod->jobs;
const DICTIONARY_ITEM *job_item = dictionary_get_and_acquire_item(mod->jobs, job_name);
if (job_item == NULL) {
netdata_log_error("job %s not found", job_name);
- goto EXIT_PLUGIN;
+ dictionary_acquired_item_release(plugins_dict, *plugin_acq_item);
+ return NULL;
}
struct job *job = dictionary_acquired_item_value(job_item);
+
+ pthread_mutex_lock(&job->lock);
job->status = status;
job->state = status_code;
if (job->reason != NULL) {
freez(job->reason);
}
- job->reason = reason;
+ job->reason = reason != NULL ? strdupz(reason) : NULL; // reason is optional
job->last_state_update = now_realtime_usec();
- dictionary_acquired_item_release(mod->jobs, job_item);
+ job->dirty = true;
-EXIT_PLUGIN:
- dictionary_acquired_item_release(plugins_dict, item);
+ // no unlock and acquired_item_release on success on purpose
+ return job_item;
}
int dyn_conf_init(void)
@@ -882,9 +1033,6 @@ int dyn_conf_init(void)
}
}
- plugins_dict = dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE);
- dictionary_register_delete_callback(plugins_dict, plugin_del_cb, NULL);
-
return 0;
}
@@ -912,6 +1060,15 @@ void *dyncfg_main(void *ptr)
while (!netdata_exit) {
struct deferred_cfg_send *dcs = deferred_config_pop(ptr);
+ DICTIONARY *plugins_dict = dcs->plugins_dict;
+#ifdef NETDATA_INTERNAL_CHECKS
+ if (plugins_dict == NULL) {
+ fatal("DYNCFG, plugins_dict is NULL");
+ deferred_config_free(dcs);
+ continue;
+ }
+#endif
+
const DICTIONARY_ITEM *plugin_item = dictionary_get_and_acquire_item(plugins_dict, dcs->plugin_name);
if (plugin_item == NULL) {
error_report("DYNCFG, plugin %s not found", dcs->plugin_name);
@@ -922,21 +1079,21 @@ void *dyncfg_main(void *ptr)
if (dcs->module_name == NULL) {
dyncfg_config_t cfg = load_config(dcs->plugin_name, NULL, NULL);
if (cfg.data != NULL) {
- plugin->set_config_cb(plugin->cb_usr_ctx, &cfg);
+ plugin->set_config_cb(plugin->cb_usr_ctx, plugin->name, &cfg);
freez(cfg.data);
}
} else if (dcs->job_name == NULL) {
dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, NULL);
if (cfg.data != NULL) {
struct module *mod = get_module_by_name(plugin, dcs->module_name);
- mod->set_config_cb(mod->config_cb_usr_ctx, mod->name, &cfg);
+ mod->set_config_cb(mod->config_cb_usr_ctx, plugin->name, mod->name, &cfg);
freez(cfg.data);
}
} else {
dyncfg_config_t cfg = load_config(dcs->plugin_name, dcs->module_name, dcs->job_name);
if (cfg.data != NULL) {
struct module *mod = get_module_by_name(plugin, dcs->module_name);
- mod->set_job_config_cb(mod->job_config_cb_usr_ctx, mod->name, dcs->job_name, &cfg);
+ mod->set_job_config_cb(mod->job_config_cb_usr_ctx, plugin->name, mod->name, dcs->job_name, &cfg);
freez(cfg.data);
}
}
@@ -947,3 +1104,37 @@ void *dyncfg_main(void *ptr)
netdata_thread_cleanup_pop(1);
return NULL;
}
+
+bool is_dyncfg_function(const char *function_name, uint8_t type) {
+ // TODO add hash to speed things up
+ if (type & (DYNCFG_FUNCTION_TYPE_GET | DYNCFG_FUNCTION_TYPE_REGULAR)) {
+ if (strncmp(function_name, FUNCTION_NAME_GET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_GET_PLUGIN_CONFIG)) == 0)
+ return true;
+ if (strncmp(function_name, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA)) == 0)
+ return true;
+ if (strncmp(function_name, FUNCTION_NAME_GET_MODULE_CONFIG, strlen(FUNCTION_NAME_GET_MODULE_CONFIG)) == 0)
+ return true;
+ if (strncmp(function_name, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA)) == 0)
+ return true;
+ if (strncmp(function_name, FUNCTION_NAME_GET_JOB_CONFIG, strlen(FUNCTION_NAME_GET_JOB_CONFIG)) == 0)
+ return true;
+ if (strncmp(function_name, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA, strlen(FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA)) == 0)
+ return true;
+ }
+
+ if (type & (DYNCFG_FUNCTION_TYPE_SET | DYNCFG_FUNCTION_TYPE_PAYLOAD)) {
+ if (strncmp(function_name, FUNCTION_NAME_SET_PLUGIN_CONFIG, strlen(FUNCTION_NAME_SET_PLUGIN_CONFIG)) == 0)
+ return true;
+ if (strncmp(function_name, FUNCTION_NAME_SET_MODULE_CONFIG, strlen(FUNCTION_NAME_SET_MODULE_CONFIG)) == 0)
+ return true;
+ if (strncmp(function_name, FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0)
+ return true;
+ }
+
+ if (type & (DYNCFG_FUNCTION_TYPE_DELETE | DYNCFG_FUNCTION_TYPE_REGULAR)) {
+ if (strncmp(function_name, FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0)
+ return true;
+ }
+
+ return false;
+}
diff --git a/libnetdata/dyn_conf/dyn_conf.h b/libnetdata/dyn_conf/dyn_conf.h
index f10ae6a12..f6a5fe49a 100644
--- a/libnetdata/dyn_conf/dyn_conf.h
+++ b/libnetdata/dyn_conf/dyn_conf.h
@@ -5,6 +5,21 @@
#include "../libnetdata.h"
+#define FUNCTION_NAME_GET_PLUGIN_CONFIG "get_plugin_config"
+#define FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA "get_plugin_config_schema"
+#define FUNCTION_NAME_GET_MODULE_CONFIG "get_module_config"
+#define FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA "get_module_config_schema"
+#define FUNCTION_NAME_GET_JOB_CONFIG "get_job_config"
+#define FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA "get_job_config_schema"
+#define FUNCTION_NAME_SET_PLUGIN_CONFIG "set_plugin_config"
+#define FUNCTION_NAME_SET_MODULE_CONFIG "set_module_config"
+#define FUNCTION_NAME_SET_JOB_CONFIG "set_job_config"
+#define FUNCTION_NAME_DELETE_JOB "delete_job"
+
+#define DYNCFG_MAX_WORDS 5
+
+#define DYNCFG_VFNC_RET_CFG_ACCEPTED (1)
+
enum module_type {
MOD_TYPE_UNKNOWN = 0,
MOD_TYPE_ARRAY,
@@ -20,6 +35,18 @@ static inline enum module_type str2_module_type(const char *type_name)
return MOD_TYPE_UNKNOWN;
}
+static inline const char *module_type2str(enum module_type type)
+{
+ switch (type) {
+ case MOD_TYPE_ARRAY:
+ return "job_array";
+ case MOD_TYPE_SINGLE:
+ return "single";
+ default:
+ return "unknown";
+ }
+}
+
struct dyncfg_config {
void *data;
size_t data_size;
@@ -37,7 +64,7 @@ enum job_status {
JOB_STATUS_ERROR
};
-inline enum job_status str2job_state(const char *state_name) {
+static inline enum job_status str2job_state(const char *state_name) {
if (strcmp(state_name, "stopped") == 0)
return JOB_STATUS_STOPPED;
else if (strcmp(state_name, "running") == 0)
@@ -47,24 +74,75 @@ inline enum job_status str2job_state(const char *state_name) {
return JOB_STATUS_UNKNOWN;
}
+const char *job_status2str(enum job_status status);
+
enum set_config_result {
SET_CONFIG_ACCEPTED = 0,
SET_CONFIG_REJECTED,
SET_CONFIG_DEFFER
};
+typedef uint32_t dyncfg_job_flg_t;
+enum job_flags {
+ JOB_FLG_PS_LOADED = 1 << 0, // PS abbr. Persistent Storage
+ JOB_FLG_PLUGIN_PUSHED = 1 << 1, // got it from plugin (e.g. autodiscovered job)
+ JOB_FLG_STREAMING_PUSHED = 1 << 2, // got it through streaming
+ JOB_FLG_USER_CREATED = 1 << 3, // user created this job during agent runtime
+};
+
+enum job_type {
+ JOB_TYPE_UNKNOWN = 0,
+ JOB_TYPE_STOCK = 1,
+ JOB_TYPE_USER = 2,
+ JOB_TYPE_AUTODISCOVERED = 3,
+};
+
+static inline const char* job_type2str(enum job_type type)
+{
+ switch (type) {
+ case JOB_TYPE_STOCK:
+ return "stock";
+ case JOB_TYPE_USER:
+ return "user";
+ case JOB_TYPE_AUTODISCOVERED:
+ return "autodiscovered";
+ case JOB_TYPE_UNKNOWN:
+ default:
+ return "unknown";
+ }
+}
+
+static inline enum job_type str2job_type(const char *type_name)
+{
+ if (strcmp(type_name, "stock") == 0)
+ return JOB_TYPE_STOCK;
+ else if (strcmp(type_name, "user") == 0)
+ return JOB_TYPE_USER;
+ else if (strcmp(type_name, "autodiscovered") == 0)
+ return JOB_TYPE_AUTODISCOVERED;
+ error_report("Unknown job type: %s", type_name);
+ return JOB_TYPE_UNKNOWN;
+}
+
struct job
{
- char *name;
+ const char *name;
+ enum job_type type;
+ struct module *module;
+
+ pthread_mutex_t lock;
+ // lock protexts only fields below (which are modified during job existence)
+ // others are static during lifetime of job
- //state reported by config
+ int dirty; // this relates to rrdpush, true if parent has different data than us
+
+ // state reported by plugin
+ usec_t last_state_update;
enum job_status status; // reported by plugin, enum as this has to be interpreted by UI
int state; // code reported by plugin which can mean anything plugin wants
char *reason; // reported by plugin, can be NULL (optional)
- usec_t last_state_update;
-
- struct module *module;
+ dyncfg_job_flg_t flags;
};
struct module
@@ -76,18 +154,18 @@ struct module
struct configurable_plugin *plugin;
// module config
- enum set_config_result (*set_config_cb)(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg);
- dyncfg_config_t (*get_config_cb)(void *usr_ctx, const char *name);
- dyncfg_config_t (*get_config_schema_cb)(void *usr_ctx, const char *name);
+ enum set_config_result (*set_config_cb)(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg);
+ dyncfg_config_t (*get_config_cb)(void *usr_ctx, const char *plugin_name, const char *module_name);
+ dyncfg_config_t (*get_config_schema_cb)(void *usr_ctx, const char *plugin_name, const char *module_name);
void *config_cb_usr_ctx;
DICTIONARY *jobs;
// jobs config
- dyncfg_config_t (*get_job_config_cb)(void *usr_ctx, const char *module_name, const char *job_name);
- dyncfg_config_t (*get_job_config_schema_cb)(void *usr_ctx, const char *module_name);
- enum set_config_result (*set_job_config_cb)(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg);
- enum set_config_result (*delete_job_cb)(void *usr_ctx, const char *module_name, const char *job_name);
+ dyncfg_config_t (*get_job_config_cb)(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name);
+ dyncfg_config_t (*get_job_config_schema_cb)(void *usr_ctx, const char *plugin_name, const char *module_name);
+ enum set_config_result (*set_job_config_cb)(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg);
+ enum set_config_result (*delete_job_cb)(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name);
void *job_config_cb_usr_ctx;
};
@@ -97,26 +175,34 @@ struct configurable_plugin {
DICTIONARY *modules;
const char *schema;
- dyncfg_config_t (*get_config_cb)(void *usr_ctx);
- dyncfg_config_t (*get_config_schema_cb)(void *usr_ctx);
- enum set_config_result (*set_config_cb)(void *usr_ctx, dyncfg_config_t *cfg);
+ dyncfg_config_t (*get_config_cb)(void *usr_ctx, const char *plugin_name);
+ dyncfg_config_t (*get_config_schema_cb)(void *usr_ctx, const char *plugin_name);
+ enum set_config_result (*set_config_cb)(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg);
void *cb_usr_ctx; // context for all callbacks (split if needed in future)
};
// API to be used by plugins
-const DICTIONARY_ITEM *register_plugin(struct configurable_plugin *plugin);
-void unregister_plugin(const DICTIONARY_ITEM *plugin);
-int register_module(struct configurable_plugin *plugin, struct module *module);
+const DICTIONARY_ITEM *register_plugin(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, bool localhost);
+void unregister_plugin(DICTIONARY *plugins_dict, const DICTIONARY_ITEM *plugin);
+int register_module(DICTIONARY *plugins_dict, struct configurable_plugin *plugin, struct module *module, bool localhost);
+int register_job(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_type job_type, dyncfg_job_flg_t flags, int ignore_existing);
+
+const DICTIONARY_ITEM *report_job_status_acq_lock(DICTIONARY *plugins_dict, const DICTIONARY_ITEM **plugin_acq_item, DICTIONARY **job_dict, const char *plugin_name, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason);
-void report_job_status(struct configurable_plugin *plugin, const char *module_name, const char *job_name, enum job_status status, int status_code, char *reason);
+void dyn_conf_store_config(const char *function, const char *payload, struct configurable_plugin *plugin);
+void unlink_job(const char *plugin_name, const char *module_name, const char *job_name);
+void delete_job(struct configurable_plugin *plugin, const char *module_name, const char *job_name);
+void delete_job_pname(DICTIONARY *plugins_dict, const char *plugin_name, const char *module_name, const char *job_name);
// API to be used by the web server(s)
-json_object *get_list_of_plugins_json();
-struct configurable_plugin *get_plugin_by_name(const char *name);
+json_object *get_list_of_plugins_json(DICTIONARY *plugins_dict);
+struct configurable_plugin *get_plugin_by_name(DICTIONARY *plugins_dict, const char *name);
json_object *get_list_of_modules_json(struct configurable_plugin *plugin);
struct module *get_module_by_name(struct configurable_plugin *plugin, const char *module_name);
+json_object *job2json(struct job *job);
+
// helper struct to make interface between internal webserver and h2o same
struct uni_http_response {
int status;
@@ -126,11 +212,26 @@ struct uni_http_response {
void (*content_free)(void *);
};
-struct uni_http_response dyn_conf_process_http_request(int method, const char *plugin, const char *module, const char *job_id, void *payload, size_t payload_size);
+struct uni_http_response dyn_conf_process_http_request(DICTIONARY *plugins_dict, int method, const char *plugin, const char *module, const char *job_id, void *payload, size_t payload_size);
// API to be used by main netdata process, initialization and destruction etc.
int dyn_conf_init(void);
void freez_dyncfg(void *ptr);
+
+#define dyncfg_dictionary_create() dictionary_create(DICT_OPTION_VALUE_LINK_DONT_CLONE)
+
+void plugin_del_cb(const DICTIONARY_ITEM *item, void *value, void *data);
+
void *dyncfg_main(void *in);
+#define DYNCFG_FUNCTION_TYPE_REGULAR (1 << 0)
+#define DYNCFG_FUNCTION_TYPE_PAYLOAD (1 << 1)
+#define DYNCFG_FUNCTION_TYPE_GET (1 << 2)
+#define DYNCFG_FUNCTION_TYPE_SET (1 << 3)
+#define DYNCFG_FUNCTION_TYPE_DELETE (1 << 4)
+#define DYNCFG_FUNCTION_TYPE_ALL \
+ (DYNCFG_FUNCTION_TYPE_REGULAR | DYNCFG_FUNCTION_TYPE_PAYLOAD | DYNCFG_FUNCTION_TYPE_GET | DYNCFG_FUNCTION_TYPE_SET | DYNCFG_FUNCTION_TYPE_DELETE)
+
+bool is_dyncfg_function(const char *function_name, uint8_t type);
+
#endif //DYN_CONF_H
diff --git a/libnetdata/dyn_conf/tests/sample_test_config.json b/libnetdata/dyn_conf/tests/sample_test_config.json
new file mode 100644
index 000000000..a6595f124
--- /dev/null
+++ b/libnetdata/dyn_conf/tests/sample_test_config.json
@@ -0,0 +1,22 @@
+{
+ "http_endpoints": {
+ "parent": {
+ "host": "127.0.0.1",
+ "mguid": null,
+ "port": 20001,
+ "ssl": false
+ },
+ "child": {
+ "host": "127.0.0.1",
+ "mguid": "3bc2f7de-1445-11ee-9ed7-3c7c3f21784c",
+ "port": 19999,
+ "ssl": false
+ }
+ },
+ "global": {
+ "test_plugin_name": "external_plugin",
+ "test_array_module_name": "module_of_the_future",
+ "test_single_module_name": "module_of_the_future_single_type",
+ "test_job_name": "fixed_job"
+ }
+}
diff --git a/libnetdata/dyn_conf/tests/sub_tests/test_parent_child.rb b/libnetdata/dyn_conf/tests/sub_tests/test_parent_child.rb
new file mode 100644
index 000000000..820db77f8
--- /dev/null
+++ b/libnetdata/dyn_conf/tests/sub_tests/test_parent_child.rb
@@ -0,0 +1,192 @@
+class ParentChildTest
+ @@plugin_cfg = <<~HEREDOC
+{ "test" : "true" }
+HEREDOC
+ @@plugin_cfg2 = <<~HEREDOC
+{ "asdfgh" : "asdfgh" }
+HEREDOC
+
+ @@job_cfg = <<~HEREDOC
+{ "i am newly created job" : "true" }
+HEREDOC
+
+ def initialize
+ @parent = $config[:http_endpoints][:parent]
+ @child = $config[:http_endpoints][:child]
+ @plugin = $config[:global][:test_plugin_name]
+ @arry_mod = $config[:global][:test_array_module_name]
+ @single_mod = $config[:global][:test_single_module_name]
+ @test_job = $config[:global][:test_job_name]
+ end
+ def check_test_plugin_modules_list(host, child = nil)
+ rc = DynCfgHttpClient.get_plugin_module_list(host, @plugin, child)
+ assert_eq(rc.code, 200, "as HTTP code for get_module_list request on plugin \"#{@plugin}\"")
+ modules = nil
+ assert_nothing_raised do
+ modules = JSON.parse(rc.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(modules, :modules)
+ assert_eq(modules[:modules].count, 2, "as number of modules in plugin \"#{@plugin}\"")
+ modules[:modules].each do |m|
+ assert_has_key?(m, :name)
+ assert_has_key?(m, :type)
+ assert_is_one_of(m[:type], "job_array", "single")
+ end
+ assert_eq_str(modules[:modules][0][:name], @arry_mod, "name of first module in plugin \"#{@plugin}\"")
+ assert_eq_str(modules[:modules][1][:name], @single_mod, "name of second module in plugin \"#{@plugin}\"")
+ end
+ def run
+ TEST_SUITE("Parent/Child plugin config")
+
+ TEST("parent/child/get_plugin_list", "Get child (hops:1) plugin list trough parent")
+ plugins = DynCfgHttpClient.get_plugin_list(@parent, @child)
+ assert_eq(plugins.code, 200, "as HTTP code for get_plugin_list request")
+ assert_nothing_raised do
+ plugins = JSON.parse(plugins.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(plugins, :configurable_plugins)
+ assert_array_include?(plugins[:configurable_plugins], @plugin)
+ PASS()
+
+ TEST("parent/child/(set/get)plugin_config", "Set then get and compare child (hops:1) plugin config trough parent")
+ rc = DynCfgHttpClient.set_plugin_config(@parent, @plugin, @@plugin_cfg, @child)
+ assert_eq(rc.code, 200, "as HTTP code for set_plugin_config request")
+
+ rc = DynCfgHttpClient.get_plugin_config(@parent, @plugin, @child)
+ assert_eq(rc.code, 200, "as HTTP code for get_plugin_config request")
+ assert_eq_str(rc.parsed_response.chomp!, @@plugin_cfg, "as plugin config")
+
+ # We do this twice with different configs to ensure first config was not loaded from persistent storage (from previous tests)
+ rc = DynCfgHttpClient.set_plugin_config(@parent, @plugin, @@plugin_cfg2, @child)
+ assert_eq(rc.code, 200, "as HTTP code for set_plugin_config request 2")
+
+ rc = DynCfgHttpClient.get_plugin_config(@parent, @plugin, @child)
+ assert_eq(rc.code, 200, "as HTTP code for get_plugin_config request 2")
+ assert_eq_str(rc.parsed_response.chomp!, @@plugin_cfg2, "set/get plugin config 2")
+ PASS()
+
+ TEST("child/get_plugin_config", "Get child (hops:0) plugin config and compare with what we got trough parent (set_plugin_config from previous test)")
+ rc = DynCfgHttpClient.get_plugin_config(@child, @plugin, nil)
+ assert_eq(rc.code, 200, "as HTTP code for get_plugin_config request")
+ assert_eq_str(rc.parsed_response.chomp!, @@plugin_cfg2.chomp, "as plugin config")
+ PASS()
+
+ TEST("parent/child/plugin_module_list", "Get child (hops:1) plugin module list trough parent and check its contents")
+ check_test_plugin_modules_list(@parent, @child)
+ PASS()
+
+ TEST("child/plugin_module_list", "Get child (hops:0) plugin module list directly and check its contents")
+ check_test_plugin_modules_list(@child, nil)
+ PASS()
+
+ TEST("parent/child/module/jobs", "Get list of jobs from child (hops:1) trough parent and check its contents, check job updates")
+ rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @arry_mod, @child)
+ assert_eq(rc.code, 200, "as HTTP code for get_jobs request")
+ jobs = nil
+ assert_nothing_raised do
+ jobs = JSON.parse(rc.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(jobs, :jobs)
+ new_job = jobs[:jobs].find {|i| i[:name] == @test_job}
+ assert_not_nil(new_job)
+ assert_has_key?(new_job, :status)
+ assert_not_eq_str(new_job[:status], "unknown", "job status is other than unknown")
+ assert_has_key?(new_job, :flags)
+ assert_array_include?(new_job[:flags], "JOB_FLG_STREAMING_PUSHED")
+ PASS()
+
+ TEST("child/module/jobs", "Get list of jobs direct from child (hops:0) and check its contents, check job updates")
+ rc = DynCfgHttpClient.get_job_list(@child, @plugin, @arry_mod, nil)
+ assert_eq(rc.code, 200, "as HTTP code for get_jobs request")
+ jobs = nil
+ assert_nothing_raised do
+ jobs = JSON.parse(rc.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(jobs, :jobs)
+ new_job = jobs[:jobs].find {|i| i[:name] == @test_job}
+ assert_not_nil(new_job)
+ assert_has_key?(new_job, :status)
+ assert_not_eq_str(new_job[:status], "unknown", "job status is other than unknown")
+ assert_has_key?(new_job, :flags)
+
+ assert_array_not_include?(new_job[:flags], "JOB_FLG_STREAMING_PUSHED") # this is plugin directly at child so it should not show this flag
+ PASS()
+
+ TEST("parent/child/single_module/jobs", "Attempt getting list of jobs from child (hops:1) trough parent on single module. Check it fails properly")
+ rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @single_mod, @child)
+ assert_eq(rc.code, 400, "as HTTP code for get_jobs request")
+ assert_eq_str(rc.parsed_response, '400 - this module is not array type', "as HTTP code for get_jobs request on single module")
+ PASS()
+
+ created_job = SecureRandom.uuid
+ TEST("parent/child/module/cr_del_job", "Create and delete job on child (hops:1) trough parent")
+ # create new job
+ rc = DynCfgHttpClient.create_job(@parent, @plugin, @arry_mod, created_job, @@job_cfg, @child)
+ assert_eq_http_code(rc, 200, "as HTTP code for create_job request")
+ # check this job is in job list @parent
+ rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @arry_mod, @child)
+ assert_eq(rc.code, 200, "as HTTP code for get_jobs request")
+ jobs = nil
+ assert_nothing_raised do
+ jobs = JSON.parse(rc.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(jobs, :jobs)
+ new_job = jobs[:jobs].find {|i| i[:name] == created_job}
+ assert_not_nil(new_job)
+ # check this job is in job list @child
+ rc = DynCfgHttpClient.get_job_list(@child, @plugin, @arry_mod, nil)
+ assert_eq(rc.code, 200, "as HTTP code for get_jobs request")
+ jobs = nil
+ assert_nothing_raised do
+ jobs = JSON.parse(rc.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(jobs, :jobs)
+ new_job = jobs[:jobs].find {|i| i[:name] == created_job}
+ assert_not_nil(new_job)
+ # check we can get job config back
+ rc = DynCfgHttpClient.get_job_config(@parent, @plugin, @arry_mod, created_job, @child)
+ assert_eq(rc.code, 200, "as HTTP code for get_job_config request")
+ assert_eq_str(rc.parsed_response.chomp!, @@job_cfg, "as job config")
+ # delete job
+ rc = DynCfgHttpClient.delete_job(@parent, @plugin, @arry_mod, created_job, @child)
+ assert_eq(rc.code, 200, "as HTTP code for delete_job request")
+ # Check it is not in parents job list anymore
+ rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @arry_mod, @child)
+ assert_eq(rc.code, 200, "as HTTP code for get_jobs request")
+ jobs = nil
+ assert_nothing_raised do
+ jobs = JSON.parse(rc.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(jobs, :jobs)
+ new_job = jobs[:jobs].find {|i| i[:name] == created_job}
+ assert_nil(new_job)
+ # Check it is not in childs job list anymore
+ rc = DynCfgHttpClient.get_job_list(@child, @plugin, @arry_mod, nil)
+ assert_eq(rc.code, 200, "as HTTP code for get_jobs request")
+ jobs = nil
+ assert_nothing_raised do
+ jobs = JSON.parse(rc.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(jobs, :jobs)
+ new_job = jobs[:jobs].find {|i| i[:name] == created_job}
+ assert_nil(new_job)
+ PASS()
+
+ TEST("parent/child/module/del_undeletable_job", "Try delete job on child (child rejects), check failure case works (hops:1)")
+ # test if plugin rejects job deletion the job still remains in list as it should
+ rc = DynCfgHttpClient.delete_job(@parent, @plugin, @arry_mod, @test_job, @child)
+ assert_eq(rc.code, 500, "as HTTP code for delete_job request")
+ rc = DynCfgHttpClient.get_job_list(@parent, @plugin, @arry_mod, @child)
+ assert_eq(rc.code, 200, "as HTTP code for get_jobs request")
+ jobs = nil
+ assert_nothing_raised do
+ jobs = JSON.parse(rc.parsed_response, symbolize_names: true)
+ end
+ assert_has_key?(jobs, :jobs)
+ job = jobs[:jobs].find {|i| i[:name] == @test_job}
+ assert_not_nil(job)
+ PASS()
+ end
+end
+
+ParentChildTest.new.run()
diff --git a/libnetdata/dyn_conf/tests/test_dyncfg.rb b/libnetdata/dyn_conf/tests/test_dyncfg.rb
new file mode 100755
index 000000000..1b4b3a068
--- /dev/null
+++ b/libnetdata/dyn_conf/tests/test_dyncfg.rb
@@ -0,0 +1,266 @@
+#!/usr/bin/env ruby
+
+require 'json'
+require 'httparty'
+require 'pastel'
+require 'securerandom'
+
+ARGV.length == 1 or raise "Usage: #{$0} <config file>"
+config_file = ARGV[0]
+
+File.exist?(config_file) or raise "File not found: #{config_file}"
+
+$config = JSON.parse(File.read(config_file), symbolize_names: true)
+
+$plugin_name = $config[:global][:test_plugin_name]
+$pastel = Pastel.new
+
+class TestRunner
+ attr_reader :stats
+ def initialize
+ @stats = {
+ :suites => 0,
+ :tests => 0,
+ :assertions => 0
+ }
+ @test = nil
+ end
+ def add_assertion()
+ @stats[:assertions] += 1
+ end
+ def FAIL(msg, exception = nil, loc = nil)
+ puts $pastel.red.bold(" ✕ FAIL")
+ STDERR.print " "
+ if loc
+ STDERR.print $pastel.yellow("@#{loc.path}:#{loc.lineno}: ")
+ else
+ STDERR.print $pastel.yellow("@#{caller_locations(1, 1).first.path}:#{caller_locations(1, 1).first.lineno}: ")
+ end
+ STDERR.puts msg
+ STDERR.puts exception.full_message(:highlight => true) if exception
+ STDERR.puts $pastel.yellow(" Backtrace:")
+ caller.each do |line|
+ STDERR.puts " #{line}"
+ end
+ exit 1
+ end
+ def PASS()
+ STDERR.puts $pastel.green.bold(" ✓ PASS")
+ @stats[:tests] += 1
+ @test = nil
+ end
+ def TEST_SUITE(name)
+ puts $pastel.bold("• TEST SUITE: \"#{name}\"")
+ @stats[:suites] += 1
+ end
+ def assert_no_test_running()
+ unless @test.nil?
+ STDERR.puts $pastel.red("\nFATAL: Test \"#{@test}\" did not call PASS() or FAIL()!")
+ exit 1
+ end
+ end
+ def TEST(name, description = nil)
+ assert_no_test_running()
+ @test = name
+ col = 0
+ txt = " ├─ T: #{name} "
+ col += txt.length
+ print $pastel.bold(txt)
+
+ tab = 50
+ rem = tab - (col % tab)
+ rem.times do putc ' ' end
+ col += rem
+
+ if (description)
+ txt = " - #{description} "
+ col += txt.length
+ print txt
+
+ tab = 180
+ rem = tab - (col % tab)
+ rem.times do putc '.' end
+ end
+ end
+ def FINALIZE()
+ assert_no_test_running()
+ end
+end
+
+$test_runner = TestRunner.new
+def FAIL(msg, exception = nil, loc = nil)
+ $test_runner.FAIL(msg, exception, loc)
+end
+def PASS()
+ $test_runner.PASS()
+end
+def TEST_SUITE(name)
+ $test_runner.TEST_SUITE(name)
+end
+def TEST(name, description = nil)
+ $test_runner.TEST(name, description)
+end
+
+def assert_eq(got, expected, msg = nil)
+ unless got == expected
+ FAIL("Expected #{expected}, got #{got} #{msg ? "(#{msg})" : ""}", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_eq_http_code(got, expected, msg = nil)
+ unless got.code == expected
+ FAIL("Expected #{expected}, got #{got}. Server \"#{got.parsed_response}\" #{msg ? "(#{msg})" : ""}", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_eq_str(got, expected, msg = nil)
+ unless got == expected
+ FAIL("Strings do not match #{msg ? "(#{msg})" : ""}", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_not_eq_str(got, expected, msg = nil)
+ unless got != expected
+ FAIL("Strings shoud not match #{msg ? "(#{msg})" : ""}", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_nothing_raised()
+ begin
+ yield
+ rescue Exception => e
+ FAIL("Unexpected exception of type #{e.class} raised. Msg: \"#{e.message}\"", e, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_has_key?(hash, key)
+ unless hash.has_key?(key)
+ FAIL("Expected key \"#{key}\" in hash", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_array_include?(array, value)
+ unless array.include?(value)
+ FAIL("Expected array to include \"#{value}\"", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_array_not_include?(array, value)
+ if array.include?(value)
+ FAIL("Expected array to not include \"#{value}\"", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_is_one_of(value, *values)
+ unless values.include?(value)
+ FAIL("Expected value to be one of #{values.join(", ")}", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_not_nil(value)
+ if value.nil?
+ FAIL("Expected value to not be nil", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+def assert_nil(value)
+ unless value.nil?
+ FAIL("Expected value to be nil", nil, caller_locations(1, 1).first)
+ end
+ $test_runner.add_assertion()
+end
+
+
+class DynCfgHttpClient
+ def self.protocol(cfg)
+ return cfg[:ssl] ? 'https://' : 'http://'
+ end
+ def self.url_base(host)
+ return "#{protocol(host)}#{host[:host]}:#{host[:port]}"
+ end
+ def self.get_url_cfg_base(host, child = nil)
+ url = url_base(host)
+ url += "/host/#{child[:mguid]}" if child
+ url += "/api/v2/config"
+ return url
+ end
+ def self.get_url_cfg_plugin(host, plugin, child = nil)
+ return get_url_cfg_base(host, child) + '/' + plugin
+ end
+ def self.get_url_cfg_module(host, plugin, mod, child = nil)
+ return get_url_cfg_plugin(host, plugin, child) + '/' + mod
+ end
+ def self.get_url_cfg_job(host, plugin, mod, job_id, child = nil)
+ return get_url_cfg_module(host, plugin, mod, child) + "/#{job_id}"
+ end
+ def self.get_plugin_list(host, child = nil)
+ begin
+ return HTTParty.get(get_url_cfg_base(host, child), verify: false, format: :plain)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+ def self.get_plugin_config(host, plugin, child = nil)
+ begin
+ return HTTParty.get(get_url_cfg_plugin(host, plugin, child), verify: false)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+ def self.set_plugin_config(host, plugin, cfg, child = nil)
+ begin
+ return HTTParty.put(get_url_cfg_plugin(host, plugin, child), verify: false, body: cfg)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+ def self.get_plugin_module_list(host, plugin, child = nil)
+ begin
+ return HTTParty.get(get_url_cfg_plugin(host, plugin, child) + "/modules", verify: false, format: :plain)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+ def self.get_job_list(host, plugin, mod, child = nil)
+ begin
+ return HTTParty.get(get_url_cfg_module(host, plugin, mod, child) + "/jobs", verify: false, format: :plain)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+ def self.create_job(host, plugin, mod, job_id, job_cfg, child = nil)
+ begin
+ return HTTParty.post(get_url_cfg_job(host, plugin, mod, job_id, child), verify: false, body: job_cfg)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+ def self.delete_job(host, plugin, mod, job_id, child = nil)
+ begin
+ return HTTParty.delete(get_url_cfg_job(host, plugin, mod, job_id, child), verify: false)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+ def self.get_job_config(host, plugin, mod, job_id, child = nil)
+ begin
+ return HTTParty.get(get_url_cfg_job(host, plugin, mod, job_id, child), verify: false, format: :plain)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+ def self.set_job_config(host, plugin, mod, job_id, job_cfg, child = nil)
+ begin
+ return HTTParty.put(get_url_cfg_job(host, plugin, mod, job_id, child), verify: false, body: job_cfg)
+ rescue => e
+ FAIL(e.message, e)
+ end
+ end
+end
+
+require_relative 'sub_tests/test_parent_child.rb'
+
+$test_runner.FINALIZE()
+puts $pastel.green.bold("All tests passed!")
+puts ("Total #{$test_runner.stats[:assertions]} assertions, #{$test_runner.stats[:tests]} tests in #{$test_runner.stats[:suites]} suites")
+exit 0
diff --git a/libnetdata/dyn_conf/tests/test_plugin/test.plugin b/libnetdata/dyn_conf/tests/test_plugin/test.plugin
new file mode 100755
index 000000000..b890ab314
--- /dev/null
+++ b/libnetdata/dyn_conf/tests/test_plugin/test.plugin
@@ -0,0 +1,250 @@
+#!/usr/bin/env ruby
+
+# bogus chart that we create just so there is at least one chart
+CHART_TYPE = 'lines'
+UPDATE_EVERY = 1
+PRIORITY = 100000
+CHART_NAME = 'number_of_processes'
+DIMENSION_NAME = 'running'
+
+$plugin_name = "external_plugin"
+$plugin_version = "0.0.1"
+$plugin_config = <<-HEREDOC
+test_plugin_config
+hableba hableba hableba
+HEREDOC
+
+$array_module_name = 'module_of_the_future'
+$fixed_job_name = 'fixed_job'
+
+$modules = {
+ $array_module_name => {
+ :type => :job_array,
+ :jobs => {
+ $fixed_job_name => {
+ :type => :fixed,
+ :config => <<-HEREDOC
+fixed_job_config
+HEREDOC
+ },
+ },
+ :config => <<-HEREDOC
+module_of_the_future_config
+HEREDOC
+ },
+ "module_of_the_future_single_type" => {
+ :type => :single,
+ :jobs => {},
+ :config => <<-HEREDOC
+module_of_the_future_single_type_config
+HEREDOC
+ }
+}
+
+def out(str)
+ $log.puts "2 NETDATA> #{str}"
+ $stdout.puts str
+ $stdout.flush
+ $log.flush
+end
+
+def log(str)
+ $log.puts "LOG > #{str}"
+ $log.flush
+end
+
+#TODO this is AI code, verify
+def split_with_quotes(str)
+ result = []
+ current_word = ""
+ in_quotes = false
+ escaped = false
+
+ str.each_char do |char|
+ if char == '\\' && !escaped
+ escaped = true
+ next
+ end
+
+ if char == '"' && !escaped
+ in_quotes = !in_quotes
+ current_word << char
+ elsif char == ' ' && !in_quotes
+ result << current_word unless current_word.empty?
+ current_word = ""
+ else
+ current_word << char
+ end
+
+ escaped = false
+ end
+
+ result << current_word unless current_word.empty?
+
+ result
+end
+
+
+def print_startup_messages
+ out "DYNCFG_ENABLE #{$plugin_name}"
+ $modules.each do |name, module_config|
+ out "DYNCFG_REGISTER_MODULE #{name} #{module_config[:type]}"
+ end
+ out "CHART system.#{CHART_NAME} '' 'Number of running processes' 'processes' processes processes.#{CHART_NAME} #{CHART_TYPE} #{PRIORITY} #{UPDATE_EVERY}"
+ out "DIMENSION #{DIMENSION_NAME} '' absolute 1 1"
+
+ $modules.each do |mod_name, mod|
+ next unless mod[:type] == :job_array
+ mod[:jobs].each do |job_name, job|
+ next unless job[:type] == :fixed
+ out "DYNCFG_REGISTER_JOB #{mod_name} #{job_name} stock 0"
+ out "REPORT_JOB_STATUS #{$array_module_name} #{$fixed_job_name} running 0"
+ end
+ end
+end
+
+def function_result(txid, msg, result)
+ out "FUNCTION_RESULT_BEGIN #{txid} #{result} text/plain 5"
+ out msg
+ out "FUNCTION_RESULT_END"
+end
+
+def process_payload_function(params)
+ log "payload function #{params[:fncname]}, #{params[:fncparams]}"
+ fnc_name, mod_name, job_name = params[:fncparams]
+ case fnc_name
+ when 'set_plugin_config'
+ $plugin_config = params[:payload]
+ function_result(params[:txid], "plugin config set", 1)
+ when 'set_module_config'
+ mod = $modules[mod_name]
+ return function_result(params[:txid], "no such module", 0) if mod.nil?
+ mod[:config] = params[:payload]
+ function_result(params[:txid], "module config set", 1)
+ when 'set_job_config'
+ mod = $modules[mod_name]
+ return function_result(params[:txid], "no such module", 0) if mod.nil?
+ job = mod[:jobs][job_name]
+ if job.nil?
+ job = Hash.new if job.nil?
+ job[:type] = :dynamic
+ mod[:jobs][job_name] = job
+ end
+ job[:config] = params[:payload]
+ function_result(params[:txid], "job config set", 1)
+ end
+end
+
+def process_function(params)
+ log "normal function #{params[:fncname]}, #{params[:fncparams]}"
+ fnc_name, mod_name, job_name = params[:fncparams]
+ case fnc_name
+ when 'get_plugin_config'
+ function_result(params[:txid], $plugin_config, 1)
+ when 'get_module_config'
+ return function_result(params[:txid], "no such module", 0) unless $modules.has_key?(mod_name)
+ function_result(params[:txid], $modules[mod_name][:config], 1)
+ when 'get_job_config'
+ mod = $modules[mod_name]
+ return function_result(params[:txid], "no such module", 0) if mod.nil?
+ job = mod[:jobs][job_name]
+ return function_result(params[:txid], "no such job", 0) if job.nil?
+ function_result(params[:txid], job[:config], 1)
+ when 'delete_job'
+ mod = $modules[mod_name]
+ return function_result(params[:txid], "no such module", 0) if mod.nil?
+ job = mod[:jobs][job_name]
+ return function_result(params[:txid], "no such job", 0) if job.nil?
+ if job[:type] == :fixed
+ return function_result(params[:txid], "this job can't be deleted", 0)
+ else
+ mod[:jobs].delete(job_name)
+ function_result(params[:txid], "job deleted", 1)
+ end
+ end
+end
+
+$inflight_incoming = nil
+def process_input(input)
+ words = split_with_quotes(input)
+
+ unless $inflight_incoming.nil?
+ if input == "FUNCTION_PAYLOAD_END"
+ log $inflight_incoming[:payload]
+ process_payload_function($inflight_incoming)
+ $inflight_incoming = nil
+ else
+ $inflight_incoming[:payload] << input
+ $inflight_incoming[:payload] << "\n"
+ end
+ return
+ end
+
+ case words[0]
+ when "FUNCTION", "FUNCTION_PAYLOAD"
+ params = {}
+ params[:command] = words[0]
+ params[:txid] = words[1]
+ params[:timeout] = words[2].to_i
+ params[:fncname] = words[3]
+ params[:fncname] = params[:fncname][1..-2] if params[:fncname].start_with?('"') && params[:fncname].end_with?('"')
+ if params[:command] == "FUNCTION_PAYLOAD"
+ $inflight_incoming = Hash.new
+ params[:fncparams] = split_with_quotes(params[:fncname])
+ params[:fncname] = params[:fncparams][0]
+ $inflight_incoming[:txid] = params[:txid]
+ $inflight_incoming[:fncname] = params[:fncname]
+ $inflight_incoming[:params] = params
+ $inflight_incoming[:fncparams] = params[:fncparams]
+ $inflight_incoming[:payload] = ""
+ else
+ params[:fncparams] = split_with_quotes(params[:fncname])
+ params[:fncname] = params[:fncparams][0]
+ process_function(params)
+ end
+ end
+end
+
+def read_and_output_metric
+ processes = `ps -e | wc -l`.to_i - 1 # -1 to exclude the header line
+ timestamp = Time.now.to_i
+
+ puts "BEGIN system.#{CHART_NAME}"
+ puts "SET #{DIMENSION_NAME} = #{processes}"
+ puts "END"
+end
+
+def the_main
+ $stderr.reopen("/tmp/test_plugin_err.log", "w")
+ $log = File.open("/tmp/test_plugin.log", "w")
+ $log.puts "Starting plugin"
+ print_startup_messages
+ $log.puts "init done"
+ $log.flush
+
+ last_metric_time = Time.now
+
+ loop do
+ time_since_last_metric = Time.now - last_metric_time
+
+ # If it's been more than 1 second since we collected metrics, collect them now
+ if time_since_last_metric >= 1
+ read_and_output_metric
+ last_metric_time = Time.now
+ end
+
+ # Use select to wait for input, but only wait up to the time remaining until we need to collect metrics again
+ remaining_time = [1 - time_since_last_metric, 0].max
+ if select([$stdin], nil, nil, remaining_time)
+ input = $stdin.gets
+ next if input.class != String
+ input.chomp!
+ $log.puts "RAW INPUT< #{input}"
+ $log.flush
+ process_input(input)
+ end
+ end
+end
+
+
+the_main if __FILE__ == $PROGRAM_NAME