summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-10-17 09:30:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-10-17 09:30:20 +0000
commit386ccdd61e8256c8b21ee27ee2fc12438fc5ca98 (patch)
treec9fbcacdb01f029f46133a5ba7ecd610c2bcb041 /streaming/rrdpush.c
parentAdding upstream version 1.42.4. (diff)
downloadnetdata-511b962f01d7a1ee9c27f448bbba636664b441d1.tar.xz
netdata-511b962f01d7a1ee9c27f448bbba636664b441d1.zip
Adding upstream version 1.43.0.upstream/1.43.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r--streaming/rrdpush.c126
1 files changed, 125 insertions, 1 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 67c43e411..e8c46a021 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -96,6 +96,9 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
STREAM_CAP_BINARY |
STREAM_CAP_INTERPOLATED |
STREAM_HAS_COMPRESSION |
+#ifdef NETDATA_TEST_DYNCFG
+ STREAM_CAP_DYNCFG |
+#endif
(ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
(ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
0;
@@ -465,6 +468,46 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
*rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
}
+// TODO enable this macro before release
+#define bail_if_no_cap(cap) \
+ if(unlikely(!stream_has_capability(host->sender, cap))) { \
+ return; \
+ }
+
+#define dyncfg_check_can_push(host) \
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \
+ return; \
+ bail_if_no_cap(STREAM_CAP_DYNCFG)
+
+// assumes job is locked and acquired!!!
+void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) {
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
+ if (job->reason)
+ buffer_sprintf(wb, " \"%s\"", job->reason);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+
+ job->dirty = 0;
+}
+
+void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) {
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
@@ -489,6 +532,12 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
+ if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrd_functions_expose_global_rrdpush(host, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ }
+
RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
@@ -553,6 +602,78 @@ void rrdpush_send_global_functions(RRDHOST *host) {
sender_thread_buffer_free();
}
+void rrdpush_send_dyncfg(RRDHOST *host) {
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ DICTIONARY *plugins_dict = host->configurable_plugins;
+
+ struct configurable_plugin *plug;
+ dfe_start_read(plugins_dict, plug) {
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name);
+ struct module *mod;
+ dfe_start_read(plug->modules, mod) {
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type));
+ struct job *job;
+ dfe_start_read(mod->jobs, job) {
+ pthread_mutex_lock(&job->lock);
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags);
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state);
+ if (job->reason)
+ buffer_sprintf(wb, " \"%s\"", job->reason);
+ buffer_sprintf(wb, "\n");
+ job->dirty = 0;
+ pthread_mutex_unlock(&job->lock);
+ } dfe_done(job);
+ } dfe_done(mod);
+ }
+ dfe_done(plug);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name)
+{
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type)
+{
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type));
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags)
+{
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -588,7 +709,7 @@ int connect_to_one_of_destinations(
if(d->postpone_reconnection_until > now)
continue;
- netdata_log_info(
+ internal_error(true,
"STREAM %s: connecting to '%s' (default port: %d)...",
rrdhost_hostname(host),
string2str(d->destination),
@@ -1166,6 +1287,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
// another receiver is already connected
// try again later
+#ifdef NETDATA_INTERNAL_CHECKS
char msg[200 + 1];
snprintfz(msg, 200,
"multiple connections for same host, "
@@ -1176,6 +1298,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt,
msg,
"ALREADY CONNECTED");
+#endif
// Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
buffer_flush(w->response.data);
@@ -1280,6 +1403,7 @@ static struct {
{ STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
{ STREAM_CAP_IEEE754, "IEEE754" },
{ STREAM_CAP_DATA_WITH_ML, "ML" },
+ { STREAM_CAP_DYNCFG, "DYN_CFG" },
{ 0 , NULL },
};