diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:03 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:18 +0000 |
commit | 5da14042f70711ea5cf66e034699730335462f66 (patch) | |
tree | 0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/streaming/rrdpush.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz netdata-5da14042f70711ea5cf66e034699730335462f66.zip |
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/streaming/rrdpush.c (renamed from streaming/rrdpush.c) | 151 |
1 files changed, 10 insertions, 141 deletions
diff --git a/streaming/rrdpush.c b/src/streaming/rrdpush.c index 7c1df2cad..874d4eb2f 100644 --- a/streaming/rrdpush.c +++ b/src/streaming/rrdpush.c @@ -208,14 +208,14 @@ int configured_as_parent() { // chart labels static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { BUFFER *wb = (BUFFER *)data; - buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls & ~(RRDLABEL_FLAG_INTERNAL)); + buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL " \"%s\" \"%s\" %d\n", name, value, ls & ~(RRDLABEL_FLAG_INTERNAL)); return 1; } static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) { if (st->rrdlabels) { if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0) - buffer_sprintf(wb, "CLABEL_COMMIT\n"); + buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL_COMMIT "\n"); } } @@ -302,10 +302,10 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { // send the chart functions if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) - rrd_functions_expose_rrdpush(st, wb); + rrd_chart_functions_expose_rrdpush(st, wb); // send the chart local custom variables - rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); + rrdvar_print_to_streaming_custom_chart_variables(st, wb); if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) { time_t db_first_time_t, db_last_time_t; @@ -380,7 +380,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta rrddim_foreach_done(rd); if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) - rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); + rrdvar_print_to_streaming_custom_chart_variables(st, wb); buffer_fast_strcat(wb, "END\n", 4); } @@ -475,7 +475,7 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { if(rsb->v2 && rsb->begin_v2_added) { if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) - rrdsetvar_print_to_streaming_custom_chart_variables(st, rsb->wb); + rrdvar_print_to_streaming_custom_chart_variables(st, rsb->wb); buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); } @@ -485,49 +485,6 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; } -// TODO enable this macro before release -#define bail_if_no_cap(cap) \ - if(unlikely(!stream_has_capability(host->sender, cap))) { \ - return; \ - } - -#define dyncfg_check_can_push(host) \ - if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \ - return; \ - bail_if_no_cap(STREAM_CAP_DYNCFG) - -// assumes job is locked and acquired!!! -void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) { - dyncfg_check_can_push(host); - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state); - - if (job->reason && strlen(job->reason)) - buffer_sprintf(wb, " \"%s\"", job->reason); - - buffer_strcat(wb, "\n"); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); - - job->dirty = 0; -} - -void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) { - dyncfg_check_can_push(host); - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) { RRDHOST *host = st->rrdhost; @@ -554,7 +511,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { BUFFER *wb = sender_start(host->sender); - rrd_functions_expose_global_rrdpush(host, wb); + rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG)); sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); } @@ -614,98 +571,13 @@ void rrdpush_send_global_functions(RRDHOST *host) { BUFFER *wb = sender_start(host->sender); - rrd_functions_expose_global_rrdpush(host, wb); + rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG)); sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); } -void rrdpush_send_dyncfg(RRDHOST *host) { - dyncfg_check_can_push(host); - - BUFFER *wb = sender_start(host->sender); - - DICTIONARY *plugins_dict = host->configurable_plugins; - - struct configurable_plugin *plug; - dfe_start_read(plugins_dict, plug) { - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name); - struct module *mod; - dfe_start_read(plug->modules, mod) { - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type)); - struct job *job; - dfe_start_read(mod->jobs, job) { - pthread_mutex_lock(&job->lock); - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state); - if (job->reason) - buffer_sprintf(wb, " \"%s\"", job->reason); - buffer_sprintf(wb, "\n"); - job->dirty = 0; - pthread_mutex_unlock(&job->lock); - } dfe_done(job); - } dfe_done(mod); - } - dfe_done(plug); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name) -{ - dyncfg_check_can_push(host); - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type) -{ - dyncfg_check_can_push(host); - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type)); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags) -{ - dyncfg_check_can_push(host); - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name) -{ - dyncfg_check_can_push(host); - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); - - sender_thread_buffer_free(); -} - void rrdpush_send_claimed_id(RRDHOST *host) { if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; @@ -1003,9 +875,6 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri else if(!strcmp(name, "mc_version")) rpt->system_info->mc_version = strtoul(value, NULL, 0); - else if(!strcmp(name, "tags") && !rpt->tags) - rpt->tags = strdupz(value); - else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID)) rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false); @@ -1423,6 +1292,7 @@ static struct { {STREAM_CAP_ZSTD, "ZSTD" }, {STREAM_CAP_GZIP, "GZIP" }, {STREAM_CAP_BROTLI, "BROTLI" }, + {STREAM_CAP_PROGRESS, "PROGRESS" }, {0 , NULL }, }; @@ -1499,10 +1369,9 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { STREAM_CAP_BINARY | STREAM_CAP_INTERPOLATED | STREAM_CAP_SLOTS | + STREAM_CAP_PROGRESS | STREAM_CAP_COMPRESSIONS_AVAILABLE | - #ifdef NETDATA_TEST_DYNCFG STREAM_CAP_DYNCFG | - #endif STREAM_CAP_IEEE754 | STREAM_CAP_DATA_WITH_ML | 0) & ~disabled_capabilities; |