diff options
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r-- | streaming/rrdpush.c | 126 |
1 files changed, 125 insertions, 1 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 67c43e41..e8c46a02 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -96,6 +96,9 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { STREAM_CAP_BINARY | STREAM_CAP_INTERPOLATED | STREAM_HAS_COMPRESSION | +#ifdef NETDATA_TEST_DYNCFG + STREAM_CAP_DYNCFG | +#endif (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) | 0; @@ -465,6 +468,46 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; } +// TODO enable this macro before release +#define bail_if_no_cap(cap) \ + if(unlikely(!stream_has_capability(host->sender, cap))) { \ + return; \ + } + +#define dyncfg_check_can_push(host) \ + if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \ + return; \ + bail_if_no_cap(STREAM_CAP_DYNCFG) + +// assumes job is locked and acquired!!! +void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state); + if (job->reason) + buffer_sprintf(wb, " \"%s\"", job->reason); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); + + job->dirty = 0; +} + +void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) { RRDHOST *host = st->rrdhost; @@ -489,6 +532,12 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } + if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { + BUFFER *wb = sender_start(host->sender); + rrd_functions_expose_global_rrdpush(host, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + } + RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST); bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED); bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED); @@ -553,6 +602,78 @@ void rrdpush_send_global_functions(RRDHOST *host) { sender_thread_buffer_free(); } +void rrdpush_send_dyncfg(RRDHOST *host) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + DICTIONARY *plugins_dict = host->configurable_plugins; + + struct configurable_plugin *plug; + dfe_start_read(plugins_dict, plug) { + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name); + struct module *mod; + dfe_start_read(plug->modules, mod) { + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type)); + struct job *job; + dfe_start_read(mod->jobs, job) { + pthread_mutex_lock(&job->lock); + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags); + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state); + if (job->reason) + buffer_sprintf(wb, " \"%s\"", job->reason); + buffer_sprintf(wb, "\n"); + job->dirty = 0; + pthread_mutex_unlock(&job->lock); + } dfe_done(job); + } dfe_done(mod); + } + dfe_done(plug); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type)); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + void rrdpush_send_claimed_id(RRDHOST *host) { if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; @@ -588,7 +709,7 @@ int connect_to_one_of_destinations( if(d->postpone_reconnection_until > now) continue; - netdata_log_info( + internal_error(true, "STREAM %s: connecting to '%s' (default port: %d)...", rrdhost_hostname(host), string2str(d->destination), @@ -1166,6 +1287,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri // another receiver is already connected // try again later +#ifdef NETDATA_INTERNAL_CHECKS char msg[200 + 1]; snprintfz(msg, 200, "multiple connections for same host, " @@ -1176,6 +1298,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rpt, msg, "ALREADY CONNECTED"); +#endif // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up buffer_flush(w->response.data); @@ -1280,6 +1403,7 @@ static struct { { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, { STREAM_CAP_IEEE754, "IEEE754" }, { STREAM_CAP_DATA_WITH_ML, "ML" }, + { STREAM_CAP_DYNCFG, "DYN_CFG" }, { 0 , NULL }, }; |