diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:08 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:08 +0000 |
commit | 81581f9719bc56f01d5aa08952671d65fda9867a (patch) | |
tree | 0f5c6b6138bf169c23c9d24b1fc0a3521385cb18 /collectors/plugins.d | |
parent | Releasing debian version 1.38.1-1. (diff) | |
download | netdata-81581f9719bc56f01d5aa08952671d65fda9867a.tar.xz netdata-81581f9719bc56f01d5aa08952671d65fda9867a.zip |
Merging upstream version 1.39.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r-- | collectors/plugins.d/README.md | 89 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 93 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 15 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 994 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 33 |
5 files changed, 1008 insertions, 216 deletions
diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md index 8ad1d3a65..1c3b50cb7 100644 --- a/collectors/plugins.d/README.md +++ b/collectors/plugins.d/README.md @@ -1,13 +1,13 @@ <!-- -title: "External plugins overview" +title: "External plugins" custom_edit_url: "https://github.com/netdata/netdata/edit/master/collectors/plugins.d/README.md" -sidebar_label: "External plugins overview" +sidebar_label: "External plugins" learn_status: "Published" learn_topic_type: "References" -learn_rel_path: "Developers" +learn_rel_path: "Developers/External plugins" --> -# External plugins overview +# External plugins `plugins.d` is the Netdata internal plugin that collects metrics from external processes, thus allowing Netdata to use **external plugins**. @@ -138,7 +138,7 @@ a single program can produce any number of charts with any number of dimensions Charts can be added any time (not just the beginning). -### command line parameters +### Command line parameters The plugin **MUST** accept just **one** parameter: **the number of seconds it is expected to update the values for its charts**. The value passed by Netdata @@ -149,7 +149,7 @@ The external plugin can overwrite the update frequency. For example, the server request per second updates, but the plugin may ignore it and update its charts every 5 seconds. -### environment variables +### Environment variables There are a few environment variables that are set by `netdata` and are available for the plugin to use. @@ -175,6 +175,83 @@ The plugin should output instructions for Netdata to its output (`stdout`). Sinc `DISABLE` will disable this plugin. This will prevent Netdata from restarting the plugin. You can also exit with the value `1` to have the same effect. +#### HOST_DEFINE + +`HOST_DEFINE` defines a new (or updates an existing) virtual host. + +The template is: + +> HOST_DEFINE machine_guid hostname + +where: + +- `machine_guid` + + uniquely identifies the host, this is what will be needed to add charts to the host. + +- `hostname` + + is the hostname of the virtual host + +#### HOST_LABEL + +`HOST_LABEL` adds a key-value pair to the virtual host labels. It has to be given between `HOST_DEFINE` and `HOST_DEFINE_END`. + +The template is: + +> HOST_LABEL key value + +where: + +- `key` + + uniquely identifies the key of the label + +- `value` + + is the value associated with this key + +There are a few special keys that are used to define the system information of the monitored system: + +- `_cloud_provider_type` +- `_cloud_instance_type` +- `_cloud_instance_region` +- `_os_name` +- `_os_version` +- `_kernel_version` +- `_system_cores` +- `_system_cpu_freq` +- `_system_ram_total` +- `_system_disk_space` +- `_architecture` +- `_virtualization` +- `_container` +- `_container_detection` +- `_virt_detection` +- `_is_k8s_node` +- `_install_type` +- `_prebuilt_arch` +- `_prebuilt_dist` + +#### HOST_DEFINE_END + +`HOST_DEFINE_END` commits the host information, creating a new host entity, or updating an existing one with the same `machine_guid`. + +#### HOST + +`HOST` switches data collection between hosts. + +The template is: + +> HOST machine_guid + +where: + +- `machine_guid` + + is the UUID of the host to switch to. After this command, every other command following it is assumed to be associated with this host. + Setting machine_guid to `localhost` switches data collection to the local host. + #### CHART `CHART` defines a new chart. diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 7608f3afc..dc13cd2ee 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -18,7 +18,7 @@ inline size_t pluginsd_initialize_plugin_directories() } // Parse it and store it to plugin directories - return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0); + return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace); } static inline void plugin_set_disabled(struct plugind *cd) { @@ -51,6 +51,8 @@ static void pluginsd_worker_thread_cleanup(void *arg) { struct plugind *cd = (struct plugind *)arg; + worker_unregister(); + netdata_spinlock_lock(&cd->unsafe.spinlock); cd->unsafe.running = false; @@ -62,74 +64,73 @@ static void pluginsd_worker_thread_cleanup(void *arg) netdata_spinlock_unlock(&cd->unsafe.spinlock); if (pid) { - info("data collection thread exiting"); - siginfo_t info; - info("killing child process pid %d", pid); + info("PLUGINSD: 'host:%s', killing data collection child process with pid %d", + rrdhost_hostname(cd->host), pid); + if (killpid(pid) != -1) { - info("waiting for child process pid %d to exit...", pid); + info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...", + rrdhost_hostname(cd->host), pid); + waitid(P_PID, (id_t)pid, &info, WEXITED); } } } #define SERIAL_FAILURES_THRESHOLD 10 -static void pluginsd_worker_thread_handle_success(struct plugind *cd) -{ +static void pluginsd_worker_thread_handle_success(struct plugind *cd) { if (likely(cd->successful_collections)) { sleep((unsigned int)cd->update_every); return; } if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) { - info( - "'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", - cd->fullfilename, cd->unsafe.pid, - plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); + info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); + sleep((unsigned int)(cd->update_every * 10)); return; } if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) { - error( - "'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)." - "We have tried to collect something %zu times - unsuccessfully. Disabling it.", - cd->fullfilename, cd->unsafe.pid, cd->serial_failures); + error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, " + "although it reports success (exits with 0)." + "We have tried to collect something %zu times - unsuccessfully. Disabling it.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, cd->serial_failures); plugin_set_disabled(cd); return; } } -static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) -{ +static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) { if (worker_ret_code == -1) { - info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid); + info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid); plugin_set_disabled(cd); return; } if (!cd->successful_collections) { - error( - "'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename, - cd->unsafe.pid, worker_ret_code); + error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code); plugin_set_disabled(cd); return; } if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) { - error( - "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", - cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, - plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); + error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); sleep((unsigned int)(cd->update_every * 10)); return; } if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) { - error( - "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." - "We tried to restart it %zu times, but it failed to generate data. Disabling it.", - cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures); + error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." + "We tried to restart it %zu times, but it failed to generate data. Disabling it.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, + cd->successful_collections, cd->serial_failures); plugin_set_disabled(cd); return; } @@ -137,8 +138,7 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r #undef SERIAL_FAILURES_THRESHOLD -static void *pluginsd_worker_thread(void *arg) -{ +static void *pluginsd_worker_thread(void *arg) { worker_register("PLUGINSD"); netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg); @@ -151,14 +151,20 @@ static void *pluginsd_worker_thread(void *arg) while (service_running(SERVICE_COLLECTORS)) { FILE *fp_child_input = NULL; FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); + if (unlikely(!fp_child_input || !fp_child_output)) { - error("Cannot popen(\"%s\", \"r\").", cd->cmd); + error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd); break; } - info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid); - count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0); - error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count); + info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid); + + count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0); + + info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count); + killpid(cd->unsafe.pid); int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid); @@ -172,29 +178,29 @@ static void *pluginsd_worker_thread(void *arg) if (unlikely(!plugin_is_enabled(cd))) break; } - worker_unregister(); netdata_thread_cleanup_pop(1); return NULL; } -static void pluginsd_main_cleanup(void *data) -{ +static void pluginsd_main_cleanup(void *data) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - info("cleaning up..."); + info("PLUGINSD: cleaning up..."); struct plugind *cd; for (cd = pluginsd_root; cd; cd = cd->next) { netdata_spinlock_lock(&cd->unsafe.spinlock); if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) { - info("stopping plugin thread: %s", cd->id); + info("PLUGINSD: 'host:%s', stopping plugin thread: %s", + rrdhost_hostname(cd->host), cd->id); + netdata_thread_cancel(cd->unsafe.thread); } netdata_spinlock_unlock(&cd->unsafe.spinlock); } - info("cleanup completed."); + info("PLUGINSD: cleanup completed."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; worker_unregister(); @@ -282,6 +288,7 @@ void *pluginsd_main(void *ptr) strncpyz(cd->filename, file->d_name, FILENAME_MAX); snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename); + cd->host = localhost; cd->unsafe.enabled = enabled; cd->unsafe.running = false; @@ -294,9 +301,7 @@ void *pluginsd_main(void *ptr) config_get(cd->id, "command options", def)); // link it - if (likely(pluginsd_root)) - cd->next = pluginsd_root; - pluginsd_root = cd; + DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(pluginsd_root, cd, prev, next); if (plugin_is_enabled(cd)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 35af9fe58..68ed4940f 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -34,6 +34,17 @@ #define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" #define PLUGINSD_KEYWORD_REPLAY_END "REND" +#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2" +#define PLUGINSD_KEYWORD_SET_V2 "SET2" +#define PLUGINSD_KEYWORD_END_V2 "END2" + +#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE" +#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END" +#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL" +#define PLUGINSD_KEYWORD_HOST "HOST" + +#define PLUGINSD_KEYWORD_EXIT "EXIT" + #define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds #define PLUGINSD_LINE_MAX_SSL_READ 512 @@ -56,6 +67,7 @@ struct plugind { size_t serial_failures; // the number of times the plugin started // without collecting values + RRDHOST *host; // the host the plugin collects data for int update_every; // the plugin default data collection frequency struct { @@ -67,7 +79,8 @@ struct plugind { } unsafe; time_t started_t; - uint32_t capabilities; // follows the same principles as streaming capabilities + + struct plugind *prev; struct plugind *next; }; diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 2c0f2cbc6..28fc0bd49 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -71,20 +71,109 @@ static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char return st; } -static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { +static inline RRDSET *pluginsd_get_chart_from_parent(void *user) { + return ((PARSER_USER_OBJECT *) user)->st; +} + +static inline void pluginsd_lock_rrdset_data_collection(void *user) { + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + if(u->st && !u->v2.locked_data_collection) { + netdata_spinlock_lock(&u->st->data_collection_lock); + u->v2.locked_data_collection = true; + } +} + +static inline bool pluginsd_unlock_rrdset_data_collection(void *user) { + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + if(u->st && u->v2.locked_data_collection) { + netdata_spinlock_unlock(&u->st->data_collection_lock); + u->v2.locked_data_collection = false; + return true; + } + + return false; +} + +void pluginsd_rrdset_cleanup(RRDSET *st) { + for(size_t i = 0; i < st->pluginsd.used ; i++) { + if (st->pluginsd.rda[i]) { + rrddim_acquired_release(st->pluginsd.rda[i]); + st->pluginsd.rda[i] = NULL; + } + } + freez(st->pluginsd.rda); + st->pluginsd.rda = NULL; + st->pluginsd.size = 0; + st->pluginsd.used = 0; + st->pluginsd.pos = 0; +} + +static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) { + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + + if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) { + error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked", + rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword); + } + + if(unlikely(u->v2.ml_locked)) { + ml_chart_update_end(u->st); + u->v2.ml_locked = false; + + error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked", + rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword); + } + + if(st) { + size_t dims = dictionary_entries(st->rrddim_root_index); + if(unlikely(st->pluginsd.size < dims)) { + st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *)); + st->pluginsd.size = dims; + } + + if(st->pluginsd.pos > st->pluginsd.used && st->pluginsd.pos <= st->pluginsd.size) + st->pluginsd.used = st->pluginsd.pos; + + st->pluginsd.pos = 0; + } + + u->st = st; +} + +static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { if (unlikely(!dimension || !*dimension)) { error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", rrdhost_hostname(host), rrdset_id(st), cmd); return NULL; } - RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); + RRDDIM_ACQUIRED *rda; - if (unlikely(!rda)) + if(likely(st->pluginsd.pos < st->pluginsd.used)) { + rda = st->pluginsd.rda[st->pluginsd.pos]; + RRDDIM *rd = rrddim_acquired_to_rrddim(rda); + if (likely(rd && string_strcmp(rd->id, dimension) == 0)) { + st->pluginsd.pos++; + return rd; + } + else { + rrddim_acquired_release(rda); + st->pluginsd.rda[st->pluginsd.pos] = NULL; + } + } + + rda = rrddim_find_and_acquire(st, dimension); + if (unlikely(!rda)) { error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", rrdhost_hostname(host), rrdset_id(st), dimension, cmd); - return rda; + return NULL; + } + + if(likely(st->pluginsd.pos < st->pluginsd.size)) + st->pluginsd.rda[st->pluginsd.pos++] = rda; + + return rrddim_acquired_to_rrddim(rda); } static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { @@ -102,8 +191,14 @@ static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, cons return st; } -static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) { +static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) { ((PARSER_USER_OBJECT *) user)->enabled = 0; + + if(keyword && msg) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg); + } + return PARSER_RC_ERROR; } @@ -113,24 +208,21 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) char *value = get_word(words, num_words, 2); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - - RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); - if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - RRDDIM *rd = rrddim_acquired_to_rrddim(rda); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET"); if (value && *value) - rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0)); + rrddim_set_by_pointer(st, rd, str2ll_encoded(value)); - rrddim_acquired_release(rda); return PARSER_RC_OK; } @@ -140,12 +232,12 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) char *microseconds_txt = get_word(words, num_words, 2); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - ((PARSER_USER_OBJECT *)user)->st = st; + pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN); usec_t microseconds = 0; if (microseconds_txt && *microseconds_txt) { @@ -187,16 +279,16 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) UNUSED(num_words); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); - ((PARSER_USER_OBJECT *) user)->st = NULL; - ((PARSER_USER_OBJECT *) user)->count++; + pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END); + ((PARSER_USER_OBJECT *) user)->data_collections_count++; struct timeval now; now_realtime_timeval(&now); @@ -205,10 +297,151 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) return PARSER_RC_OK; } +static void pluginsd_host_define_cleanup(void *user) { + PARSER_USER_OBJECT *u = user; + + string_freez(u->host_define.hostname); + dictionary_destroy(u->host_define.rrdlabels); + + u->host_define.hostname = NULL; + u->host_define.rrdlabels = NULL; + u->host_define.parsing_host = false; +} + +static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) { + if(uuid_parse(guid, *uuid)) + return false; + + uuid_unparse_lower(*uuid, output); + + return true; +} + +static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) { + PARSER_USER_OBJECT *u = user; + + char *guid = get_word(words, num_words, 1); + char *hostname = get_word(words, num_words, 2); + + if(unlikely(!guid || !*guid || !hostname || !*hostname)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters"); + + if(unlikely(u->host_define.parsing_host)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, + "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?"); + + if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?"); + + u->host_define.hostname = string_strdupz(hostname); + u->host_define.rrdlabels = rrdlabels_create(); + u->host_define.parsing_host = true; + + return PARSER_RC_OK; +} + +static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) { + PARSER_USER_OBJECT *u = user; + + char *name = get_word(words, num_words, 1); + char *value = get_word(words, num_words, 2); + + if(!name || !*name || !value) + return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters"); + + if(!u->host_define.parsing_host || !dict) + return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); + + rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG); + + return PARSER_RC_OK; +} + +static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) { + PARSER_USER_OBJECT *u = user; + return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL); +} + +static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { + PARSER_USER_OBJECT *u = user; + + if(!u->host_define.parsing_host) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); + + RRDHOST *host = rrdhost_find_or_create( + string2str(u->host_define.hostname), + string2str(u->host_define.hostname), + u->host_define.machine_guid_str, + "Netdata Virtual Host 1.0", + netdata_configured_timezone, + netdata_configured_abbrev_timezone, + netdata_configured_utc_offset, + NULL, + program_name, + program_version, + default_rrd_update_every, + default_rrd_history_entries, + default_rrd_memory_mode, + default_health_enabled, + default_rrdpush_enabled, + default_rrdpush_destination, + default_rrdpush_api_key, + default_rrdpush_send_charts_matching, + default_rrdpush_enable_replication, + default_rrdpush_seconds_to_replicate, + default_rrdpush_replication_step, + rrdhost_labels_to_system_info(u->host_define.rrdlabels), + false + ); + + if(host->rrdlabels) { + rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels); + } + else { + host->rrdlabels = u->host_define.rrdlabels; + u->host_define.rrdlabels = NULL; + } + + pluginsd_host_define_cleanup(user); + + u->host = host; + pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END); + + rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + rrdcontext_host_child_connected(host); + schedule_node_info_update(host); + + return PARSER_RC_OK; +} + +static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) { + PARSER_USER_OBJECT *u = user; + + char *guid = get_word(words, num_words, 1); + + if(!guid || !*guid || strcmp(guid, "localhost") == 0) { + u->host = localhost; + return PARSER_RC_OK; + } + + uuid_t uuid; + char uuid_str[UUID_STR_LEN]; + if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?"); + + RRDHOST *host = rrdhost_find_by_guid(uuid_str); + if(unlikely(!host)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?"); + + u->host = host; + + return PARSER_RC_OK; +} + PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) { RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); char *type = get_word(words, num_words, 1); char *name = get_word(words, num_words, 2); @@ -231,19 +464,14 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) } // make sure we have the required variables - if (unlikely((!type || !*type || !id || !*id))) { - error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.", - rrdhost_hostname(host)); - - ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; - } + if (unlikely((!type || !*type || !id || !*id))) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters"); // parse the name, and make sure it does not include 'type.' if (unlikely(name && *name)) { // when data are streamed from child nodes // name will be type.name - // so we have to remove 'type.' from name too + // so, we have to remove 'type.' from name too size_t len = strlen(type); if (strncmp(type, name, len) == 0 && name[len] == '.') name = &name[len + 1]; @@ -320,7 +548,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); } } - ((PARSER_USER_OBJECT *)user)->st = st; + pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART); return PARSER_RC_OK; } @@ -332,10 +560,10 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us const char *wall_clock_time_txt = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; @@ -379,33 +607,24 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) char *options = get_word(words, num_words, 6); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - if (unlikely(!id)) { - error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.", - rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET"); - return PLUGINSD_DISABLE_PLUGIN(user); - } - - if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) { - error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.", - rrdhost_hostname(host)); - return PLUGINSD_DISABLE_PLUGIN(user); - } + if (unlikely(!id)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id"); long multiplier = 1; if (multiplier_s && *multiplier_s) { - multiplier = strtol(multiplier_s, NULL, 0); + multiplier = str2ll_encoded(multiplier_s); if (unlikely(!multiplier)) multiplier = 1; } long divisor = 1; if (likely(divisor_s && *divisor_s)) { - divisor = strtol(divisor_s, NULL, 0); + divisor = str2ll_encoded(divisor_s); if (unlikely(!divisor)) divisor = 1; } @@ -683,7 +902,7 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u } else { if(format && *format) - pf->destination_wb->contenttype = functions_format_to_content_type(format); + pf->destination_wb->content_type = functions_format_to_content_type(format); pf->code = code; @@ -712,9 +931,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) NETDATA_DOUBLE v; RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; + RRDSET *st = pluginsd_get_chart_from_parent(user); int global = (st) ? 0 : 1; @@ -730,13 +949,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) } } - if (unlikely(!name || !*name)) { - error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.", - rrdhost_hostname(host), st ? rrdset_id(st):"UNSET"); - - ((PARSER_USER_OBJECT *)user)->enabled = 0; - return PLUGINSD_DISABLE_PLUGIN(user); - } + if (unlikely(!name || !*name)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name"); if (unlikely(!value || !*value)) value = NULL; @@ -750,17 +964,11 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) return PARSER_RC_OK; } - if (!global && !st) { - error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - name - ); - return PLUGINSD_DISABLE_PLUGIN(user); - } + if (!global && !st) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given"); char *endptr = NULL; - v = (NETDATA_DOUBLE)str2ndd(value, &endptr); + v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr); if (unlikely(endptr && *endptr)) { if (endptr == value) error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number", @@ -803,8 +1011,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { - debug(D_PLUGINSD, "requested a FLUSH"); - ((PARSER_USER_OBJECT *) user)->st = NULL; + debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH); + pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH); ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; @@ -816,7 +1024,7 @@ PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe { info("PLUGINSD: plugin called DISABLE. Disabling it."); ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; + return PARSER_RC_STOP; } PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) @@ -825,10 +1033,8 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) const char *label_source = get_word(words, num_words, 2); const char *value = get_word(words, num_words, 3); - if (!name || !label_source || !value) { - error("PLUGINSD: ignoring malformed or empty LABEL command."); - return PLUGINSD_DISABLE_PLUGIN(user); - } + if (!name || !label_source || !value) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters"); char *store = (char *)value; bool allocated_store = false; @@ -874,7 +1080,7 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); debug(D_PLUGINSD, "requested to OVERWRITE host labels"); @@ -898,11 +1104,12 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user) if (!name || !value || !*label_source) { error("Ignoring malformed or empty CHART LABEL command."); - return PLUGINSD_DISABLE_PLUGIN(user); + return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); } if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) { - ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels; + RRDSET *st = pluginsd_get_chart_from_parent(user); + ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels; rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); } @@ -915,17 +1122,17 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user) PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); debug(D_PLUGINSD, "requested to commit chart labels"); if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) { error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host)); - return PLUGINSD_DISABLE_PLUGIN(user); + return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); } rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); @@ -937,15 +1144,14 @@ PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words _ return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user) -{ +PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) { char *id = get_word(words, num_words, 1); char *start_time_str = get_word(words, num_words, 2); char *end_time_str = get_word(words, num_words, 3); char *child_now_str = get_word(words, num_words, 4); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st; if (likely(!id || !*id)) @@ -953,17 +1159,17 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use else st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - ((PARSER_USER_OBJECT *) user)->st = st; + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(start_time_str && end_time_str) { - time_t start_time = (time_t)str2ul(start_time_str); - time_t end_time = (time_t)str2ul(end_time_str); + time_t start_time = (time_t) str2ull_encoded(start_time_str); + time_t end_time = (time_t) str2ull_encoded(end_time_str); time_t wall_clock_time = 0, tolerance; bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child; if(child_now_str) { - wall_clock_time = (time_t)str2ul(child_now_str); + wall_clock_time = (time_t) str2ull_encoded(child_now_str); tolerance = st->update_every + 1; wall_clock_comes_from_child = true; } @@ -1016,7 +1222,9 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use return PARSER_RC_OK; } - error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, + error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN + " from %ld to %ld, but timestamps are invalid " + "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance); } @@ -1033,6 +1241,33 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use return PARSER_RC_OK; } +static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) { + SN_FLAGS flags = SN_FLAG_NONE; + + char c; + while ((c = *flags_str++)) { + switch (c) { + case 'A': + flags |= SN_FLAG_NOT_ANOMALOUS; + break; + + case 'R': + flags |= SN_FLAG_RESET; + break; + + case 'E': + flags = SN_EMPTY_SLOT; + return flags; + + default: + internal_error(true, "Unknown SN_FLAGS flag '%c'", c); + break; + } + } + + return flags; +} + PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) { char *dimension = get_word(words, num_words, 1); @@ -1040,31 +1275,34 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) char *flags_str = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) { + PARSER_USER_OBJECT *u = user; + if(!u->replay.rset_enabled) { error_limit_static_thread_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors", - rrdhost_hostname(host), rrdset_id(st)); + error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", + rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); // we have to return OK here return PARSER_RC_OK; } - RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); - if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) { - error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + if (unlikely(!u->replay.start_time || !u->replay.end_time)) { + error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.", rrdhost_hostname(host), rrdset_id(st), dimension, - ((PARSER_USER_OBJECT *) user)->replay.start_time, - ((PARSER_USER_OBJECT *) user)->replay.end_time); - return PLUGINSD_DISABLE_PLUGIN(user); + PLUGINSD_KEYWORD_REPLAY_SET, + u->replay.start_time, + u->replay.end_time, + PLUGINSD_KEYWORD_REPLAY_BEGIN); + return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); } if (unlikely(!value_str || !*value_str)) @@ -1074,39 +1312,19 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) flags_str = ""; if (likely(value_str)) { - RRDDIM *rd = rrddim_acquired_to_rrddim(rda); - RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED); if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) { - NETDATA_DOUBLE value = strtondd(value_str, NULL); - SN_FLAGS flags = SN_FLAG_NONE; - - char c; - while ((c = *flags_str++)) { - switch (c) { - case 'R': - flags |= SN_FLAG_RESET; - break; - - case 'E': - flags |= SN_EMPTY_SLOT; - value = NAN; - break; - - default: - error("unknown flag '%c'", c); - break; - } - } + NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL); + SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str); - if (!netdata_double_isnumber(value)) { + if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) { value = NAN; flags = SN_EMPTY_SLOT; } - rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags); - rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time; + rrddim_store_metric(rd, u->replay.end_time_ut, value, flags); + rd->last_collected_time.tv_sec = u->replay.end_time; rd->last_collected_time.tv_usec = 0; rd->collections_counter++; } @@ -1117,7 +1335,6 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) } } - rrddim_acquired_release(rda); return PARSER_RC_OK; } @@ -1133,26 +1350,25 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words char *last_stored_value_str = get_word(words, num_words, 5); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - RRDDIM *rd = rrddim_acquired_to_rrddim(rda); usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec; - usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0; + usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0; if(last_collected_ut > dim_last_collected_ut) { - rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC; - rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC; + rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); + rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); } - rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0; - rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0; - rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0; - rrddim_acquired_release(rda); + rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0; + rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0; + rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0; + return PARSER_RC_OK; } @@ -1165,23 +1381,23 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words char *last_updated_ut_str = get_word(words, num_words, 2); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec; - usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0; + usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0; if(last_collected_ut > chart_last_collected_ut) { - st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC; - st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC; + st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); + st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); } usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec; - usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0; + usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0; if(last_updated_ut > chart_last_updated_ut) { - st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC; - st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC; + st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC); + st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC); } st->counter++; @@ -1205,24 +1421,25 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) const char *last_entry_requested_txt = get_word(words, num_words, 6); const char *child_world_time_txt = get_word(words, num_words, 7); // optional - time_t update_every_child = (time_t)str2ul(update_every_child_txt); - time_t first_entry_child = (time_t)str2ul(first_entry_child_txt); - time_t last_entry_child = (time_t)str2ul(last_entry_child_txt); + time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt); + time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt); + time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt); bool start_streaming = (strcmp(start_streaming_txt, "true") == 0); - time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt); - time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt); + time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt); + time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt); // the optional child world time - time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec(); + time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded( + child_world_time_txt) : now_realtime_sec(); PARSER_USER_OBJECT *user_object = user; RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, @@ -1235,8 +1452,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) ); #endif - ((PARSER_USER_OBJECT *) user)->st = NULL; - ((PARSER_USER_OBJECT *) user)->count++; + ((PARSER_USER_OBJECT *) user)->data_collections_count++; if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) { time_t now = now_realtime_sec(); @@ -1282,11 +1498,16 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.", rrdhost_hostname(host), rrdset_id(st)); #endif + + pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END); + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0); return PARSER_RC_OK; } + pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END); + rrdcontext_updated_retention_rrdset(st); bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, @@ -1295,8 +1516,319 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } +PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { + timing_init(); + + char *id = get_word(words, num_words, 1); + char *update_every_str = get_word(words, num_words, 2); + char *end_time_str = get_word(words, num_words, 3); + char *wall_clock_time_str = get_word(words, num_words, 4); + + if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters"); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + + timing_step(TIMING_STEP_BEGIN2_PREPARE); + + RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + + pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN_V2); + + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED))) + rrdset_isnot_obsolete(st); + + timing_step(TIMING_STEP_BEGIN2_FIND_CHART); + + // ------------------------------------------------------------------------ + // parse the parameters + + time_t update_every = (time_t) str2ull_encoded(update_every_str); + time_t end_time = (time_t) str2ull_encoded(end_time_str); + + time_t wall_clock_time; + if(likely(*wall_clock_time_str == '#')) + wall_clock_time = end_time; + else + wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str); + + if (unlikely(update_every != st->update_every)) + rrdset_set_update_every_s(st, update_every); + + timing_step(TIMING_STEP_BEGIN2_PARSE); + + // ------------------------------------------------------------------------ + // prepare our state + + pluginsd_lock_rrdset_data_collection(user); + + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + u->v2.update_every = update_every; + u->v2.end_time = end_time; + u->v2.wall_clock_time = wall_clock_time; + u->v2.ml_locked = ml_chart_update_begin(st); + + timing_step(TIMING_STEP_BEGIN2_ML); + + // ------------------------------------------------------------------------ + // propagate it forward in v2 + + if(!u->v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost)) + u->v2.stream_buffer = rrdset_push_metric_initialize(u->st, wall_clock_time); + + if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.wb) { + // check if receiver and sender have the same number parsing capabilities + bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754); + NUMBER_ENCODING encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + + BUFFER *wb = u->v2.stream_buffer.wb; + + buffer_need_bytes(wb, 1024); + + if(unlikely(u->v2.stream_buffer.begin_v2_added)) + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2); + buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); + buffer_fast_strcat(wb, "' ", 2); + + if(can_copy) + buffer_strcat(wb, update_every_str); + else + buffer_print_uint64_encoded(wb, encoding, update_every); + + buffer_fast_strcat(wb, " ", 1); + + if(can_copy) + buffer_strcat(wb, end_time_str); + else + buffer_print_uint64_encoded(wb, encoding, end_time); + + buffer_fast_strcat(wb, " ", 1); + + if(can_copy) + buffer_strcat(wb, wall_clock_time_str); + else + buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + + buffer_fast_strcat(wb, "\n", 1); + + u->v2.stream_buffer.last_point_end_time_s = end_time; + u->v2.stream_buffer.begin_v2_added = true; + } + + timing_step(TIMING_STEP_BEGIN2_PROPAGATE); + + // ------------------------------------------------------------------------ + // store it + + st->last_collected_time.tv_sec = end_time; + st->last_collected_time.tv_usec = 0; + st->last_updated.tv_sec = end_time; + st->last_updated.tv_usec = 0; + st->counter++; + st->counter_done++; + + // these are only needed for db mode RAM, SAVE, MAP, ALLOC + st->current_entry++; + if(st->current_entry >= st->entries) + st->current_entry -= st->entries; + + timing_step(TIMING_STEP_BEGIN2_STORE); + + return PARSER_RC_OK; +} + +PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { + timing_init(); + + char *dimension = get_word(words, num_words, 1); + char *collected_str = get_word(words, num_words, 2); + char *value_str = get_word(words, num_words, 3); + char *flags_str = get_word(words, num_words, 4); + + if(unlikely(!dimension || !collected_str || !value_str || !flags_str)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_SET_V2, "missing parameters"); + + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + + timing_step(TIMING_STEP_SET2_PREPARE); + + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2); + if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + + if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED))) + rrddim_isnot_obsolete(st, rd); + + timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION); + + // ------------------------------------------------------------------------ + // parse the parameters + + collected_number collected_value = (collected_number) str2ll_encoded(collected_str); + + NETDATA_DOUBLE value; + if(*value_str == '#') + value = (NETDATA_DOUBLE)collected_value; + else + value = str2ndd_encoded(value_str, NULL); + + SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str); + + timing_step(TIMING_STEP_SET2_PARSE); + + // ------------------------------------------------------------------------ + // check value and ML + + if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) { + value = NAN; + flags = SN_EMPTY_SLOT; + + if(u->v2.ml_locked) + ml_dimension_is_anomalous(rd, u->v2.end_time, 0, false); + } + else if(u->v2.ml_locked) { + if (ml_dimension_is_anomalous(rd, u->v2.end_time, value, true)) { + // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous + flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS); + } + else + flags |= SN_FLAG_NOT_ANOMALOUS; + } + + timing_step(TIMING_STEP_SET2_ML); + + // ------------------------------------------------------------------------ + // propagate it forward in v2 + + if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb) { + // check if receiver and sender have the same number parsing capabilities + bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754); + NUMBER_ENCODING integer_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + NUMBER_ENCODING doubles_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + + BUFFER *wb = u->v2.stream_buffer.wb; + buffer_need_bytes(wb, 1024); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "' ", 2); + if(can_copy) + buffer_strcat(wb, collected_str); + else + buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex + buffer_fast_strcat(wb, " ", 1); + if(can_copy) + buffer_strcat(wb, value_str); + else + buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal + buffer_fast_strcat(wb, " ", 1); + buffer_print_sn_flags(wb, flags, true); + buffer_fast_strcat(wb, "\n", 1); + } + + timing_step(TIMING_STEP_SET2_PROPAGATE); + + // ------------------------------------------------------------------------ + // store it + + rrddim_store_metric(rd, u->v2.end_time * USEC_PER_SEC, value, flags); + rd->last_collected_time.tv_sec = u->v2.end_time; + rd->last_collected_time.tv_usec = 0; + rd->last_collected_value = collected_value; + rd->last_stored_value = value; + rd->last_calculated_value = value; + rd->collections_counter++; + rd->updated = true; + + timing_step(TIMING_STEP_SET2_STORE); + + return PARSER_RC_OK; +} + +void pluginsd_cleanup_v2(void *user) { + // this is called when the thread is stopped while processing + pluginsd_set_chart_from_parent(user, NULL, "THREAD CLEANUP"); +} + +PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { + timing_init(); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + u->data_collections_count++; + + timing_step(TIMING_STEP_END2_PREPARE); + + // ------------------------------------------------------------------------ + // propagate the whole chart update in v1 + + if(unlikely(!u->v2.stream_buffer.v2 && !u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb)) + rrdset_push_metrics_v1(&u->v2.stream_buffer, st); + + timing_step(TIMING_STEP_END2_PUSH_V1); + + // ------------------------------------------------------------------------ + // unblock data collection + + ml_chart_update_end(st); + u->v2.ml_locked = false; + + timing_step(TIMING_STEP_END2_ML); + + pluginsd_unlock_rrdset_data_collection(user); + rrdcontext_collected_rrdset(st); + store_metric_collection_completed(); + + timing_step(TIMING_STEP_END2_RRDSET); + + // ------------------------------------------------------------------------ + // propagate it forward + + rrdset_push_metrics_finished(&u->v2.stream_buffer, st); + + timing_step(TIMING_STEP_END2_PROPAGATE); + + // ------------------------------------------------------------------------ + // cleanup RRDSET / RRDDIM + + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + rd->calculated_value = 0; + rd->collected_value = 0; + rd->updated = false; + } + rrddim_foreach_done(rd); + + // ------------------------------------------------------------------------ + // reset state + + u->v2 = (struct parser_user_object_v2){ 0 }; + + timing_step(TIMING_STEP_END2_STORE); + timing_report(); + + return PARSER_RC_OK; +} + static void pluginsd_process_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; + + pluginsd_cleanup_v2(parser->user); + pluginsd_host_define_cleanup(parser->user); + rrd_collector_finished(); parser_destroy(parser); } @@ -1335,7 +1867,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi }; // fp_plugin_output = our input; fp_plugin_input = our output - PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); + PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, + PARSER_INPUT_SPLIT, NULL); + + pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD); rrd_collector_started(); @@ -1344,9 +1879,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); user.parser = parser; + char buffer[PLUGINSD_LINE_MAX + 1]; - while (likely(!parser_next(parser))) { - if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL))) + while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) { + if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer))) break; } @@ -1354,7 +1890,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi netdata_thread_cleanup_pop(1); cd->unsafe.enabled = user.enabled; - size_t count = user.count; + size_t count = user.data_collections_count; if (likely(count)) { cd->successful_collections += count; @@ -1365,3 +1901,141 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi return count; } + +PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused) +{ + info("PLUGINSD: plugin called EXIT."); + return PARSER_RC_STOP; +} + +static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) { + + if (types & PARSER_INIT_PLUGINSD) { + add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); + add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); + + add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define); + add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end); + add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels); + add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host); + + add_func(parser, PLUGINSD_KEYWORD_EXIT, pluginsd_exit); + } + + if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) { + // plugins.d plugins and streaming + add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); + add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); + add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); + add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); + add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); + add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit); + add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel); + add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function); + add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin); + + add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); + add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set); + add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end); + + inflight_functions_init(parser); + } + + if (types & PARSER_INIT_STREAMING) { + add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end); + + // replication + add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin); + add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set); + add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state); + add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state); + add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end); + + // streaming metrics v2 + add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2); + add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2); + add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2); + } +} + +void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) { + pluginsd_keywords_init_internal(parser, types, parser_add_keyword); +} + +struct pluginsd_user_unittest { + size_t size; + const char **hashtable; + uint32_t (*hash)(const char *s); + size_t collisions; +}; + +void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) { + struct pluginsd_user_unittest *u = parser->user; + + uint32_t hash = u->hash(keyword); + uint32_t slot = hash % u->size; + + if(u->hashtable[slot]) + u->collisions++; + + u->hashtable[slot] = keyword; +} + +static struct { + const char *name; + uint32_t (*hash)(const char *s); + size_t slots_needed; +} hashers[] = { + { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, }, + { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, }, + { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, }, + { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, }, + { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, }, + + // terminator + { .name = NULL, NULL, .slots_needed = 0, }, +}; + +int pluginsd_parser_unittest(void) { + PARSER *p; + size_t slots_to_check = 1000; + size_t i, h; + + // check for hashtable collisions + for(h = 0; hashers[h].name ;h++) { + hashers[h].slots_needed = slots_to_check * 1000000; + + for (i = 10; i < slots_to_check; i++) { + struct pluginsd_user_unittest user = { + .hash = hashers[h].hash, + .size = i, + .hashtable = callocz(i, sizeof(const char *)), + .collisions = 0, + }; + + p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); + pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING, + pluginsd_keyword_collision_check); + parser_destroy(p); + + freez(user.hashtable); + + if (!user.collisions) { + hashers[h].slots_needed = i; + break; + } + } + } + + for(h = 0; hashers[h].name ;h++) { + if(hashers[h].slots_needed > 1000) + info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check); + else + info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed); + } + + p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); + pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING); + parser_destroy(p); + return 0; +} diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index e18b43e58..1fdc23a0e 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -3,7 +3,12 @@ #ifndef NETDATA_PLUGINSD_PARSER_H #define NETDATA_PLUGINSD_PARSER_H -#include "parser/parser.h" +#include "daemon/common.h" + +typedef enum __attribute__ ((__packed__)) { + PARSER_INIT_PLUGINSD = (1 << 1), + PARSER_INIT_STREAMING = (1 << 2), +} PLUGINSD_KEYWORDS; typedef struct parser_user_object { PARSER *parser; @@ -14,13 +19,20 @@ typedef struct parser_user_object { int trust_durations; DICTIONARY *new_host_labels; DICTIONARY *chart_rrdlabels_linked_temporarily; - size_t count; + size_t data_collections_count; int enabled; - uint8_t st_exists; - uint8_t host_exists; - void *private; // the user can set this for private use + + STREAM_CAPABILITIES capabilities; // receiver capabilities struct { + bool parsing_host; + uuid_t machine_guid; + char machine_guid_str[UUID_STR_LEN]; + STRING *hostname; + DICTIONARY *rrdlabels; + } host_define; + + struct parser_user_object_replay { time_t start_time; time_t end_time; @@ -31,9 +43,20 @@ typedef struct parser_user_object { bool rset_enabled; } replay; + + struct parser_user_object_v2 { + bool locked_data_collection; + RRDSET_STREAM_BUFFER stream_buffer; // sender capabilities in this + time_t update_every; + time_t end_time; + time_t wall_clock_time; + bool ml_locked; + } v2; } PARSER_USER_OBJECT; PARSER_RC pluginsd_function(char **words, size_t num_words, void *user); PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user); void inflight_functions_init(PARSER *parser); +void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types); + #endif //NETDATA_PLUGINSD_PARSER_H |