diff options
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r-- | streaming/rrdpush.c | 419 |
1 files changed, 238 insertions, 181 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index a42bc13a0..7c1df2cad 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -39,9 +39,9 @@ struct config stream_config = { }; unsigned int default_rrdpush_enabled = 0; -#ifdef ENABLE_RRDPUSH_COMPRESSION +STREAM_CAPABILITIES globally_disabled_capabilities = STREAM_CAP_NONE; + unsigned int default_rrdpush_compression_enabled = 1; -#endif char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; char *default_rrdpush_send_charts_matching = NULL; @@ -57,53 +57,16 @@ static void load_stream_conf() { errno = 0; char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) { - netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename); + nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load user config '%s'. Will try stock config.", filename); freez(filename); filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) - netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); + nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); } freez(filename); } -STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { - - // we can have DATA_WITH_ML when INTERPOLATED is available - bool ml_capability = true; - - if(host && sender) { - // we have DATA_WITH_ML capability - // we should remove the DATA_WITH_ML capability if our database does not have anomaly info - // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML - netdata_mutex_lock(&host->receiver_lock); - - if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) - ml_capability = false; - - netdata_mutex_unlock(&host->receiver_lock); - } - - return STREAM_CAP_V1 | - STREAM_CAP_V2 | - STREAM_CAP_VN | - STREAM_CAP_VCAPS | - STREAM_CAP_HLABELS | - STREAM_CAP_CLAIM | - STREAM_CAP_CLABELS | - STREAM_CAP_FUNCTIONS | - STREAM_CAP_REPLICATION | - STREAM_CAP_BINARY | - STREAM_CAP_INTERPOLATED | - STREAM_HAS_COMPRESSION | -#ifdef NETDATA_TEST_DYNCFG - STREAM_CAP_DYNCFG | -#endif - (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | - (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) | - 0; -} - bool rrdpush_receiver_needs_dbengine() { struct section *co; @@ -145,13 +108,27 @@ int rrdpush_init() { rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s); -#ifdef ENABLE_RRDPUSH_COMPRESSION default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enable compression", default_rrdpush_compression_enabled); -#endif + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_BROTLI] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "brotli compression level", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_BROTLI]); + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "zstd compression level", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD]); + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "lz4 compression acceleration", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4]); + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "gzip compression level", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP]); if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { - netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing."); + nd_log_daemon(NDLP_WARNING, "STREAM [send]: cannot enable sending thread - information is missing."); default_rrdpush_enabled = 0; } @@ -159,7 +136,7 @@ int rrdpush_init() { netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate); if(!netdata_ssl_validate_certificate_sender) - netdata_log_info("SSL: streaming senders will skip SSL certificates verification."); + nd_log_daemon(NDLP_NOTICE, "SSL: streaming senders will skip SSL certificates verification."); netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL); netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL); @@ -245,11 +222,13 @@ static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) { // Send the current chart definition. // Assumes that collector thread has already called sender_start for mutex / buffer state. static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { - bool replication_progress = false; + uint32_t version = rrdset_metadata_version(st); RRDHOST *host = st->rrdhost; + NUMBER_ENCODING integer_encoding = stream_has_capability(host->sender, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + bool with_slots = stream_has_capability(host->sender, STREAM_CAP_SLOTS) ? true : false; - rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + bool replication_progress = false; // properly set the name for the remote end to parse it char *name = ""; @@ -264,10 +243,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { } } + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_CHART, sizeof(PLUGINSD_KEYWORD_CHART) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot); + } + // send the chart buffer_sprintf( wb - , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n" + , " \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n" , rrdset_id(st) , name , rrdset_title(st) @@ -292,19 +278,25 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { // send the dimensions RRDDIM *rd; rrddim_foreach_read(rd, st) { + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_DIMENSION, sizeof(PLUGINSD_KEYWORD_DIMENSION) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + buffer_sprintf( - wb - , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n" - , rrddim_id(rd) - , rrddim_name(rd) - , rrd_algorithm_name(rd->algorithm) - , rd->multiplier - , rd->divisor - , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":"" - , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" - , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" + wb + , " \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n" + , rrddim_id(rd) + , rrddim_name(rd) + , rrd_algorithm_name(rd->algorithm) + , rd->multiplier + , rd->divisor + , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":"" + , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" + , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" ); - rrddim_set_exposed(rd); } rrddim_foreach_done(rd); @@ -339,7 +331,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { #endif } - st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + // we can set the exposed flag, after we commit the buffer + // because replication may pick it up prematurely + rrddim_foreach_read(rd, st) { + rrddim_metadata_exposed_upstream(rd, version); + } + rrddim_foreach_done(rd); + rrdset_metadata_exposed_upstream(st, version); + + st->rrdpush.sender.resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); return replication_progress; } @@ -349,7 +351,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); - if(st->last_collected_time.tv_sec > st->upstream_resync_time_s) + if(st->last_collected_time.tv_sec > st->rrdpush.sender.resync_time_s) buffer_print_uint64(wb, st->usec_since_last_update); else buffer_fast_strcat(wb, "0", 1); @@ -361,7 +363,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta if(unlikely(!rrddim_check_updated(rd))) continue; - if(likely(rrddim_check_exposed(rd))) { + if(likely(rrddim_check_upstream_exposed_collector(rd))) { buffer_fast_strcat(wb, "SET \"", 5); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "\" = ", 4); @@ -372,7 +374,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); // we will include it in the next iteration - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrddim_metadata_updated(rd); } } rrddim_foreach_done(rd); @@ -390,12 +392,12 @@ bool rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; if(unlikely(!rrdhost_can_send_definitions_to_parent(host) - || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST)))) + || !should_send_chart_matching(st, rrdset_flag_get(st)))) { return false; + } BUFFER *wb = sender_start(host->sender); rrdpush_send_chart_definition(wb, st); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); return true; @@ -410,6 +412,7 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags)) return; + bool with_slots = stream_has_capability(rsb, STREAM_CAP_SLOTS) ? true : false; NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; BUFFER *wb = rsb->wb; @@ -419,7 +422,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ if(unlikely(rsb->begin_v2_added)) buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id)); buffer_fast_strcat(wb, "' ", 2); buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every); @@ -436,7 +446,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ rsb->begin_v2_added = true; } - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); @@ -485,11 +502,14 @@ void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, cons BUFFER *wb = sender_start(host->sender); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state); - if (job->reason) + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state); + + if (job->reason && strlen(job->reason)) buffer_sprintf(wb, " \"%s\"", job->reason); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + buffer_strcat(wb, "\n"); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); sender_thread_buffer_free(); @@ -503,7 +523,7 @@ void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); sender_thread_buffer_free(); } @@ -522,24 +542,24 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) { rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); - netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); + nd_log_daemon(NDLP_NOTICE, "STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); } return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; } else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) { - netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); + nd_log_daemon(NDLP_INFO, "STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { BUFFER *wb = sender_start(host->sender); rrd_functions_expose_global_rrdpush(host, wb); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); } - RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST); - bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED); + bool exposed_upstream = rrdset_check_upstream_exposed(st); + RRDSET_FLAGS rrdset_flags = rrdset_flag_get(st); bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED); if(unlikely((exposed_upstream && replication_in_progress) || @@ -549,7 +569,6 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(!exposed_upstream)) { BUFFER *wb = sender_start(host->sender); replication_in_progress = rrdpush_send_chart_definition(wb, st); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); } if(replication_in_progress) @@ -597,7 +616,7 @@ void rrdpush_send_global_functions(RRDHOST *host) { rrd_functions_expose_global_rrdpush(host, wb); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); } @@ -630,7 +649,7 @@ void rrdpush_send_dyncfg(RRDHOST *host) { } dfe_done(plug); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); sender_thread_buffer_free(); } @@ -656,7 +675,7 @@ void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, cons buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type)); - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); sender_thread_buffer_free(); } @@ -669,6 +688,19 @@ void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const c buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); sender_thread_buffer_free(); @@ -709,11 +741,9 @@ int connect_to_one_of_destinations( if(d->postpone_reconnection_until > now) continue; - internal_error(true, + nd_log(NDLS_DAEMON, NDLP_DEBUG, "STREAM %s: connecting to '%s' (default port: %d)...", - rrdhost_hostname(host), - string2str(d->destination), - default_port); + rrdhost_hostname(host), string2str(d->destination), default_port); if (reconnects_counter) *reconnects_counter += 1; @@ -766,7 +796,7 @@ bool destinations_init_add_one(char *entry, void *data) { DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next); t->count++; - netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); + nd_log_daemon(NDLP_INFO, "STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); return false; // we return false, so that we will get all defined destinations } @@ -835,11 +865,6 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai // ---------------------------------------------------------------------------- // rrdpush receiver thread -void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) { - netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid); -} - - static void rrdpush_sender_thread_spawn(RRDHOST *host) { sender_lock(host->sender); @@ -848,7 +873,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) { snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host)); if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender)) - netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); + nd_log_daemon(NDLP_ERR, "STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); else rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); } @@ -898,16 +923,21 @@ static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struc } void *rrdpush_receiver_thread(void *ptr); -int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) { +int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx) { if(!service_running(ABILITY_STREAMING_CONNECTIONS)) return rrdpush_receiver_too_busy_now(w); struct receiver_state *rpt = callocz(1, sizeof(*rpt)); rpt->last_msg_t = now_monotonic_sec(); - rpt->capabilities = STREAM_CAP_INVALID; rpt->hops = 1; + rpt->capabilities = STREAM_CAP_INVALID; + +#ifdef ENABLE_H2O + rpt->h2o_ctx = h2o_ctx; +#endif + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); @@ -1003,7 +1033,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false); if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) { - netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " + nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: " "request has parameter '%s' = '%s', which is not used." , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-" , rpt->client_ip, rpt->client_port @@ -1032,9 +1062,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!rpt->key || !*rpt->key) { rrdpush_receive_log_status( - rpt, - "request without an API key", - "NO API KEY PERMISSION DENIED"); + rpt, "request without an API key, rejecting connection", + RRDPUSH_STATUS_NO_API_KEY, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1042,9 +1071,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!rpt->hostname || !*rpt->hostname) { rrdpush_receive_log_status( - rpt, - "request without a hostname", - "NO HOSTNAME PERMISSION DENIED"); + rpt, "request without a hostname, rejecting connection", + RRDPUSH_STATUS_NO_HOSTNAME, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1055,9 +1083,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!rpt->machine_guid || !*rpt->machine_guid) { rrdpush_receive_log_status( - rpt, - "request without a machine GUID", - "NO MACHINE GUID PERMISSION DENIED"); + rpt, "request without a machine GUID, rejecting connection", + RRDPUSH_STATUS_NO_MACHINE_GUID, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1068,9 +1095,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (regenerate_guid(rpt->key, buf) == -1) { rrdpush_receive_log_status( - rpt, - "API key is not a valid UUID (use the command uuidgen to generate one)", - "INVALID API KEY PERMISSION DENIED"); + rpt, "API key is not a valid UUID (use the command uuidgen to generate one)", + RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1078,9 +1104,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (regenerate_guid(rpt->machine_guid, buf) == -1) { rrdpush_receive_log_status( - rpt, - "machine GUID is not a valid UUID", - "INVALID MACHINE GUID PERMISSION DENIED"); + rpt, "machine GUID is not a valid UUID", + RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1091,9 +1116,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!api_key_type || !*api_key_type) api_key_type = "unknown"; if(strcmp(api_key_type, "api") != 0) { rrdpush_receive_log_status( - rpt, - "API key is a machine GUID", - "INVALID API KEY PERMISSION DENIED"); + rpt, "API key is a machine GUID", + RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1101,9 +1125,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) { rrdpush_receive_log_status( - rpt, - "API key is not enabled", - "API KEY DISABLED PERMISSION DENIED"); + rpt, "API key is not enabled", + RRDPUSH_STATUS_API_KEY_DISABLED, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1119,9 +1142,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri simple_pattern_free(key_allow_from); rrdpush_receive_log_status( - rpt, - "API key is not allowed from this IP", - "NOT ALLOWED IP PERMISSION DENIED"); + rpt, "API key is not allowed from this IP", + RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1137,9 +1159,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (strcmp(machine_guid_type, "machine") != 0) { rrdpush_receive_log_status( - rpt, - "machine GUID is an API key", - "INVALID MACHINE GUID PERMISSION DENIED"); + rpt, "machine GUID is an API key", + RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1148,9 +1169,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) { rrdpush_receive_log_status( - rpt, - "machine GUID is not enabled", - "MACHINE GUID DISABLED PERMISSION DENIED"); + rpt, "machine GUID is not enabled", + RRDPUSH_STATUS_MACHINE_GUID_DISABLED, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1166,9 +1186,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri simple_pattern_free(machine_allow_from); rrdpush_receive_log_status( - rpt, - "machine GUID is not allowed from this IP", - "NOT ALLOWED IP PERMISSION DENIED"); + rpt, "machine GUID is not allowed from this IP", + RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING); receiver_state_free(rpt); return rrdpush_receiver_permission_denied(w); @@ -1183,9 +1202,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rrdpush_receiver_takeover_web_connection(w, rpt); rrdpush_receive_log_status( - rpt, - "machine GUID is my own", - "LOCALHOST PERMISSION DENIED"); + rpt, "machine GUID is my own", + RRDPUSH_STATUS_LOCALHOST, NDLP_DEBUG); char initial_response[HTTP_HEADER_SIZE + 1]; snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); @@ -1196,11 +1214,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri #endif rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { - netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " - "failed to reply." - , rpt->hostname - , rpt->client_ip, rpt->client_port - ); + nd_log_daemon(NDLP_ERR, "STREAM '%s' [receive from [%s]:%s]: " + "failed to reply." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); } receiver_state_free(rpt); @@ -1221,14 +1239,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri spinlock_unlock(&spinlock); char msg[100 + 1]; - snprintfz(msg, 100, + snprintfz(msg, sizeof(msg) - 1, "rate limit, will accept new connection in %ld secs", (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t))); rrdpush_receive_log_status( - rpt, - msg, - "RATE LIMIT TRY LATER"); + rpt, msg, + RRDPUSH_STATUS_RATE_LIMIT, NDLP_NOTICE); receiver_state_free(rpt); return rrdpush_receiver_too_busy_now(w); @@ -1276,29 +1293,26 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri // we can proceed with this connection receiver_stale = false; - netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " - "stopped previous stale receiver to accept this one." - , rpt->hostname - , rpt->client_ip, rpt->client_port - ); + nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: " + "stopped previous stale receiver to accept this one." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); } if (receiver_working || receiver_stale) { // another receiver is already connected // try again later -#ifdef NETDATA_INTERNAL_CHECKS char msg[200 + 1]; - snprintfz(msg, 200, + snprintfz(msg, sizeof(msg) - 1, "multiple connections for same host, " - "old connection was used %ld secs ago%s", + "old connection was last used %ld secs ago%s", age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)"); rrdpush_receive_log_status( - rpt, - msg, - "ALREADY CONNECTED"); -#endif + rpt, msg, + RRDPUSH_STATUS_ALREADY_CONNECTED, NDLP_DEBUG); // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up buffer_flush(w->response.data); @@ -1308,8 +1322,6 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } } - netdata_log_debug(D_SYSTEM, "starting STREAM receive thread."); - rrdpush_receiver_takeover_web_connection(w, rpt); char tag[NETDATA_THREAD_TAG_MAX + 1]; @@ -1318,9 +1330,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) { rrdpush_receive_log_status( - rpt, - "can't create receiver thread", - "INTERNAL SERVER ERROR"); + rpt, "can't create receiver thread", + RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR); buffer_flush(w->response.data); buffer_strcat(w->response.data, "Can't handle this request"); @@ -1364,11 +1375,15 @@ static struct { { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" }, { STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" }, { STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" }, - { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" }, + {STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR, "DISCONNECTED UNKNOWN SOCKET READ ERROR" }, { STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" }, { STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" }, { STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" }, { STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" }, + { STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER, "DISCONNECTED NOT SUFFICIENT READ BUFFER" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF, "DISCONNECTED SOCKET EOF" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED, "DISCONNECTED SOCKET READ FAILED" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT, "DISCONNECTED SOCKET READ TIMEOUT" }, { 0, NULL }, }; @@ -1389,25 +1404,29 @@ static struct { STREAM_CAPABILITIES cap; const char *str; } capability_names[] = { - { STREAM_CAP_V1, "V1" }, - { STREAM_CAP_V2, "V2" }, - { STREAM_CAP_VN, "VN" }, - { STREAM_CAP_VCAPS, "VCAPS" }, - { STREAM_CAP_HLABELS, "HLABELS" }, - { STREAM_CAP_CLAIM, "CLAIM" }, - { STREAM_CAP_CLABELS, "CLABELS" }, - { STREAM_CAP_COMPRESSION, "COMPRESSION" }, - { STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, - { STREAM_CAP_REPLICATION, "REPLICATION" }, - { STREAM_CAP_BINARY, "BINARY" }, - { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, - { STREAM_CAP_IEEE754, "IEEE754" }, - { STREAM_CAP_DATA_WITH_ML, "ML" }, - { STREAM_CAP_DYNCFG, "DYN_CFG" }, - { 0 , NULL }, + {STREAM_CAP_V1, "V1" }, + {STREAM_CAP_V2, "V2" }, + {STREAM_CAP_VN, "VN" }, + {STREAM_CAP_VCAPS, "VCAPS" }, + {STREAM_CAP_HLABELS, "HLABELS" }, + {STREAM_CAP_CLAIM, "CLAIM" }, + {STREAM_CAP_CLABELS, "CLABELS" }, + {STREAM_CAP_LZ4, "LZ4" }, + {STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, + {STREAM_CAP_REPLICATION, "REPLICATION" }, + {STREAM_CAP_BINARY, "BINARY" }, + {STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, + {STREAM_CAP_IEEE754, "IEEE754" }, + {STREAM_CAP_DATA_WITH_ML, "ML" }, + {STREAM_CAP_DYNCFG, "DYNCFG" }, + {STREAM_CAP_SLOTS, "SLOTS" }, + {STREAM_CAP_ZSTD, "ZSTD" }, + {STREAM_CAP_GZIP, "GZIP" }, + {STREAM_CAP_BROTLI, "BROTLI" }, + {0 , NULL }, }; -static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { +void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { for(size_t i = 0; capability_names[i].str ; i++) { if(caps & capability_names[i].cap) { buffer_strcat(wb, capability_names[i].str); @@ -1434,8 +1453,8 @@ void log_receiver_capabilities(struct receiver_state *rpt) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, rpt->capabilities); - netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", - rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); + nd_log_daemon(NDLP_INFO, "STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", + rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); buffer_free(wb); } @@ -1444,12 +1463,51 @@ void log_sender_capabilities(struct sender_state *s) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, s->capabilities); - netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", - rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); + nd_log_daemon(NDLP_INFO, "STREAM %s [send to %s]: established link with negotiated capabilities: %s", + rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); buffer_free(wb); } +STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { + STREAM_CAPABILITIES disabled_capabilities = globally_disabled_capabilities; + + if(host && sender) { + // we have DATA_WITH_ML capability + // we should remove the DATA_WITH_ML capability if our database does not have anomaly info + // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML + netdata_mutex_lock(&host->receiver_lock); + + if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) + disabled_capabilities |= STREAM_CAP_DATA_WITH_ML; + + netdata_mutex_unlock(&host->receiver_lock); + + if(host->sender) + disabled_capabilities |= host->sender->disabled_capabilities; + } + + return (STREAM_CAP_V1 | + STREAM_CAP_V2 | + STREAM_CAP_VN | + STREAM_CAP_VCAPS | + STREAM_CAP_HLABELS | + STREAM_CAP_CLAIM | + STREAM_CAP_CLABELS | + STREAM_CAP_FUNCTIONS | + STREAM_CAP_REPLICATION | + STREAM_CAP_BINARY | + STREAM_CAP_INTERPOLATED | + STREAM_CAP_SLOTS | + STREAM_CAP_COMPRESSIONS_AVAILABLE | + #ifdef NETDATA_TEST_DYNCFG + STREAM_CAP_DYNCFG | + #endif + STREAM_CAP_IEEE754 | + STREAM_CAP_DATA_WITH_ML | + 0) & ~disabled_capabilities; +} + STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) { STREAM_CAPABILITIES caps = 0; @@ -1457,7 +1515,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS; else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM; else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS; - else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION; + else if(version <= STREAM_OLD_VERSION_LZ4) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_CAP_LZ4_AVAILABLE; else caps = version; if(caps & STREAM_CAP_VCAPS) @@ -1479,8 +1537,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH } int32_t stream_capabilities_to_vn(uint32_t caps) { - if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION; + if(caps & STREAM_CAP_LZ4) return STREAM_OLD_VERSION_LZ4; if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS; return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM) } - |