summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r--streaming/rrdpush.c419
1 files changed, 238 insertions, 181 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index a42bc13a0..7c1df2cad 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -39,9 +39,9 @@ struct config stream_config = {
};
unsigned int default_rrdpush_enabled = 0;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
+STREAM_CAPABILITIES globally_disabled_capabilities = STREAM_CAP_NONE;
+
unsigned int default_rrdpush_compression_enabled = 1;
-#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
char *default_rrdpush_send_charts_matching = NULL;
@@ -57,53 +57,16 @@ static void load_stream_conf() {
errno = 0;
char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL)) {
- netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
+ nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load user config '%s'. Will try stock config.", filename);
freez(filename);
filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL))
- netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
+ nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
}
freez(filename);
}
-STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
-
- // we can have DATA_WITH_ML when INTERPOLATED is available
- bool ml_capability = true;
-
- if(host && sender) {
- // we have DATA_WITH_ML capability
- // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
- // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
- netdata_mutex_lock(&host->receiver_lock);
-
- if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
- ml_capability = false;
-
- netdata_mutex_unlock(&host->receiver_lock);
- }
-
- return STREAM_CAP_V1 |
- STREAM_CAP_V2 |
- STREAM_CAP_VN |
- STREAM_CAP_VCAPS |
- STREAM_CAP_HLABELS |
- STREAM_CAP_CLAIM |
- STREAM_CAP_CLABELS |
- STREAM_CAP_FUNCTIONS |
- STREAM_CAP_REPLICATION |
- STREAM_CAP_BINARY |
- STREAM_CAP_INTERPOLATED |
- STREAM_HAS_COMPRESSION |
-#ifdef NETDATA_TEST_DYNCFG
- STREAM_CAP_DYNCFG |
-#endif
- (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
- (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
- 0;
-}
-
bool rrdpush_receiver_needs_dbengine() {
struct section *co;
@@ -145,13 +108,27 @@ int rrdpush_init() {
rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
"enable compression", default_rrdpush_compression_enabled);
-#endif
+
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_BROTLI] = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "brotli compression level",
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_BROTLI]);
+
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD] = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "zstd compression level",
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD]);
+
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4] = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "lz4 compression acceleration",
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4]);
+
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP] = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "gzip compression level",
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP]);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
- netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing.");
+ nd_log_daemon(NDLP_WARNING, "STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
}
@@ -159,7 +136,7 @@ int rrdpush_init() {
netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate);
if(!netdata_ssl_validate_certificate_sender)
- netdata_log_info("SSL: streaming senders will skip SSL certificates verification.");
+ nd_log_daemon(NDLP_NOTICE, "SSL: streaming senders will skip SSL certificates verification.");
netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
@@ -245,11 +222,13 @@ static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
// Send the current chart definition.
// Assumes that collector thread has already called sender_start for mutex / buffer state.
static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
- bool replication_progress = false;
+ uint32_t version = rrdset_metadata_version(st);
RRDHOST *host = st->rrdhost;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(host->sender, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ bool with_slots = stream_has_capability(host->sender, STREAM_CAP_SLOTS) ? true : false;
- rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ bool replication_progress = false;
// properly set the name for the remote end to parse it
char *name = "";
@@ -264,10 +243,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
}
}
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_CHART, sizeof(PLUGINSD_KEYWORD_CHART) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
+ }
+
// send the chart
buffer_sprintf(
wb
- , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
+ , " \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, rrdset_id(st)
, name
, rrdset_title(st)
@@ -292,19 +278,25 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the dimensions
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_DIMENSION, sizeof(PLUGINSD_KEYWORD_DIMENSION) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
buffer_sprintf(
- wb
- , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
- , rrddim_id(rd)
- , rrddim_name(rd)
- , rrd_algorithm_name(rd->algorithm)
- , rd->multiplier
- , rd->divisor
- , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
- , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
- , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
+ wb
+ , " \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
+ , rrddim_id(rd)
+ , rrddim_name(rd)
+ , rrd_algorithm_name(rd->algorithm)
+ , rd->multiplier
+ , rd->divisor
+ , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
- rrddim_set_exposed(rd);
}
rrddim_foreach_done(rd);
@@ -339,7 +331,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
#endif
}
- st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ // we can set the exposed flag, after we commit the buffer
+ // because replication may pick it up prematurely
+ rrddim_foreach_read(rd, st) {
+ rrddim_metadata_exposed_upstream(rd, version);
+ }
+ rrddim_foreach_done(rd);
+ rrdset_metadata_exposed_upstream(st, version);
+
+ st->rrdpush.sender.resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
return replication_progress;
}
@@ -349,7 +351,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
- if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
+ if(st->last_collected_time.tv_sec > st->rrdpush.sender.resync_time_s)
buffer_print_uint64(wb, st->usec_since_last_update);
else
buffer_fast_strcat(wb, "0", 1);
@@ -361,7 +363,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
if(unlikely(!rrddim_check_updated(rd)))
continue;
- if(likely(rrddim_check_exposed(rd))) {
+ if(likely(rrddim_check_upstream_exposed_collector(rd))) {
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
@@ -372,7 +374,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
// we will include it in the next iteration
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrddim_metadata_updated(rd);
}
}
rrddim_foreach_done(rd);
@@ -390,12 +392,12 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
- || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST))))
+ || !should_send_chart_matching(st, rrdset_flag_get(st)))) {
return false;
+ }
BUFFER *wb = sender_start(host->sender);
rrdpush_send_chart_definition(wb, st);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
return true;
@@ -410,6 +412,7 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
return;
+ bool with_slots = stream_has_capability(rsb, STREAM_CAP_SLOTS) ? true : false;
NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
BUFFER *wb = rsb->wb;
@@ -419,7 +422,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
if(unlikely(rsb->begin_v2_added))
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
buffer_fast_strcat(wb, "' ", 2);
buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every);
@@ -436,7 +446,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
rsb->begin_v2_added = true;
}
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
@@ -485,11 +502,14 @@ void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, cons
BUFFER *wb = sender_start(host->sender);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
- if (job->reason)
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
+
+ if (job->reason && strlen(job->reason))
buffer_sprintf(wb, " \"%s\"", job->reason);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ buffer_strcat(wb, "\n");
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
sender_thread_buffer_free();
@@ -503,7 +523,7 @@ void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char
buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
sender_thread_buffer_free();
}
@@ -522,24 +542,24 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
- netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_NOTICE, "STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
}
return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
}
else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
- netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
BUFFER *wb = sender_start(host->sender);
rrd_functions_expose_global_rrdpush(host, wb);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
}
- RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
- bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
+ bool exposed_upstream = rrdset_check_upstream_exposed(st);
+ RRDSET_FLAGS rrdset_flags = rrdset_flag_get(st);
bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
if(unlikely((exposed_upstream && replication_in_progress) ||
@@ -549,7 +569,6 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!exposed_upstream)) {
BUFFER *wb = sender_start(host->sender);
replication_in_progress = rrdpush_send_chart_definition(wb, st);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
}
if(replication_in_progress)
@@ -597,7 +616,7 @@ void rrdpush_send_global_functions(RRDHOST *host) {
rrd_functions_expose_global_rrdpush(host, wb);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
}
@@ -630,7 +649,7 @@ void rrdpush_send_dyncfg(RRDHOST *host) {
}
dfe_done(plug);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
sender_thread_buffer_free();
}
@@ -656,7 +675,7 @@ void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, cons
buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type));
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
sender_thread_buffer_free();
}
@@ -669,6 +688,19 @@ void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const c
buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name)
+{
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name);
+
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
@@ -709,11 +741,9 @@ int connect_to_one_of_destinations(
if(d->postpone_reconnection_until > now)
continue;
- internal_error(true,
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
"STREAM %s: connecting to '%s' (default port: %d)...",
- rrdhost_hostname(host),
- string2str(d->destination),
- default_port);
+ rrdhost_hostname(host), string2str(d->destination), default_port);
if (reconnects_counter)
*reconnects_counter += 1;
@@ -766,7 +796,7 @@ bool destinations_init_add_one(char *entry, void *data) {
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
t->count++;
- netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
+ nd_log_daemon(NDLP_INFO, "STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
return false; // we return false, so that we will get all defined destinations
}
@@ -835,11 +865,6 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai
// ----------------------------------------------------------------------------
// rrdpush receiver thread
-void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
- netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
-}
-
-
static void rrdpush_sender_thread_spawn(RRDHOST *host) {
sender_lock(host->sender);
@@ -848,7 +873,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
- netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_ERR, "STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
}
@@ -898,16 +923,21 @@ static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struc
}
void *rrdpush_receiver_thread(void *ptr);
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx) {
if(!service_running(ABILITY_STREAMING_CONNECTIONS))
return rrdpush_receiver_too_busy_now(w);
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
rpt->last_msg_t = now_monotonic_sec();
- rpt->capabilities = STREAM_CAP_INVALID;
rpt->hops = 1;
+ rpt->capabilities = STREAM_CAP_INVALID;
+
+#ifdef ENABLE_H2O
+ rpt->h2o_ctx = h2o_ctx;
+#endif
+
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
@@ -1003,7 +1033,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false);
if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
+ nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: "
"request has parameter '%s' = '%s', which is not used."
, (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
, rpt->client_ip, rpt->client_port
@@ -1032,9 +1062,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->key || !*rpt->key) {
rrdpush_receive_log_status(
- rpt,
- "request without an API key",
- "NO API KEY PERMISSION DENIED");
+ rpt, "request without an API key, rejecting connection",
+ RRDPUSH_STATUS_NO_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1042,9 +1071,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->hostname || !*rpt->hostname) {
rrdpush_receive_log_status(
- rpt,
- "request without a hostname",
- "NO HOSTNAME PERMISSION DENIED");
+ rpt, "request without a hostname, rejecting connection",
+ RRDPUSH_STATUS_NO_HOSTNAME, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1055,9 +1083,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->machine_guid || !*rpt->machine_guid) {
rrdpush_receive_log_status(
- rpt,
- "request without a machine GUID",
- "NO MACHINE GUID PERMISSION DENIED");
+ rpt, "request without a machine GUID, rejecting connection",
+ RRDPUSH_STATUS_NO_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1068,9 +1095,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (regenerate_guid(rpt->key, buf) == -1) {
rrdpush_receive_log_status(
- rpt,
- "API key is not a valid UUID (use the command uuidgen to generate one)",
- "INVALID API KEY PERMISSION DENIED");
+ rpt, "API key is not a valid UUID (use the command uuidgen to generate one)",
+ RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1078,9 +1104,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (regenerate_guid(rpt->machine_guid, buf) == -1) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not a valid UUID",
- "INVALID MACHINE GUID PERMISSION DENIED");
+ rpt, "machine GUID is not a valid UUID",
+ RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1091,9 +1116,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!api_key_type || !*api_key_type) api_key_type = "unknown";
if(strcmp(api_key_type, "api") != 0) {
rrdpush_receive_log_status(
- rpt,
- "API key is a machine GUID",
- "INVALID API KEY PERMISSION DENIED");
+ rpt, "API key is a machine GUID",
+ RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1101,9 +1125,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) {
rrdpush_receive_log_status(
- rpt,
- "API key is not enabled",
- "API KEY DISABLED PERMISSION DENIED");
+ rpt, "API key is not enabled",
+ RRDPUSH_STATUS_API_KEY_DISABLED, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1119,9 +1142,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
simple_pattern_free(key_allow_from);
rrdpush_receive_log_status(
- rpt,
- "API key is not allowed from this IP",
- "NOT ALLOWED IP PERMISSION DENIED");
+ rpt, "API key is not allowed from this IP",
+ RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1137,9 +1159,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (strcmp(machine_guid_type, "machine") != 0) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is an API key",
- "INVALID MACHINE GUID PERMISSION DENIED");
+ rpt, "machine GUID is an API key",
+ RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1148,9 +1169,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not enabled",
- "MACHINE GUID DISABLED PERMISSION DENIED");
+ rpt, "machine GUID is not enabled",
+ RRDPUSH_STATUS_MACHINE_GUID_DISABLED, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1166,9 +1186,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
simple_pattern_free(machine_allow_from);
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not allowed from this IP",
- "NOT ALLOWED IP PERMISSION DENIED");
+ rpt, "machine GUID is not allowed from this IP",
+ RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1183,9 +1202,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rrdpush_receiver_takeover_web_connection(w, rpt);
rrdpush_receive_log_status(
- rpt,
- "machine GUID is my own",
- "LOCALHOST PERMISSION DENIED");
+ rpt, "machine GUID is my own",
+ RRDPUSH_STATUS_LOCALHOST, NDLP_DEBUG);
char initial_response[HTTP_HEADER_SIZE + 1];
snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
@@ -1196,11 +1214,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
#endif
rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
- netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
- "failed to reply."
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- );
+ nd_log_daemon(NDLP_ERR, "STREAM '%s' [receive from [%s]:%s]: "
+ "failed to reply."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
}
receiver_state_free(rpt);
@@ -1221,14 +1239,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
spinlock_unlock(&spinlock);
char msg[100 + 1];
- snprintfz(msg, 100,
+ snprintfz(msg, sizeof(msg) - 1,
"rate limit, will accept new connection in %ld secs",
(long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
rrdpush_receive_log_status(
- rpt,
- msg,
- "RATE LIMIT TRY LATER");
+ rpt, msg,
+ RRDPUSH_STATUS_RATE_LIMIT, NDLP_NOTICE);
receiver_state_free(rpt);
return rrdpush_receiver_too_busy_now(w);
@@ -1276,29 +1293,26 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
// we can proceed with this connection
receiver_stale = false;
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
- "stopped previous stale receiver to accept this one."
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- );
+ nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: "
+ "stopped previous stale receiver to accept this one."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
}
if (receiver_working || receiver_stale) {
// another receiver is already connected
// try again later
-#ifdef NETDATA_INTERNAL_CHECKS
char msg[200 + 1];
- snprintfz(msg, 200,
+ snprintfz(msg, sizeof(msg) - 1,
"multiple connections for same host, "
- "old connection was used %ld secs ago%s",
+ "old connection was last used %ld secs ago%s",
age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)");
rrdpush_receive_log_status(
- rpt,
- msg,
- "ALREADY CONNECTED");
-#endif
+ rpt, msg,
+ RRDPUSH_STATUS_ALREADY_CONNECTED, NDLP_DEBUG);
// Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
buffer_flush(w->response.data);
@@ -1308,8 +1322,6 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
}
- netdata_log_debug(D_SYSTEM, "starting STREAM receive thread.");
-
rrdpush_receiver_takeover_web_connection(w, rpt);
char tag[NETDATA_THREAD_TAG_MAX + 1];
@@ -1318,9 +1330,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
rrdpush_receive_log_status(
- rpt,
- "can't create receiver thread",
- "INTERNAL SERVER ERROR");
+ rpt, "can't create receiver thread",
+ RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
buffer_flush(w->response.data);
buffer_strcat(w->response.data, "Can't handle this request");
@@ -1364,11 +1375,15 @@ static struct {
{ STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" },
{ STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" },
{ STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" },
- { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" },
+ {STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR, "DISCONNECTED UNKNOWN SOCKET READ ERROR" },
{ STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" },
{ STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" },
{ STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" },
{ STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" },
+ { STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER, "DISCONNECTED NOT SUFFICIENT READ BUFFER" },
+ {STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF, "DISCONNECTED SOCKET EOF" },
+ {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED, "DISCONNECTED SOCKET READ FAILED" },
+ {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT, "DISCONNECTED SOCKET READ TIMEOUT" },
{ 0, NULL },
};
@@ -1389,25 +1404,29 @@ static struct {
STREAM_CAPABILITIES cap;
const char *str;
} capability_names[] = {
- { STREAM_CAP_V1, "V1" },
- { STREAM_CAP_V2, "V2" },
- { STREAM_CAP_VN, "VN" },
- { STREAM_CAP_VCAPS, "VCAPS" },
- { STREAM_CAP_HLABELS, "HLABELS" },
- { STREAM_CAP_CLAIM, "CLAIM" },
- { STREAM_CAP_CLABELS, "CLABELS" },
- { STREAM_CAP_COMPRESSION, "COMPRESSION" },
- { STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
- { STREAM_CAP_REPLICATION, "REPLICATION" },
- { STREAM_CAP_BINARY, "BINARY" },
- { STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
- { STREAM_CAP_IEEE754, "IEEE754" },
- { STREAM_CAP_DATA_WITH_ML, "ML" },
- { STREAM_CAP_DYNCFG, "DYN_CFG" },
- { 0 , NULL },
+ {STREAM_CAP_V1, "V1" },
+ {STREAM_CAP_V2, "V2" },
+ {STREAM_CAP_VN, "VN" },
+ {STREAM_CAP_VCAPS, "VCAPS" },
+ {STREAM_CAP_HLABELS, "HLABELS" },
+ {STREAM_CAP_CLAIM, "CLAIM" },
+ {STREAM_CAP_CLABELS, "CLABELS" },
+ {STREAM_CAP_LZ4, "LZ4" },
+ {STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
+ {STREAM_CAP_REPLICATION, "REPLICATION" },
+ {STREAM_CAP_BINARY, "BINARY" },
+ {STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
+ {STREAM_CAP_IEEE754, "IEEE754" },
+ {STREAM_CAP_DATA_WITH_ML, "ML" },
+ {STREAM_CAP_DYNCFG, "DYNCFG" },
+ {STREAM_CAP_SLOTS, "SLOTS" },
+ {STREAM_CAP_ZSTD, "ZSTD" },
+ {STREAM_CAP_GZIP, "GZIP" },
+ {STREAM_CAP_BROTLI, "BROTLI" },
+ {0 , NULL },
};
-static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
+void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
for(size_t i = 0; capability_names[i].str ; i++) {
if(caps & capability_names[i].cap) {
buffer_strcat(wb, capability_names[i].str);
@@ -1434,8 +1453,8 @@ void log_receiver_capabilities(struct receiver_state *rpt) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, rpt->capabilities);
- netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
- rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
buffer_free(wb);
}
@@ -1444,12 +1463,51 @@ void log_sender_capabilities(struct sender_state *s) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, s->capabilities);
- netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
- rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send to %s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
buffer_free(wb);
}
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
+ STREAM_CAPABILITIES disabled_capabilities = globally_disabled_capabilities;
+
+ if(host && sender) {
+ // we have DATA_WITH_ML capability
+ // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
+ // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
+ disabled_capabilities |= STREAM_CAP_DATA_WITH_ML;
+
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ if(host->sender)
+ disabled_capabilities |= host->sender->disabled_capabilities;
+ }
+
+ return (STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_CAP_SLOTS |
+ STREAM_CAP_COMPRESSIONS_AVAILABLE |
+ #ifdef NETDATA_TEST_DYNCFG
+ STREAM_CAP_DYNCFG |
+ #endif
+ STREAM_CAP_IEEE754 |
+ STREAM_CAP_DATA_WITH_ML |
+ 0) & ~disabled_capabilities;
+}
+
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) {
STREAM_CAPABILITIES caps = 0;
@@ -1457,7 +1515,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH
else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS;
else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM;
else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS;
- else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION;
+ else if(version <= STREAM_OLD_VERSION_LZ4) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_CAP_LZ4_AVAILABLE;
else caps = version;
if(caps & STREAM_CAP_VCAPS)
@@ -1479,8 +1537,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH
}
int32_t stream_capabilities_to_vn(uint32_t caps) {
- if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION;
+ if(caps & STREAM_CAP_LZ4) return STREAM_OLD_VERSION_LZ4;
if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS;
return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM)
}
-