diff options
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r-- | streaming/rrdpush.c | 211 |
1 files changed, 134 insertions, 77 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index c481871cc..67c43e411 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -39,8 +39,8 @@ struct config stream_config = { }; unsigned int default_rrdpush_enabled = 0; -#ifdef ENABLE_COMPRESSION -unsigned int default_compression_enabled = 1; +#ifdef ENABLE_RRDPUSH_COMPRESSION +unsigned int default_rrdpush_compression_enabled = 1; #endif char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; @@ -57,30 +57,47 @@ static void load_stream_conf() { errno = 0; char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) { - info("CONFIG: cannot load user config '%s'. Will try stock config.", filename); + netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename); freez(filename); filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf"); if(!appconfig_load(&stream_config, filename, 0, NULL)) - info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); + netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename); } freez(filename); } -STREAM_CAPABILITIES stream_our_capabilities() { - return STREAM_CAP_V1 | - STREAM_CAP_V2 | - STREAM_CAP_VN | - STREAM_CAP_VCAPS | - STREAM_CAP_HLABELS | - STREAM_CAP_CLAIM | - STREAM_CAP_CLABELS | - STREAM_CAP_FUNCTIONS | - STREAM_CAP_REPLICATION | - STREAM_CAP_BINARY | - STREAM_CAP_INTERPOLATED | - STREAM_HAS_COMPRESSION | +STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { + + // we can have DATA_WITH_ML when INTERPOLATED is available + bool ml_capability = true; + + if(host && sender) { + // we have DATA_WITH_ML capability + // we should remove the DATA_WITH_ML capability if our database does not have anomaly info + // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML + netdata_mutex_lock(&host->receiver_lock); + + if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) + ml_capability = false; + + netdata_mutex_unlock(&host->receiver_lock); + } + + return STREAM_CAP_V1 | + STREAM_CAP_V2 | + STREAM_CAP_VN | + STREAM_CAP_VCAPS | + STREAM_CAP_HLABELS | + STREAM_CAP_CLAIM | + STREAM_CAP_CLABELS | + STREAM_CAP_FUNCTIONS | + STREAM_CAP_REPLICATION | + STREAM_CAP_BINARY | + STREAM_CAP_INTERPOLATED | + STREAM_HAS_COMPRESSION | (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | + (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) | 0; } @@ -125,13 +142,13 @@ int rrdpush_init() { rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s); -#ifdef ENABLE_COMPRESSION - default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, - "enable compression", default_compression_enabled); +#ifdef ENABLE_RRDPUSH_COMPRESSION + default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, + "enable compression", default_rrdpush_compression_enabled); #endif if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { - error("STREAM [send]: cannot enable sending thread - information is missing."); + netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing."); default_rrdpush_enabled = 0; } @@ -139,7 +156,7 @@ int rrdpush_init() { netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate); if(!netdata_ssl_validate_certificate_sender) - info("SSL: streaming senders will skip SSL certificates verification."); + netdata_log_info("SSL: streaming senders will skip SSL certificates verification."); netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL); netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL); @@ -247,7 +264,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { // send the chart buffer_sprintf( wb - , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n" + , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n" , rrdset_id(st) , name , rrdset_title(st) @@ -274,7 +291,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { rrddim_foreach_read(rd, st) { buffer_sprintf( wb - , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n" + , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n" , rrddim_id(rd) , rrddim_name(rd) , rrd_algorithm_name(rd->algorithm) @@ -284,7 +301,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":"" , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":"" ); - rd->exposed = 1; + rrddim_set_exposed(rd); } rrddim_foreach_done(rd); @@ -338,14 +355,14 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta RRDDIM *rd; rrddim_foreach_read(rd, st) { - if(unlikely(!rd->updated)) + if(unlikely(!rrddim_check_updated(rd))) continue; - if(likely(rd->exposed)) { + if(likely(rrddim_check_exposed(rd))) { buffer_fast_strcat(wb, "SET \"", 5); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "\" = ", 4); - buffer_print_int64(wb, rd->collected_value); + buffer_print_int64(wb, rd->collector.collected_value); buffer_fast_strcat(wb, "\n", 1); } else { @@ -419,10 +436,10 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); - buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value); + buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); buffer_fast_strcat(wb, " ", 1); - if((NETDATA_DOUBLE)rd->last_collected_value == n) + if((NETDATA_DOUBLE)rd->collector.last_collected_value == n) buffer_fast_strcat(wb, "#", 1); else buffer_print_netdata_double_encoded(wb, doubles_encoding, n); @@ -462,13 +479,13 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) { rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); - error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); + netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); } return (RRDSET_STREAM_BUFFER) { .wb = NULL, }; } else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) { - info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); + netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } @@ -504,6 +521,7 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value); return 1; } + void rrdpush_send_host_labels(RRDHOST *host) { if(unlikely(!rrdhost_can_send_definitions_to_parent(host) || !stream_has_capability(host->sender, STREAM_CAP_HLABELS))) @@ -519,8 +537,23 @@ void rrdpush_send_host_labels(RRDHOST *host) { sender_thread_buffer_free(); } -void rrdpush_claimed_id(RRDHOST *host) -{ +void rrdpush_send_global_functions(RRDHOST *host) { + if(!stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) + return; + + if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) + return; + + BUFFER *wb = sender_start(host->sender); + + rrd_functions_expose_global_rrdpush(host, wb); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_claimed_id(RRDHOST *host) { if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; @@ -555,7 +588,7 @@ int connect_to_one_of_destinations( if(d->postpone_reconnection_until > now) continue; - info( + netdata_log_info( "STREAM %s: connecting to '%s' (default port: %d)...", rrdhost_hostname(host), string2str(d->destination), @@ -564,7 +597,8 @@ int connect_to_one_of_destinations( if (reconnects_counter) *reconnects_counter += 1; - d->last_attempt = now; + d->since = now; + d->attempts++; sock = connect_to_this(string2str(d->destination), default_port, timeout); if (sock != -1) { @@ -611,7 +645,7 @@ bool destinations_init_add_one(char *entry, void *data) { DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next); t->count++; - info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); + netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host)); return false; // we return false, so that we will get all defined destinations } @@ -649,11 +683,11 @@ void rrdpush_destinations_free(RRDHOST *host) { // Either the receiver lost the connection or the host is being destroyed. // The sender mutex guards thread creation, any spurious data is wiped on reconnection. -void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) { +void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait) { if (!host->sender) return; - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { @@ -664,42 +698,41 @@ void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) { netdata_thread_cancel(host->rrdpush_sender_thread); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); if(wait) { - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); while(host->sender->tid) { - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); sleep_usec(10 * USEC_PER_MS); - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } } - // ---------------------------------------------------------------------------- // rrdpush receiver thread void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) { - log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid); + netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid); } static void rrdpush_sender_thread_spawn(RRDHOST *host) { - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host)); if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender)) - error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); + netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); else rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } int rrdpush_receiver_permission_denied(struct web_client *w) { @@ -750,7 +783,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri return rrdpush_receiver_too_busy_now(w); struct receiver_state *rpt = callocz(1, sizeof(*rpt)); - rpt->last_msg_t = now_realtime_sec(); + rpt->last_msg_t = now_monotonic_sec(); rpt->capabilities = STREAM_CAP_INVALID; rpt->hops = 1; @@ -823,7 +856,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rpt->tags = strdupz(value); else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID)) - rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0)); + rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false); else { // An old Netdata child does not have a compatible streaming protocol, map to something sane. @@ -846,10 +879,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri name = "NETDATA_HOST_OS_DETECTION"; else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID)) - rpt->capabilities = convert_stream_version_to_capabilities(1); + rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false); if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) { - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "request has parameter '%s' = '%s', which is not used." , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-" , rpt->client_ip, rpt->client_port @@ -860,7 +893,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (rpt->capabilities & STREAM_CAP_INVALID) // no version is supplied, assume version 0; - rpt->capabilities = convert_stream_version_to_capabilities(0); + rpt->capabilities = convert_stream_version_to_capabilities(0, NULL, false); // find the program name and version if(w->user_agent && w->user_agent[0]) { @@ -1042,7 +1075,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri #endif rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { - error("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_error("STREAM '%s' [receive from [%s]:%s]: " "failed to reply." , rpt->hostname , rpt->client_ip, rpt->client_port @@ -1058,13 +1091,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri static time_t last_stream_accepted_t = 0; time_t now = now_realtime_sec(); - netdata_spinlock_lock(&spinlock); + spinlock_lock(&spinlock); if(unlikely(last_stream_accepted_t == 0)) last_stream_accepted_t = now; if(now - last_stream_accepted_t < web_client_streaming_rate_t) { - netdata_spinlock_unlock(&spinlock); + spinlock_unlock(&spinlock); char msg[100 + 1]; snprintfz(msg, 100, @@ -1081,7 +1114,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } last_stream_accepted_t = now; - netdata_spinlock_unlock(&spinlock); + spinlock_unlock(&spinlock); } /* @@ -1106,7 +1139,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri if (host) { netdata_mutex_lock(&host->receiver_lock); if (host->receiver) { - age = now_realtime_sec() - host->receiver->last_msg_t; + age = now_monotonic_sec() - host->receiver->last_msg_t; if (age < 30) receiver_working = true; @@ -1117,12 +1150,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } rrd_unlock(); - if (receiver_stale && stop_streaming_receiver(host, "STALE RECEIVER")) { + if (receiver_stale && stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER)) { // we stopped the receiver // we can proceed with this connection receiver_stale = false; - info("STREAM '%s' [receive from [%s]:%s]: " + netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " "stopped previous stale receiver to accept this one." , rpt->hostname , rpt->client_ip, rpt->client_port @@ -1152,7 +1185,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } } - debug(D_SYSTEM, "starting STREAM receive thread."); + netdata_log_debug(D_SYSTEM, "starting STREAM receive thread."); rrdpush_receiver_takeover_web_connection(w, rpt); @@ -1177,20 +1210,20 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri } void rrdpush_reset_destinations_postpone_time(RRDHOST *host) { - struct rrdpush_destinations *d; - for (d = host->destinations; d; d = d->next) - d->postpone_reconnection_until = 0; + uint32_t wait = (host->sender) ? host->sender->reconnect_delay : 5; + time_t now = now_realtime_sec(); + for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) + d->postpone_reconnection_until = now + wait; } static struct { STREAM_HANDSHAKE err; const char *str; } handshake_errors[] = { - { STREAM_HANDSHAKE_OK_V5, "OK_V5" }, - { STREAM_HANDSHAKE_OK_V4, "OK_V4" }, - { STREAM_HANDSHAKE_OK_V3, "OK_V3" }, - { STREAM_HANDSHAKE_OK_V2, "OK_V2" }, - { STREAM_HANDSHAKE_OK_V1, "OK_V1" }, + { STREAM_HANDSHAKE_OK_V3, "CONNECTED" }, + { STREAM_HANDSHAKE_OK_V2, "CONNECTED" }, + { STREAM_HANDSHAKE_OK_V1, "CONNECTED" }, + { STREAM_HANDSHAKE_NEVER, "" }, { STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" }, { STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" }, { STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" }, @@ -1202,17 +1235,31 @@ static struct { { STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" }, { STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" }, { STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" }, - { STREAM_HANDSHAKE_INITIALIZATION, "INITIALIZING" }, + { STREAM_HANDSHAKE_INITIALIZATION, "REMOTE IS INITIALIZING" }, + { STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, "DISCONNECTED HOST CLEANUP" }, + { STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER, "DISCONNECTED STALE RECEIVER" }, + { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" }, + { STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" }, + { STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" }, + { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" }, + { STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" }, + { STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" }, + { STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" }, + { STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" }, { 0, NULL }, }; const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) { + if(handshake_error >= STREAM_HANDSHAKE_OK_V1) + // handshake_error is the whole version / capabilities number + return "CONNECTED"; + for(size_t i = 0; handshake_errors[i].str ; i++) { if(handshake_error == handshake_errors[i].err) return handshake_errors[i].str; } - return ""; + return "UNKNOWN"; } static struct { @@ -1232,6 +1279,7 @@ static struct { { STREAM_CAP_BINARY, "BINARY" }, { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, { STREAM_CAP_IEEE754, "IEEE754" }, + { STREAM_CAP_DATA_WITH_ML, "ML" }, { 0 , NULL }, }; @@ -1245,7 +1293,10 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) } void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) { - buffer_json_member_add_array(wb, key); + if(key) + buffer_json_member_add_array(wb, key); + else + buffer_json_add_array_item_array(wb); for(size_t i = 0; capability_names[i].str ; i++) { if(caps & capability_names[i].cap) @@ -1259,7 +1310,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, rpt->capabilities); - info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", + netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb)); buffer_free(wb); @@ -1269,13 +1320,13 @@ void log_sender_capabilities(struct sender_state *s) { BUFFER *wb = buffer_create(100, NULL); stream_capabilities_to_string(wb, s->capabilities); - info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", + netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s", rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb)); buffer_free(wb); } -STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) { +STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) { STREAM_CAPABILITIES caps = 0; if(version <= 1) caps = STREAM_CAP_V1; @@ -1294,7 +1345,13 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) { if(caps & STREAM_CAP_V2) caps &= ~(STREAM_CAP_V1); - return caps & stream_our_capabilities(); + STREAM_CAPABILITIES common_caps = caps & stream_our_capabilities(host, sender); + + if(!(common_caps & STREAM_CAP_INTERPOLATED)) + // DATA WITH ML requires INTERPOLATED + common_caps &= ~STREAM_CAP_DATA_WITH_ML; + + return common_caps; } int32_t stream_capabilities_to_vn(uint32_t caps) { |