summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/rrdpush.c211
1 files changed, 134 insertions, 77 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index c481871cc..67c43e411 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -39,8 +39,8 @@ struct config stream_config = {
};
unsigned int default_rrdpush_enabled = 0;
-#ifdef ENABLE_COMPRESSION
-unsigned int default_compression_enabled = 1;
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+unsigned int default_rrdpush_compression_enabled = 1;
#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
@@ -57,30 +57,47 @@ static void load_stream_conf() {
errno = 0;
char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL)) {
- info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
+ netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
freez(filename);
filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL))
- info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
+ netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
}
freez(filename);
}
-STREAM_CAPABILITIES stream_our_capabilities() {
- return STREAM_CAP_V1 |
- STREAM_CAP_V2 |
- STREAM_CAP_VN |
- STREAM_CAP_VCAPS |
- STREAM_CAP_HLABELS |
- STREAM_CAP_CLAIM |
- STREAM_CAP_CLABELS |
- STREAM_CAP_FUNCTIONS |
- STREAM_CAP_REPLICATION |
- STREAM_CAP_BINARY |
- STREAM_CAP_INTERPOLATED |
- STREAM_HAS_COMPRESSION |
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
+
+ // we can have DATA_WITH_ML when INTERPOLATED is available
+ bool ml_capability = true;
+
+ if(host && sender) {
+ // we have DATA_WITH_ML capability
+ // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
+ // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
+ ml_capability = false;
+
+ netdata_mutex_unlock(&host->receiver_lock);
+ }
+
+ return STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_HAS_COMPRESSION |
(ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
+ (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
0;
}
@@ -125,13 +142,13 @@ int rrdpush_init() {
rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
-#ifdef ENABLE_COMPRESSION
- default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
- "enable compression", default_compression_enabled);
+#ifdef ENABLE_RRDPUSH_COMPRESSION
+ default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
+ "enable compression", default_rrdpush_compression_enabled);
#endif
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
- error("STREAM [send]: cannot enable sending thread - information is missing.");
+ netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
}
@@ -139,7 +156,7 @@ int rrdpush_init() {
netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate);
if(!netdata_ssl_validate_certificate_sender)
- info("SSL: streaming senders will skip SSL certificates verification.");
+ netdata_log_info("SSL: streaming senders will skip SSL certificates verification.");
netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
@@ -247,7 +264,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the chart
buffer_sprintf(
wb
- , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
+ , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, rrdset_id(st)
, name
, rrdset_title(st)
@@ -274,7 +291,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
rrddim_foreach_read(rd, st) {
buffer_sprintf(
wb
- , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
+ , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
, rrddim_id(rd)
, rrddim_name(rd)
, rrd_algorithm_name(rd->algorithm)
@@ -284,7 +301,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
, rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
, rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
- rd->exposed = 1;
+ rrddim_set_exposed(rd);
}
rrddim_foreach_done(rd);
@@ -338,14 +355,14 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(unlikely(!rd->updated))
+ if(unlikely(!rrddim_check_updated(rd)))
continue;
- if(likely(rd->exposed)) {
+ if(likely(rrddim_check_exposed(rd))) {
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
- buffer_print_int64(wb, rd->collected_value);
+ buffer_print_int64(wb, rd->collector.collected_value);
buffer_fast_strcat(wb, "\n", 1);
}
else {
@@ -419,10 +436,10 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
buffer_fast_strcat(wb, " ", 1);
- if((NETDATA_DOUBLE)rd->last_collected_value == n)
+ if((NETDATA_DOUBLE)rd->collector.last_collected_value == n)
buffer_fast_strcat(wb, "#", 1);
else
buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
@@ -462,13 +479,13 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
- error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
}
return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
}
else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
- info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
@@ -504,6 +521,7 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR
buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
return 1;
}
+
void rrdpush_send_host_labels(RRDHOST *host) {
if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
|| !stream_has_capability(host->sender, STREAM_CAP_HLABELS)))
@@ -519,8 +537,23 @@ void rrdpush_send_host_labels(RRDHOST *host) {
sender_thread_buffer_free();
}
-void rrdpush_claimed_id(RRDHOST *host)
-{
+void rrdpush_send_global_functions(RRDHOST *host) {
+ if(!stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
+ return;
+
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+
+ rrd_functions_expose_global_rrdpush(host, wb);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -555,7 +588,7 @@ int connect_to_one_of_destinations(
if(d->postpone_reconnection_until > now)
continue;
- info(
+ netdata_log_info(
"STREAM %s: connecting to '%s' (default port: %d)...",
rrdhost_hostname(host),
string2str(d->destination),
@@ -564,7 +597,8 @@ int connect_to_one_of_destinations(
if (reconnects_counter)
*reconnects_counter += 1;
- d->last_attempt = now;
+ d->since = now;
+ d->attempts++;
sock = connect_to_this(string2str(d->destination), default_port, timeout);
if (sock != -1) {
@@ -611,7 +645,7 @@ bool destinations_init_add_one(char *entry, void *data) {
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
t->count++;
- info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
+ netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
return false; // we return false, so that we will get all defined destinations
}
@@ -649,11 +683,11 @@ void rrdpush_destinations_free(RRDHOST *host) {
// Either the receiver lost the connection or the host is being destroyed.
// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
-void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) {
+void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait) {
if (!host->sender)
return;
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
@@ -664,42 +698,41 @@ void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) {
netdata_thread_cancel(host->rrdpush_sender_thread);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
if(wait) {
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
while(host->sender->tid) {
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
sleep_usec(10 * USEC_PER_MS);
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
}
-
// ----------------------------------------------------------------------------
// rrdpush receiver thread
void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
- log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
+ netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
}
static void rrdpush_sender_thread_spawn(RRDHOST *host) {
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
- error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
+ netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
int rrdpush_receiver_permission_denied(struct web_client *w) {
@@ -750,7 +783,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
return rrdpush_receiver_too_busy_now(w);
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
- rpt->last_msg_t = now_realtime_sec();
+ rpt->last_msg_t = now_monotonic_sec();
rpt->capabilities = STREAM_CAP_INVALID;
rpt->hops = 1;
@@ -823,7 +856,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt->tags = strdupz(value);
else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
- rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
+ rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false);
else {
// An old Netdata child does not have a compatible streaming protocol, map to something sane.
@@ -846,10 +879,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
name = "NETDATA_HOST_OS_DETECTION";
else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID))
- rpt->capabilities = convert_stream_version_to_capabilities(1);
+ rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false);
if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
- info("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"request has parameter '%s' = '%s', which is not used."
, (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
, rpt->client_ip, rpt->client_port
@@ -860,7 +893,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (rpt->capabilities & STREAM_CAP_INVALID)
// no version is supplied, assume version 0;
- rpt->capabilities = convert_stream_version_to_capabilities(0);
+ rpt->capabilities = convert_stream_version_to_capabilities(0, NULL, false);
// find the program name and version
if(w->user_agent && w->user_agent[0]) {
@@ -1042,7 +1075,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
#endif
rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
- error("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
"failed to reply."
, rpt->hostname
, rpt->client_ip, rpt->client_port
@@ -1058,13 +1091,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
static time_t last_stream_accepted_t = 0;
time_t now = now_realtime_sec();
- netdata_spinlock_lock(&spinlock);
+ spinlock_lock(&spinlock);
if(unlikely(last_stream_accepted_t == 0))
last_stream_accepted_t = now;
if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
char msg[100 + 1];
snprintfz(msg, 100,
@@ -1081,7 +1114,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
last_stream_accepted_t = now;
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
}
/*
@@ -1106,7 +1139,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (host) {
netdata_mutex_lock(&host->receiver_lock);
if (host->receiver) {
- age = now_realtime_sec() - host->receiver->last_msg_t;
+ age = now_monotonic_sec() - host->receiver->last_msg_t;
if (age < 30)
receiver_working = true;
@@ -1117,12 +1150,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
rrd_unlock();
- if (receiver_stale && stop_streaming_receiver(host, "STALE RECEIVER")) {
+ if (receiver_stale && stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER)) {
// we stopped the receiver
// we can proceed with this connection
receiver_stale = false;
- info("STREAM '%s' [receive from [%s]:%s]: "
+ netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
"stopped previous stale receiver to accept this one."
, rpt->hostname
, rpt->client_ip, rpt->client_port
@@ -1152,7 +1185,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
}
- debug(D_SYSTEM, "starting STREAM receive thread.");
+ netdata_log_debug(D_SYSTEM, "starting STREAM receive thread.");
rrdpush_receiver_takeover_web_connection(w, rpt);
@@ -1177,20 +1210,20 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
void rrdpush_reset_destinations_postpone_time(RRDHOST *host) {
- struct rrdpush_destinations *d;
- for (d = host->destinations; d; d = d->next)
- d->postpone_reconnection_until = 0;
+ uint32_t wait = (host->sender) ? host->sender->reconnect_delay : 5;
+ time_t now = now_realtime_sec();
+ for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
+ d->postpone_reconnection_until = now + wait;
}
static struct {
STREAM_HANDSHAKE err;
const char *str;
} handshake_errors[] = {
- { STREAM_HANDSHAKE_OK_V5, "OK_V5" },
- { STREAM_HANDSHAKE_OK_V4, "OK_V4" },
- { STREAM_HANDSHAKE_OK_V3, "OK_V3" },
- { STREAM_HANDSHAKE_OK_V2, "OK_V2" },
- { STREAM_HANDSHAKE_OK_V1, "OK_V1" },
+ { STREAM_HANDSHAKE_OK_V3, "CONNECTED" },
+ { STREAM_HANDSHAKE_OK_V2, "CONNECTED" },
+ { STREAM_HANDSHAKE_OK_V1, "CONNECTED" },
+ { STREAM_HANDSHAKE_NEVER, "" },
{ STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" },
{ STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" },
{ STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" },
@@ -1202,17 +1235,31 @@ static struct {
{ STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" },
{ STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" },
{ STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" },
- { STREAM_HANDSHAKE_INITIALIZATION, "INITIALIZING" },
+ { STREAM_HANDSHAKE_INITIALIZATION, "REMOTE IS INITIALIZING" },
+ { STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, "DISCONNECTED HOST CLEANUP" },
+ { STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER, "DISCONNECTED STALE RECEIVER" },
+ { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" },
+ { STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" },
+ { STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" },
+ { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" },
+ { STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" },
+ { STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" },
+ { STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" },
+ { STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" },
{ 0, NULL },
};
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) {
+ if(handshake_error >= STREAM_HANDSHAKE_OK_V1)
+ // handshake_error is the whole version / capabilities number
+ return "CONNECTED";
+
for(size_t i = 0; handshake_errors[i].str ; i++) {
if(handshake_error == handshake_errors[i].err)
return handshake_errors[i].str;
}
- return "";
+ return "UNKNOWN";
}
static struct {
@@ -1232,6 +1279,7 @@ static struct {
{ STREAM_CAP_BINARY, "BINARY" },
{ STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
{ STREAM_CAP_IEEE754, "IEEE754" },
+ { STREAM_CAP_DATA_WITH_ML, "ML" },
{ 0 , NULL },
};
@@ -1245,7 +1293,10 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
}
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) {
- buffer_json_member_add_array(wb, key);
+ if(key)
+ buffer_json_member_add_array(wb, key);
+ else
+ buffer_json_add_array_item_array(wb);
for(size_t i = 0; capability_names[i].str ; i++) {
if(caps & capability_names[i].cap)
@@ -1259,7 +1310,7 @@ void log_receiver_capabilities(struct receiver_state *rpt) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, rpt->capabilities);
- info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
+ netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
buffer_free(wb);
@@ -1269,13 +1320,13 @@ void log_sender_capabilities(struct sender_state *s) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, s->capabilities);
- info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
+ netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
buffer_free(wb);
}
-STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) {
STREAM_CAPABILITIES caps = 0;
if(version <= 1) caps = STREAM_CAP_V1;
@@ -1294,7 +1345,13 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
if(caps & STREAM_CAP_V2)
caps &= ~(STREAM_CAP_V1);
- return caps & stream_our_capabilities();
+ STREAM_CAPABILITIES common_caps = caps & stream_our_capabilities(host, sender);
+
+ if(!(common_caps & STREAM_CAP_INTERPOLATED))
+ // DATA WITH ML requires INTERPOLATED
+ common_caps &= ~STREAM_CAP_DATA_WITH_ML;
+
+ return common_caps;
}
int32_t stream_capabilities_to_vn(uint32_t caps) {