summaryrefslogtreecommitdiffstats
path: root/src/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/streaming/rrdpush.c (renamed from streaming/rrdpush.c)169
1 files changed, 22 insertions, 147 deletions
diff --git a/streaming/rrdpush.c b/src/streaming/rrdpush.c
index 7c1df2cad..1ce8e4ea8 100644
--- a/streaming/rrdpush.c
+++ b/src/streaming/rrdpush.c
@@ -192,7 +192,7 @@ int configured_as_parent() {
appconfig_wrlock(&stream_config);
for (section = stream_config.first_section; section; section = section->next) {
- uuid_t uuid;
+ nd_uuid_t uuid;
if (uuid_parse(section->name, uuid) != -1 &&
appconfig_get_boolean_by_section(section, "enabled", 0)) {
@@ -208,14 +208,14 @@ int configured_as_parent() {
// chart labels
static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
BUFFER *wb = (BUFFER *)data;
- buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls & ~(RRDLABEL_FLAG_INTERNAL));
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL " \"%s\" \"%s\" %d\n", name, value, ls & ~(RRDLABEL_FLAG_INTERNAL));
return 1;
}
static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
if (st->rrdlabels) {
if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0)
- buffer_sprintf(wb, "CLABEL_COMMIT\n");
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_CLABEL_COMMIT "\n");
}
}
@@ -302,10 +302,10 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the chart functions
if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
- rrd_functions_expose_rrdpush(st, wb);
+ rrd_chart_functions_expose_rrdpush(st, wb);
// send the chart local custom variables
- rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+ rrdvar_print_to_streaming_custom_chart_variables(st, wb);
if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
time_t db_first_time_t, db_last_time_t;
@@ -380,7 +380,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
rrddim_foreach_done(rd);
if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
- rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+ rrdvar_print_to_streaming_custom_chart_variables(st, wb);
buffer_fast_strcat(wb, "END\n", 4);
}
@@ -475,7 +475,7 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
if(rsb->v2 && rsb->begin_v2_added) {
if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
- rrdsetvar_print_to_streaming_custom_chart_variables(st, rsb->wb);
+ rrdvar_print_to_streaming_custom_chart_variables(st, rsb->wb);
buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
}
@@ -485,49 +485,6 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
*rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
}
-// TODO enable this macro before release
-#define bail_if_no_cap(cap) \
- if(unlikely(!stream_has_capability(host->sender, cap))) { \
- return; \
- }
-
-#define dyncfg_check_can_push(host) \
- if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \
- return; \
- bail_if_no_cap(STREAM_CAP_DYNCFG)
-
-// assumes job is locked and acquired!!!
-void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) {
- dyncfg_check_can_push(host);
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
-
- if (job->reason && strlen(job->reason))
- buffer_sprintf(wb, " \"%s\"", job->reason);
-
- buffer_strcat(wb, "\n");
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-
- job->dirty = 0;
-}
-
-void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) {
- dyncfg_check_can_push(host);
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
@@ -554,7 +511,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
BUFFER *wb = sender_start(host->sender);
- rrd_functions_expose_global_rrdpush(host, wb);
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
}
@@ -614,98 +571,13 @@ void rrdpush_send_global_functions(RRDHOST *host) {
BUFFER *wb = sender_start(host->sender);
- rrd_functions_expose_global_rrdpush(host, wb);
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
}
-void rrdpush_send_dyncfg(RRDHOST *host) {
- dyncfg_check_can_push(host);
-
- BUFFER *wb = sender_start(host->sender);
-
- DICTIONARY *plugins_dict = host->configurable_plugins;
-
- struct configurable_plugin *plug;
- dfe_start_read(plugins_dict, plug) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name);
- struct module *mod;
- dfe_start_read(plug->modules, mod) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type));
- struct job *job;
- dfe_start_read(mod->jobs, job) {
- pthread_mutex_lock(&job->lock);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state);
- if (job->reason)
- buffer_sprintf(wb, " \"%s\"", job->reason);
- buffer_sprintf(wb, "\n");
- job->dirty = 0;
- pthread_mutex_unlock(&job->lock);
- } dfe_done(job);
- } dfe_done(mod);
- }
- dfe_done(plug);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name)
-{
- dyncfg_check_can_push(host);
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type)
-{
- dyncfg_check_can_push(host);
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type));
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags)
-{
- dyncfg_check_can_push(host);
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name)
-{
- dyncfg_check_can_push(host);
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
-
- sender_thread_buffer_free();
-}
-
void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -738,6 +610,9 @@ int connect_to_one_of_destinations(
for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) {
time_t now = now_realtime_sec();
+ if(nd_thread_signaled_to_cancel())
+ return -1;
+
if(d->postpone_reconnection_until > now)
continue;
@@ -846,7 +721,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai
host->sender->exit.reason = reason;
// signal it to cancel
- netdata_thread_cancel(host->rrdpush_sender_thread);
+ nd_thread_signal_cancel(host->rrdpush_sender_thread);
}
sender_unlock(host->sender);
@@ -872,7 +747,9 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
- if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
+ host->rrdpush_sender_thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_DEFAULT,
+ rrdpush_sender_thread, (void *)host->sender);
+ if(!host->rrdpush_sender_thread)
nd_log_daemon(NDLP_ERR, "STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
@@ -923,7 +800,7 @@ static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struc
}
void *rrdpush_receiver_thread(void *ptr);
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx) {
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx __maybe_unused) {
if(!service_running(ABILITY_STREAMING_CONNECTIONS))
return rrdpush_receiver_too_busy_now(w);
@@ -1003,9 +880,6 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
else if(!strcmp(name, "mc_version"))
rpt->system_info->mc_version = strtoul(value, NULL, 0);
- else if(!strcmp(name, "tags") && !rpt->tags)
- rpt->tags = strdupz(value);
-
else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false);
@@ -1286,7 +1160,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
netdata_mutex_unlock(&host->receiver_lock);
}
- rrd_unlock();
+ rrd_rdunlock();
if (receiver_stale && stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER)) {
// we stopped the receiver
@@ -1328,7 +1202,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_RECEIVER "[%s]", rpt->hostname);
tag[NETDATA_THREAD_TAG_MAX] = '\0';
- if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
+ rpt->thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt);
+ if(!rpt->thread) {
rrdpush_receive_log_status(
rpt, "can't create receiver thread",
RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
@@ -1423,6 +1298,7 @@ static struct {
{STREAM_CAP_ZSTD, "ZSTD" },
{STREAM_CAP_GZIP, "GZIP" },
{STREAM_CAP_BROTLI, "BROTLI" },
+ {STREAM_CAP_PROGRESS, "PROGRESS" },
{0 , NULL },
};
@@ -1499,10 +1375,9 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
STREAM_CAP_BINARY |
STREAM_CAP_INTERPOLATED |
STREAM_CAP_SLOTS |
+ STREAM_CAP_PROGRESS |
STREAM_CAP_COMPRESSIONS_AVAILABLE |
- #ifdef NETDATA_TEST_DYNCFG
STREAM_CAP_DYNCFG |
- #endif
STREAM_CAP_IEEE754 |
STREAM_CAP_DATA_WITH_ML |
0) & ~disabled_capabilities;