summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
commita836a244a3d2bdd4da1ee2641e3e957850668cea (patch)
treecb87c75b3677fab7144f868435243f864048a1e6 /collectors/plugins.d
parentAdding upstream version 1.38.1. (diff)
downloadnetdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz
netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/README.md89
-rw-r--r--collectors/plugins.d/plugins_d.c93
-rw-r--r--collectors/plugins.d/plugins_d.h15
-rw-r--r--collectors/plugins.d/pluginsd_parser.c994
-rw-r--r--collectors/plugins.d/pluginsd_parser.h33
5 files changed, 1008 insertions, 216 deletions
diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md
index 8ad1d3a65..1c3b50cb7 100644
--- a/collectors/plugins.d/README.md
+++ b/collectors/plugins.d/README.md
@@ -1,13 +1,13 @@
<!--
-title: "External plugins overview"
+title: "External plugins"
custom_edit_url: "https://github.com/netdata/netdata/edit/master/collectors/plugins.d/README.md"
-sidebar_label: "External plugins overview"
+sidebar_label: "External plugins"
learn_status: "Published"
learn_topic_type: "References"
-learn_rel_path: "Developers"
+learn_rel_path: "Developers/External plugins"
-->
-# External plugins overview
+# External plugins
`plugins.d` is the Netdata internal plugin that collects metrics
from external processes, thus allowing Netdata to use **external plugins**.
@@ -138,7 +138,7 @@ a single program can produce any number of charts with any number of dimensions
Charts can be added any time (not just the beginning).
-### command line parameters
+### Command line parameters
The plugin **MUST** accept just **one** parameter: **the number of seconds it is
expected to update the values for its charts**. The value passed by Netdata
@@ -149,7 +149,7 @@ The external plugin can overwrite the update frequency. For example, the server
request per second updates, but the plugin may ignore it and update its charts
every 5 seconds.
-### environment variables
+### Environment variables
There are a few environment variables that are set by `netdata` and are
available for the plugin to use.
@@ -175,6 +175,83 @@ The plugin should output instructions for Netdata to its output (`stdout`). Sinc
`DISABLE` will disable this plugin. This will prevent Netdata from restarting the plugin. You can also exit with the value `1` to have the same effect.
+#### HOST_DEFINE
+
+`HOST_DEFINE` defines a new (or updates an existing) virtual host.
+
+The template is:
+
+> HOST_DEFINE machine_guid hostname
+
+where:
+
+- `machine_guid`
+
+ uniquely identifies the host, this is what will be needed to add charts to the host.
+
+- `hostname`
+
+ is the hostname of the virtual host
+
+#### HOST_LABEL
+
+`HOST_LABEL` adds a key-value pair to the virtual host labels. It has to be given between `HOST_DEFINE` and `HOST_DEFINE_END`.
+
+The template is:
+
+> HOST_LABEL key value
+
+where:
+
+- `key`
+
+ uniquely identifies the key of the label
+
+- `value`
+
+ is the value associated with this key
+
+There are a few special keys that are used to define the system information of the monitored system:
+
+- `_cloud_provider_type`
+- `_cloud_instance_type`
+- `_cloud_instance_region`
+- `_os_name`
+- `_os_version`
+- `_kernel_version`
+- `_system_cores`
+- `_system_cpu_freq`
+- `_system_ram_total`
+- `_system_disk_space`
+- `_architecture`
+- `_virtualization`
+- `_container`
+- `_container_detection`
+- `_virt_detection`
+- `_is_k8s_node`
+- `_install_type`
+- `_prebuilt_arch`
+- `_prebuilt_dist`
+
+#### HOST_DEFINE_END
+
+`HOST_DEFINE_END` commits the host information, creating a new host entity, or updating an existing one with the same `machine_guid`.
+
+#### HOST
+
+`HOST` switches data collection between hosts.
+
+The template is:
+
+> HOST machine_guid
+
+where:
+
+- `machine_guid`
+
+ is the UUID of the host to switch to. After this command, every other command following it is assumed to be associated with this host.
+ Setting machine_guid to `localhost` switches data collection to the local host.
+
#### CHART
`CHART` defines a new chart.
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 7608f3afc..dc13cd2ee 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -18,7 +18,7 @@ inline size_t pluginsd_initialize_plugin_directories()
}
// Parse it and store it to plugin directories
- return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0);
+ return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace);
}
static inline void plugin_set_disabled(struct plugind *cd) {
@@ -51,6 +51,8 @@ static void pluginsd_worker_thread_cleanup(void *arg)
{
struct plugind *cd = (struct plugind *)arg;
+ worker_unregister();
+
netdata_spinlock_lock(&cd->unsafe.spinlock);
cd->unsafe.running = false;
@@ -62,74 +64,73 @@ static void pluginsd_worker_thread_cleanup(void *arg)
netdata_spinlock_unlock(&cd->unsafe.spinlock);
if (pid) {
- info("data collection thread exiting");
-
siginfo_t info;
- info("killing child process pid %d", pid);
+ info("PLUGINSD: 'host:%s', killing data collection child process with pid %d",
+ rrdhost_hostname(cd->host), pid);
+
if (killpid(pid) != -1) {
- info("waiting for child process pid %d to exit...", pid);
+ info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...",
+ rrdhost_hostname(cd->host), pid);
+
waitid(P_PID, (id_t)pid, &info, WEXITED);
}
}
}
#define SERIAL_FAILURES_THRESHOLD 10
-static void pluginsd_worker_thread_handle_success(struct plugind *cd)
-{
+static void pluginsd_worker_thread_handle_success(struct plugind *cd) {
if (likely(cd->successful_collections)) {
sleep((unsigned int)cd->update_every);
return;
}
if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) {
- info(
- "'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
- cd->fullfilename, cd->unsafe.pid,
- plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
+ info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
+
sleep((unsigned int)(cd->update_every * 10));
return;
}
if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
- error(
- "'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)."
- "We have tried to collect something %zu times - unsuccessfully. Disabling it.",
- cd->fullfilename, cd->unsafe.pid, cd->serial_failures);
+ error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, "
+ "although it reports success (exits with 0)."
+ "We have tried to collect something %zu times - unsuccessfully. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, cd->serial_failures);
plugin_set_disabled(cd);
return;
}
}
-static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code)
-{
+static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) {
if (worker_ret_code == -1) {
- info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid);
+ info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
plugin_set_disabled(cd);
return;
}
if (!cd->successful_collections) {
- error(
- "'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename,
- cd->unsafe.pid, worker_ret_code);
+ error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code);
plugin_set_disabled(cd);
return;
}
if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) {
- error(
- "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
- cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections,
- plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
+ error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
sleep((unsigned int)(cd->update_every * 10));
return;
}
if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
- error(
- "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
- "We tried to restart it %zu times, but it failed to generate data. Disabling it.",
- cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures);
+ error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
+ "We tried to restart it %zu times, but it failed to generate data. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code,
+ cd->successful_collections, cd->serial_failures);
plugin_set_disabled(cd);
return;
}
@@ -137,8 +138,7 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r
#undef SERIAL_FAILURES_THRESHOLD
-static void *pluginsd_worker_thread(void *arg)
-{
+static void *pluginsd_worker_thread(void *arg) {
worker_register("PLUGINSD");
netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg);
@@ -151,14 +151,20 @@ static void *pluginsd_worker_thread(void *arg)
while (service_running(SERVICE_COLLECTORS)) {
FILE *fp_child_input = NULL;
FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
+
if (unlikely(!fp_child_input || !fp_child_output)) {
- error("Cannot popen(\"%s\", \"r\").", cd->cmd);
+ error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd);
break;
}
- info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid);
- count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0);
- error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count);
+ info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
+
+ count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0);
+
+ info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
+
killpid(cd->unsafe.pid);
int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
@@ -172,29 +178,29 @@ static void *pluginsd_worker_thread(void *arg)
if (unlikely(!plugin_is_enabled(cd)))
break;
}
- worker_unregister();
netdata_thread_cleanup_pop(1);
return NULL;
}
-static void pluginsd_main_cleanup(void *data)
-{
+static void pluginsd_main_cleanup(void *data) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- info("cleaning up...");
+ info("PLUGINSD: cleaning up...");
struct plugind *cd;
for (cd = pluginsd_root; cd; cd = cd->next) {
netdata_spinlock_lock(&cd->unsafe.spinlock);
if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) {
- info("stopping plugin thread: %s", cd->id);
+ info("PLUGINSD: 'host:%s', stopping plugin thread: %s",
+ rrdhost_hostname(cd->host), cd->id);
+
netdata_thread_cancel(cd->unsafe.thread);
}
netdata_spinlock_unlock(&cd->unsafe.spinlock);
}
- info("cleanup completed.");
+ info("PLUGINSD: cleanup completed.");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
worker_unregister();
@@ -282,6 +288,7 @@ void *pluginsd_main(void *ptr)
strncpyz(cd->filename, file->d_name, FILENAME_MAX);
snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename);
+ cd->host = localhost;
cd->unsafe.enabled = enabled;
cd->unsafe.running = false;
@@ -294,9 +301,7 @@ void *pluginsd_main(void *ptr)
config_get(cd->id, "command options", def));
// link it
- if (likely(pluginsd_root))
- cd->next = pluginsd_root;
- pluginsd_root = cd;
+ DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(pluginsd_root, cd, prev, next);
if (plugin_is_enabled(cd)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 35af9fe58..68ed4940f 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -34,6 +34,17 @@
#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
#define PLUGINSD_KEYWORD_REPLAY_END "REND"
+#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2"
+#define PLUGINSD_KEYWORD_SET_V2 "SET2"
+#define PLUGINSD_KEYWORD_END_V2 "END2"
+
+#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE"
+#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END"
+#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL"
+#define PLUGINSD_KEYWORD_HOST "HOST"
+
+#define PLUGINSD_KEYWORD_EXIT "EXIT"
+
#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
#define PLUGINSD_LINE_MAX_SSL_READ 512
@@ -56,6 +67,7 @@ struct plugind {
size_t serial_failures; // the number of times the plugin started
// without collecting values
+ RRDHOST *host; // the host the plugin collects data for
int update_every; // the plugin default data collection frequency
struct {
@@ -67,7 +79,8 @@ struct plugind {
} unsafe;
time_t started_t;
- uint32_t capabilities; // follows the same principles as streaming capabilities
+
+ struct plugind *prev;
struct plugind *next;
};
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 2c0f2cbc6..28fc0bd49 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -71,20 +71,109 @@ static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char
return st;
}
-static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
+static inline RRDSET *pluginsd_get_chart_from_parent(void *user) {
+ return ((PARSER_USER_OBJECT *) user)->st;
+}
+
+static inline void pluginsd_lock_rrdset_data_collection(void *user) {
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+ if(u->st && !u->v2.locked_data_collection) {
+ netdata_spinlock_lock(&u->st->data_collection_lock);
+ u->v2.locked_data_collection = true;
+ }
+}
+
+static inline bool pluginsd_unlock_rrdset_data_collection(void *user) {
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+ if(u->st && u->v2.locked_data_collection) {
+ netdata_spinlock_unlock(&u->st->data_collection_lock);
+ u->v2.locked_data_collection = false;
+ return true;
+ }
+
+ return false;
+}
+
+void pluginsd_rrdset_cleanup(RRDSET *st) {
+ for(size_t i = 0; i < st->pluginsd.used ; i++) {
+ if (st->pluginsd.rda[i]) {
+ rrddim_acquired_release(st->pluginsd.rda[i]);
+ st->pluginsd.rda[i] = NULL;
+ }
+ }
+ freez(st->pluginsd.rda);
+ st->pluginsd.rda = NULL;
+ st->pluginsd.size = 0;
+ st->pluginsd.used = 0;
+ st->pluginsd.pos = 0;
+}
+
+static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) {
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+
+ if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) {
+ error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
+ rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
+ }
+
+ if(unlikely(u->v2.ml_locked)) {
+ ml_chart_update_end(u->st);
+ u->v2.ml_locked = false;
+
+ error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
+ rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
+ }
+
+ if(st) {
+ size_t dims = dictionary_entries(st->rrddim_root_index);
+ if(unlikely(st->pluginsd.size < dims)) {
+ st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *));
+ st->pluginsd.size = dims;
+ }
+
+ if(st->pluginsd.pos > st->pluginsd.used && st->pluginsd.pos <= st->pluginsd.size)
+ st->pluginsd.used = st->pluginsd.pos;
+
+ st->pluginsd.pos = 0;
+ }
+
+ u->st = st;
+}
+
+static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
if (unlikely(!dimension || !*dimension)) {
error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
rrdhost_hostname(host), rrdset_id(st), cmd);
return NULL;
}
- RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
+ RRDDIM_ACQUIRED *rda;
- if (unlikely(!rda))
+ if(likely(st->pluginsd.pos < st->pluginsd.used)) {
+ rda = st->pluginsd.rda[st->pluginsd.pos];
+ RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+ if (likely(rd && string_strcmp(rd->id, dimension) == 0)) {
+ st->pluginsd.pos++;
+ return rd;
+ }
+ else {
+ rrddim_acquired_release(rda);
+ st->pluginsd.rda[st->pluginsd.pos] = NULL;
+ }
+ }
+
+ rda = rrddim_find_and_acquire(st, dimension);
+ if (unlikely(!rda)) {
error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
- return rda;
+ return NULL;
+ }
+
+ if(likely(st->pluginsd.pos < st->pluginsd.size))
+ st->pluginsd.rda[st->pluginsd.pos++] = rda;
+
+ return rrddim_acquired_to_rrddim(rda);
}
static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
@@ -102,8 +191,14 @@ static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, cons
return st;
}
-static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) {
+static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) {
((PARSER_USER_OBJECT *) user)->enabled = 0;
+
+ if(keyword && msg) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
+ }
+
return PARSER_RC_ERROR;
}
@@ -113,24 +208,21 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
char *value = get_word(words, num_words, 2);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
-
- RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
- if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
if (value && *value)
- rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
+ rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
- rrddim_acquired_release(rda);
return PARSER_RC_OK;
}
@@ -140,12 +232,12 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
char *microseconds_txt = get_word(words, num_words, 2);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- ((PARSER_USER_OBJECT *)user)->st = st;
+ pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN);
usec_t microseconds = 0;
if (microseconds_txt && *microseconds_txt) {
@@ -187,16 +279,16 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
UNUSED(num_words);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
- ((PARSER_USER_OBJECT *) user)->st = NULL;
- ((PARSER_USER_OBJECT *) user)->count++;
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END);
+ ((PARSER_USER_OBJECT *) user)->data_collections_count++;
struct timeval now;
now_realtime_timeval(&now);
@@ -205,10 +297,151 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
+static void pluginsd_host_define_cleanup(void *user) {
+ PARSER_USER_OBJECT *u = user;
+
+ string_freez(u->host_define.hostname);
+ dictionary_destroy(u->host_define.rrdlabels);
+
+ u->host_define.hostname = NULL;
+ u->host_define.rrdlabels = NULL;
+ u->host_define.parsing_host = false;
+}
+
+static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
+ if(uuid_parse(guid, *uuid))
+ return false;
+
+ uuid_unparse_lower(*uuid, output);
+
+ return true;
+}
+
+static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) {
+ PARSER_USER_OBJECT *u = user;
+
+ char *guid = get_word(words, num_words, 1);
+ char *hostname = get_word(words, num_words, 2);
+
+ if(unlikely(!guid || !*guid || !hostname || !*hostname))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
+
+ if(unlikely(u->host_define.parsing_host))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE,
+ "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
+
+ if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
+
+ u->host_define.hostname = string_strdupz(hostname);
+ u->host_define.rrdlabels = rrdlabels_create();
+ u->host_define.parsing_host = true;
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) {
+ PARSER_USER_OBJECT *u = user;
+
+ char *name = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
+
+ if(!name || !*name || !value)
+ return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters");
+
+ if(!u->host_define.parsing_host || !dict)
+ return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+
+ rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG);
+
+ return PARSER_RC_OK;
+}
+
+static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) {
+ PARSER_USER_OBJECT *u = user;
+ return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL);
+}
+
+static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
+ PARSER_USER_OBJECT *u = user;
+
+ if(!u->host_define.parsing_host)
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+
+ RRDHOST *host = rrdhost_find_or_create(
+ string2str(u->host_define.hostname),
+ string2str(u->host_define.hostname),
+ u->host_define.machine_guid_str,
+ "Netdata Virtual Host 1.0",
+ netdata_configured_timezone,
+ netdata_configured_abbrev_timezone,
+ netdata_configured_utc_offset,
+ NULL,
+ program_name,
+ program_version,
+ default_rrd_update_every,
+ default_rrd_history_entries,
+ default_rrd_memory_mode,
+ default_health_enabled,
+ default_rrdpush_enabled,
+ default_rrdpush_destination,
+ default_rrdpush_api_key,
+ default_rrdpush_send_charts_matching,
+ default_rrdpush_enable_replication,
+ default_rrdpush_seconds_to_replicate,
+ default_rrdpush_replication_step,
+ rrdhost_labels_to_system_info(u->host_define.rrdlabels),
+ false
+ );
+
+ if(host->rrdlabels) {
+ rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels);
+ }
+ else {
+ host->rrdlabels = u->host_define.rrdlabels;
+ u->host_define.rrdlabels = NULL;
+ }
+
+ pluginsd_host_define_cleanup(user);
+
+ u->host = host;
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END);
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ rrdcontext_host_child_connected(host);
+ schedule_node_info_update(host);
+
+ return PARSER_RC_OK;
+}
+
+static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) {
+ PARSER_USER_OBJECT *u = user;
+
+ char *guid = get_word(words, num_words, 1);
+
+ if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
+ u->host = localhost;
+ return PARSER_RC_OK;
+ }
+
+ uuid_t uuid;
+ char uuid_str[UUID_STR_LEN];
+ if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
+
+ RRDHOST *host = rrdhost_find_by_guid(uuid_str);
+ if(unlikely(!host))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
+
+ u->host = host;
+
+ return PARSER_RC_OK;
+}
+
PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
{
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
char *type = get_word(words, num_words, 1);
char *name = get_word(words, num_words, 2);
@@ -231,19 +464,14 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
}
// make sure we have the required variables
- if (unlikely((!type || !*type || !id || !*id))) {
- error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.",
- rrdhost_hostname(host));
-
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
- return PARSER_RC_ERROR;
- }
+ if (unlikely((!type || !*type || !id || !*id)))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters");
// parse the name, and make sure it does not include 'type.'
if (unlikely(name && *name)) {
// when data are streamed from child nodes
// name will be type.name
- // so we have to remove 'type.' from name too
+ // so, we have to remove 'type.' from name too
size_t len = strlen(type);
if (strncmp(type, name, len) == 0 && name[len] == '.')
name = &name[len + 1];
@@ -320,7 +548,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
}
- ((PARSER_USER_OBJECT *)user)->st = st;
+ pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART);
return PARSER_RC_OK;
}
@@ -332,10 +560,10 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
const char *wall_clock_time_txt = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
@@ -379,33 +607,24 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
char *options = get_word(words, num_words, 6);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- if (unlikely(!id)) {
- error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.",
- rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET");
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
-
- if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) {
- error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.",
- rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ if (unlikely(!id))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
long multiplier = 1;
if (multiplier_s && *multiplier_s) {
- multiplier = strtol(multiplier_s, NULL, 0);
+ multiplier = str2ll_encoded(multiplier_s);
if (unlikely(!multiplier))
multiplier = 1;
}
long divisor = 1;
if (likely(divisor_s && *divisor_s)) {
- divisor = strtol(divisor_s, NULL, 0);
+ divisor = str2ll_encoded(divisor_s);
if (unlikely(!divisor))
divisor = 1;
}
@@ -683,7 +902,7 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u
}
else {
if(format && *format)
- pf->destination_wb->contenttype = functions_format_to_content_type(format);
+ pf->destination_wb->content_type = functions_format_to_content_type(format);
pf->code = code;
@@ -712,9 +931,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
NETDATA_DOUBLE v;
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+ RRDSET *st = pluginsd_get_chart_from_parent(user);
int global = (st) ? 0 : 1;
@@ -730,13 +949,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
}
}
- if (unlikely(!name || !*name)) {
- error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.",
- rrdhost_hostname(host), st ? rrdset_id(st):"UNSET");
-
- ((PARSER_USER_OBJECT *)user)->enabled = 0;
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ if (unlikely(!name || !*name))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
if (unlikely(!value || !*value))
value = NULL;
@@ -750,17 +964,11 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
- if (!global && !st) {
- error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- name
- );
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ if (!global && !st)
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
char *endptr = NULL;
- v = (NETDATA_DOUBLE)str2ndd(value, &endptr);
+ v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
if (unlikely(endptr && *endptr)) {
if (endptr == value)
error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
@@ -803,8 +1011,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
{
- debug(D_PLUGINSD, "requested a FLUSH");
- ((PARSER_USER_OBJECT *) user)->st = NULL;
+ debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH);
((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
@@ -816,7 +1024,7 @@ PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe
{
info("PLUGINSD: plugin called DISABLE. Disabling it.");
((PARSER_USER_OBJECT *) user)->enabled = 0;
- return PARSER_RC_ERROR;
+ return PARSER_RC_STOP;
}
PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
@@ -825,10 +1033,8 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
const char *label_source = get_word(words, num_words, 2);
const char *value = get_word(words, num_words, 3);
- if (!name || !label_source || !value) {
- error("PLUGINSD: ignoring malformed or empty LABEL command.");
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ if (!name || !label_source || !value)
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters");
char *store = (char *)value;
bool allocated_store = false;
@@ -874,7 +1080,7 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
{
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
debug(D_PLUGINSD, "requested to OVERWRITE host labels");
@@ -898,11 +1104,12 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
if (!name || !value || !*label_source) {
error("Ignoring malformed or empty CHART LABEL command.");
- return PLUGINSD_DISABLE_PLUGIN(user);
+ return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
}
if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
- ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels;
+ RRDSET *st = pluginsd_get_chart_from_parent(user);
+ ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels;
rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
}
@@ -915,17 +1122,17 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
{
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
debug(D_PLUGINSD, "requested to commit chart labels");
if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.",
rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(user);
+ return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
}
rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
@@ -937,15 +1144,14 @@ PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words _
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user)
-{
+PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) {
char *id = get_word(words, num_words, 1);
char *start_time_str = get_word(words, num_words, 2);
char *end_time_str = get_word(words, num_words, 3);
char *child_now_str = get_word(words, num_words, 4);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st;
if (likely(!id || !*id))
@@ -953,17 +1159,17 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
else
st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
- ((PARSER_USER_OBJECT *) user)->st = st;
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(start_time_str && end_time_str) {
- time_t start_time = (time_t)str2ul(start_time_str);
- time_t end_time = (time_t)str2ul(end_time_str);
+ time_t start_time = (time_t) str2ull_encoded(start_time_str);
+ time_t end_time = (time_t) str2ull_encoded(end_time_str);
time_t wall_clock_time = 0, tolerance;
bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
if(child_now_str) {
- wall_clock_time = (time_t)str2ul(child_now_str);
+ wall_clock_time = (time_t) str2ull_encoded(child_now_str);
tolerance = st->update_every + 1;
wall_clock_comes_from_child = true;
}
@@ -1016,7 +1222,9 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
return PARSER_RC_OK;
}
- error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+ error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
+ " from %ld to %ld, but timestamps are invalid "
+ "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
}
@@ -1033,6 +1241,33 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
return PARSER_RC_OK;
}
+static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
+ SN_FLAGS flags = SN_FLAG_NONE;
+
+ char c;
+ while ((c = *flags_str++)) {
+ switch (c) {
+ case 'A':
+ flags |= SN_FLAG_NOT_ANOMALOUS;
+ break;
+
+ case 'R':
+ flags |= SN_FLAG_RESET;
+ break;
+
+ case 'E':
+ flags = SN_EMPTY_SLOT;
+ return flags;
+
+ default:
+ internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
+ break;
+ }
+ }
+
+ return flags;
+}
+
PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
{
char *dimension = get_word(words, num_words, 1);
@@ -1040,31 +1275,34 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
char *flags_str = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) {
+ PARSER_USER_OBJECT *u = user;
+ if(!u->replay.rset_enabled) {
error_limit_static_thread_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors",
- rrdhost_hostname(host), rrdset_id(st));
+ error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
+ rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
// we have to return OK here
return PARSER_RC_OK;
}
- RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
- error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ if (unlikely(!u->replay.start_time || !u->replay.end_time)) {
+ error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
rrdhost_hostname(host),
rrdset_id(st),
dimension,
- ((PARSER_USER_OBJECT *) user)->replay.start_time,
- ((PARSER_USER_OBJECT *) user)->replay.end_time);
- return PLUGINSD_DISABLE_PLUGIN(user);
+ PLUGINSD_KEYWORD_REPLAY_SET,
+ u->replay.start_time,
+ u->replay.end_time,
+ PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
}
if (unlikely(!value_str || !*value_str))
@@ -1074,39 +1312,19 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
flags_str = "";
if (likely(value_str)) {
- RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
-
RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
- NETDATA_DOUBLE value = strtondd(value_str, NULL);
- SN_FLAGS flags = SN_FLAG_NONE;
-
- char c;
- while ((c = *flags_str++)) {
- switch (c) {
- case 'R':
- flags |= SN_FLAG_RESET;
- break;
-
- case 'E':
- flags |= SN_EMPTY_SLOT;
- value = NAN;
- break;
-
- default:
- error("unknown flag '%c'", c);
- break;
- }
- }
+ NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL);
+ SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
- if (!netdata_double_isnumber(value)) {
+ if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
value = NAN;
flags = SN_EMPTY_SLOT;
}
- rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags);
- rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+ rrddim_store_metric(rd, u->replay.end_time_ut, value, flags);
+ rd->last_collected_time.tv_sec = u->replay.end_time;
rd->last_collected_time.tv_usec = 0;
rd->collections_counter++;
}
@@ -1117,7 +1335,6 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
}
}
- rrddim_acquired_release(rda);
return PARSER_RC_OK;
}
@@ -1133,26 +1350,25 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
char *last_stored_value_str = get_word(words, num_words, 5);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
- usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
if(last_collected_ut > dim_last_collected_ut) {
- rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
- rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
+ rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
- rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0;
- rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0;
- rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
- rrddim_acquired_release(rda);
+ rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
+ rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
+ rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
+
return PARSER_RC_OK;
}
@@ -1165,23 +1381,23 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
char *last_updated_ut_str = get_word(words, num_words, 2);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
- usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
if(last_collected_ut > chart_last_collected_ut) {
- st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
- st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
+ st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
- usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0;
+ usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0;
if(last_updated_ut > chart_last_updated_ut) {
- st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC;
- st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC;
+ st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
+ st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
}
st->counter++;
@@ -1205,24 +1421,25 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
const char *last_entry_requested_txt = get_word(words, num_words, 6);
const char *child_world_time_txt = get_word(words, num_words, 7); // optional
- time_t update_every_child = (time_t)str2ul(update_every_child_txt);
- time_t first_entry_child = (time_t)str2ul(first_entry_child_txt);
- time_t last_entry_child = (time_t)str2ul(last_entry_child_txt);
+ time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt);
+ time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt);
+ time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt);
bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
- time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt);
- time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt);
+ time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt);
+ time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt);
// the optional child world time
- time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec();
+ time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
+ child_world_time_txt) : now_realtime_sec();
PARSER_USER_OBJECT *user_object = user;
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
@@ -1235,8 +1452,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
);
#endif
- ((PARSER_USER_OBJECT *) user)->st = NULL;
- ((PARSER_USER_OBJECT *) user)->count++;
+ ((PARSER_USER_OBJECT *) user)->data_collections_count++;
if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
time_t now = now_realtime_sec();
@@ -1282,11 +1498,16 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
rrdhost_hostname(host), rrdset_id(st));
#endif
+
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
+
worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
return PARSER_RC_OK;
}
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
+
rrdcontext_updated_retention_rrdset(st);
bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
@@ -1295,8 +1516,319 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
+PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
+ timing_init();
+
+ char *id = get_word(words, num_words, 1);
+ char *update_every_str = get_word(words, num_words, 2);
+ char *end_time_str = get_word(words, num_words, 3);
+ char *wall_clock_time_str = get_word(words, num_words, 4);
+
+ if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+
+ timing_step(TIMING_STEP_BEGIN2_PREPARE);
+
+ RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+
+ pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN_V2);
+
+ if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED)))
+ rrdset_isnot_obsolete(st);
+
+ timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
+
+ // ------------------------------------------------------------------------
+ // parse the parameters
+
+ time_t update_every = (time_t) str2ull_encoded(update_every_str);
+ time_t end_time = (time_t) str2ull_encoded(end_time_str);
+
+ time_t wall_clock_time;
+ if(likely(*wall_clock_time_str == '#'))
+ wall_clock_time = end_time;
+ else
+ wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
+
+ if (unlikely(update_every != st->update_every))
+ rrdset_set_update_every_s(st, update_every);
+
+ timing_step(TIMING_STEP_BEGIN2_PARSE);
+
+ // ------------------------------------------------------------------------
+ // prepare our state
+
+ pluginsd_lock_rrdset_data_collection(user);
+
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+ u->v2.update_every = update_every;
+ u->v2.end_time = end_time;
+ u->v2.wall_clock_time = wall_clock_time;
+ u->v2.ml_locked = ml_chart_update_begin(st);
+
+ timing_step(TIMING_STEP_BEGIN2_ML);
+
+ // ------------------------------------------------------------------------
+ // propagate it forward in v2
+
+ if(!u->v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
+ u->v2.stream_buffer = rrdset_push_metric_initialize(u->st, wall_clock_time);
+
+ if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.wb) {
+ // check if receiver and sender have the same number parsing capabilities
+ bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
+ NUMBER_ENCODING encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+
+ BUFFER *wb = u->v2.stream_buffer.wb;
+
+ buffer_need_bytes(wb, 1024);
+
+ if(unlikely(u->v2.stream_buffer.begin_v2_added))
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "' ", 2);
+
+ if(can_copy)
+ buffer_strcat(wb, update_every_str);
+ else
+ buffer_print_uint64_encoded(wb, encoding, update_every);
+
+ buffer_fast_strcat(wb, " ", 1);
+
+ if(can_copy)
+ buffer_strcat(wb, end_time_str);
+ else
+ buffer_print_uint64_encoded(wb, encoding, end_time);
+
+ buffer_fast_strcat(wb, " ", 1);
+
+ if(can_copy)
+ buffer_strcat(wb, wall_clock_time_str);
+ else
+ buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+
+ buffer_fast_strcat(wb, "\n", 1);
+
+ u->v2.stream_buffer.last_point_end_time_s = end_time;
+ u->v2.stream_buffer.begin_v2_added = true;
+ }
+
+ timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
+
+ // ------------------------------------------------------------------------
+ // store it
+
+ st->last_collected_time.tv_sec = end_time;
+ st->last_collected_time.tv_usec = 0;
+ st->last_updated.tv_sec = end_time;
+ st->last_updated.tv_usec = 0;
+ st->counter++;
+ st->counter_done++;
+
+ // these are only needed for db mode RAM, SAVE, MAP, ALLOC
+ st->current_entry++;
+ if(st->current_entry >= st->entries)
+ st->current_entry -= st->entries;
+
+ timing_step(TIMING_STEP_BEGIN2_STORE);
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
+ timing_init();
+
+ char *dimension = get_word(words, num_words, 1);
+ char *collected_str = get_word(words, num_words, 2);
+ char *value_str = get_word(words, num_words, 3);
+ char *flags_str = get_word(words, num_words, 4);
+
+ if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
+
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+
+ timing_step(TIMING_STEP_SET2_PREPARE);
+
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
+ if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+
+ if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
+ rrddim_isnot_obsolete(st, rd);
+
+ timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
+
+ // ------------------------------------------------------------------------
+ // parse the parameters
+
+ collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
+
+ NETDATA_DOUBLE value;
+ if(*value_str == '#')
+ value = (NETDATA_DOUBLE)collected_value;
+ else
+ value = str2ndd_encoded(value_str, NULL);
+
+ SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
+
+ timing_step(TIMING_STEP_SET2_PARSE);
+
+ // ------------------------------------------------------------------------
+ // check value and ML
+
+ if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
+ value = NAN;
+ flags = SN_EMPTY_SLOT;
+
+ if(u->v2.ml_locked)
+ ml_dimension_is_anomalous(rd, u->v2.end_time, 0, false);
+ }
+ else if(u->v2.ml_locked) {
+ if (ml_dimension_is_anomalous(rd, u->v2.end_time, value, true)) {
+ // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
+ flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
+ }
+ else
+ flags |= SN_FLAG_NOT_ANOMALOUS;
+ }
+
+ timing_step(TIMING_STEP_SET2_ML);
+
+ // ------------------------------------------------------------------------
+ // propagate it forward in v2
+
+ if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb) {
+ // check if receiver and sender have the same number parsing capabilities
+ bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
+ NUMBER_ENCODING integer_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+
+ BUFFER *wb = u->v2.stream_buffer.wb;
+ buffer_need_bytes(wb, 1024);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ if(can_copy)
+ buffer_strcat(wb, collected_str);
+ else
+ buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
+ buffer_fast_strcat(wb, " ", 1);
+ if(can_copy)
+ buffer_strcat(wb, value_str);
+ else
+ buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, flags, true);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
+
+ timing_step(TIMING_STEP_SET2_PROPAGATE);
+
+ // ------------------------------------------------------------------------
+ // store it
+
+ rrddim_store_metric(rd, u->v2.end_time * USEC_PER_SEC, value, flags);
+ rd->last_collected_time.tv_sec = u->v2.end_time;
+ rd->last_collected_time.tv_usec = 0;
+ rd->last_collected_value = collected_value;
+ rd->last_stored_value = value;
+ rd->last_calculated_value = value;
+ rd->collections_counter++;
+ rd->updated = true;
+
+ timing_step(TIMING_STEP_SET2_STORE);
+
+ return PARSER_RC_OK;
+}
+
+void pluginsd_cleanup_v2(void *user) {
+ // this is called when the thread is stopped while processing
+ pluginsd_set_chart_from_parent(user, NULL, "THREAD CLEANUP");
+}
+
+PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
+ timing_init();
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+ u->data_collections_count++;
+
+ timing_step(TIMING_STEP_END2_PREPARE);
+
+ // ------------------------------------------------------------------------
+ // propagate the whole chart update in v1
+
+ if(unlikely(!u->v2.stream_buffer.v2 && !u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb))
+ rrdset_push_metrics_v1(&u->v2.stream_buffer, st);
+
+ timing_step(TIMING_STEP_END2_PUSH_V1);
+
+ // ------------------------------------------------------------------------
+ // unblock data collection
+
+ ml_chart_update_end(st);
+ u->v2.ml_locked = false;
+
+ timing_step(TIMING_STEP_END2_ML);
+
+ pluginsd_unlock_rrdset_data_collection(user);
+ rrdcontext_collected_rrdset(st);
+ store_metric_collection_completed();
+
+ timing_step(TIMING_STEP_END2_RRDSET);
+
+ // ------------------------------------------------------------------------
+ // propagate it forward
+
+ rrdset_push_metrics_finished(&u->v2.stream_buffer, st);
+
+ timing_step(TIMING_STEP_END2_PROPAGATE);
+
+ // ------------------------------------------------------------------------
+ // cleanup RRDSET / RRDDIM
+
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ rd->calculated_value = 0;
+ rd->collected_value = 0;
+ rd->updated = false;
+ }
+ rrddim_foreach_done(rd);
+
+ // ------------------------------------------------------------------------
+ // reset state
+
+ u->v2 = (struct parser_user_object_v2){ 0 };
+
+ timing_step(TIMING_STEP_END2_STORE);
+ timing_report();
+
+ return PARSER_RC_OK;
+}
+
static void pluginsd_process_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
+
+ pluginsd_cleanup_v2(parser->user);
+ pluginsd_host_define_cleanup(parser->user);
+
rrd_collector_finished();
parser_destroy(parser);
}
@@ -1335,7 +1867,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
};
// fp_plugin_output = our input; fp_plugin_input = our output
- PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
+ PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1,
+ PARSER_INPUT_SPLIT, NULL);
+
+ pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
rrd_collector_started();
@@ -1344,9 +1879,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
user.parser = parser;
+ char buffer[PLUGINSD_LINE_MAX + 1];
- while (likely(!parser_next(parser))) {
- if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL)))
+ while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) {
+ if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer)))
break;
}
@@ -1354,7 +1890,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
netdata_thread_cleanup_pop(1);
cd->unsafe.enabled = user.enabled;
- size_t count = user.count;
+ size_t count = user.data_collections_count;
if (likely(count)) {
cd->successful_collections += count;
@@ -1365,3 +1901,141 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
return count;
}
+
+PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
+{
+ info("PLUGINSD: plugin called EXIT.");
+ return PARSER_RC_STOP;
+}
+
+static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) {
+
+ if (types & PARSER_INIT_PLUGINSD) {
+ add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
+ add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
+
+ add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define);
+ add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end);
+ add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels);
+ add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host);
+
+ add_func(parser, PLUGINSD_KEYWORD_EXIT, pluginsd_exit);
+ }
+
+ if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) {
+ // plugins.d plugins and streaming
+ add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
+ add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
+ add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
+ add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
+ add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
+ add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit);
+ add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel);
+ add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function);
+ add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin);
+
+ add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
+ add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set);
+ add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
+
+ inflight_functions_init(parser);
+ }
+
+ if (types & PARSER_INIT_STREAMING) {
+ add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end);
+
+ // replication
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin);
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set);
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state);
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state);
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end);
+
+ // streaming metrics v2
+ add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2);
+ add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2);
+ add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2);
+ }
+}
+
+void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) {
+ pluginsd_keywords_init_internal(parser, types, parser_add_keyword);
+}
+
+struct pluginsd_user_unittest {
+ size_t size;
+ const char **hashtable;
+ uint32_t (*hash)(const char *s);
+ size_t collisions;
+};
+
+void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) {
+ struct pluginsd_user_unittest *u = parser->user;
+
+ uint32_t hash = u->hash(keyword);
+ uint32_t slot = hash % u->size;
+
+ if(u->hashtable[slot])
+ u->collisions++;
+
+ u->hashtable[slot] = keyword;
+}
+
+static struct {
+ const char *name;
+ uint32_t (*hash)(const char *s);
+ size_t slots_needed;
+} hashers[] = {
+ { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, },
+ { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, },
+ { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, },
+ { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, },
+ { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, },
+
+ // terminator
+ { .name = NULL, NULL, .slots_needed = 0, },
+};
+
+int pluginsd_parser_unittest(void) {
+ PARSER *p;
+ size_t slots_to_check = 1000;
+ size_t i, h;
+
+ // check for hashtable collisions
+ for(h = 0; hashers[h].name ;h++) {
+ hashers[h].slots_needed = slots_to_check * 1000000;
+
+ for (i = 10; i < slots_to_check; i++) {
+ struct pluginsd_user_unittest user = {
+ .hash = hashers[h].hash,
+ .size = i,
+ .hashtable = callocz(i, sizeof(const char *)),
+ .collisions = 0,
+ };
+
+ p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
+ pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING,
+ pluginsd_keyword_collision_check);
+ parser_destroy(p);
+
+ freez(user.hashtable);
+
+ if (!user.collisions) {
+ hashers[h].slots_needed = i;
+ break;
+ }
+ }
+ }
+
+ for(h = 0; hashers[h].name ;h++) {
+ if(hashers[h].slots_needed > 1000)
+ info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check);
+ else
+ info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed);
+ }
+
+ p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
+ pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
+ parser_destroy(p);
+ return 0;
+}
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index e18b43e58..1fdc23a0e 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -3,7 +3,12 @@
#ifndef NETDATA_PLUGINSD_PARSER_H
#define NETDATA_PLUGINSD_PARSER_H
-#include "parser/parser.h"
+#include "daemon/common.h"
+
+typedef enum __attribute__ ((__packed__)) {
+ PARSER_INIT_PLUGINSD = (1 << 1),
+ PARSER_INIT_STREAMING = (1 << 2),
+} PLUGINSD_KEYWORDS;
typedef struct parser_user_object {
PARSER *parser;
@@ -14,13 +19,20 @@ typedef struct parser_user_object {
int trust_durations;
DICTIONARY *new_host_labels;
DICTIONARY *chart_rrdlabels_linked_temporarily;
- size_t count;
+ size_t data_collections_count;
int enabled;
- uint8_t st_exists;
- uint8_t host_exists;
- void *private; // the user can set this for private use
+
+ STREAM_CAPABILITIES capabilities; // receiver capabilities
struct {
+ bool parsing_host;
+ uuid_t machine_guid;
+ char machine_guid_str[UUID_STR_LEN];
+ STRING *hostname;
+ DICTIONARY *rrdlabels;
+ } host_define;
+
+ struct parser_user_object_replay {
time_t start_time;
time_t end_time;
@@ -31,9 +43,20 @@ typedef struct parser_user_object {
bool rset_enabled;
} replay;
+
+ struct parser_user_object_v2 {
+ bool locked_data_collection;
+ RRDSET_STREAM_BUFFER stream_buffer; // sender capabilities in this
+ time_t update_every;
+ time_t end_time;
+ time_t wall_clock_time;
+ bool ml_locked;
+ } v2;
} PARSER_USER_OBJECT;
PARSER_RC pluginsd_function(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user);
void inflight_functions_init(PARSER *parser);
+void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types);
+
#endif //NETDATA_PLUGINSD_PARSER_H