diff options
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r-- | aclk/aclk_rx_msgs.c | 70 |
1 files changed, 24 insertions, 46 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index e6ed332c..83bc5508 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -5,6 +5,7 @@ #include "aclk_stats.h" #include "aclk_query_queue.h" #include "aclk.h" +#include "aclk_capas.h" #include "schema-wrappers/proto_2_json.h" @@ -274,7 +275,7 @@ int create_node_instance_result(const char *msg, size_t msg_len) .node_id = res.node_id }; - RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); + RRDHOST *host = rrdhost_find_by_guid(res.machine_guid); if (host) { // not all host must have RRDHOST struct created for them // if they never connected during runtime of agent @@ -289,20 +290,15 @@ int create_node_instance_result(const char *msg, size_t msg_len) } } - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + node_state_update.capabilities = aclk_get_node_instance_capas(host); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); + freez((void *)node_state_update.capabilities); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; @@ -322,44 +318,25 @@ int send_node_instances(const char *msg, size_t msg_len) int stream_charts_and_dimensions(const char *msg, size_t msg_len) { - aclk_ctx_based = 0; - stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); - if (!res.claim_id || !res.node_id) { - error("Error parsing StreamChartsAndDimensions msg"); - freez(res.claim_id); - freez(res.node_id); - return 1; - } - chart_batch_id = res.batch_id; - aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id); - freez(res.claim_id); - freez(res.node_id); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete StreamChartsAndDimensions msg"); return 0; } int charts_and_dimensions_ack(const char *msg, size_t msg_len) { - chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len); - if (!res.claim_id || !res.node_id) { - error("Error parsing StreamChartsAndDimensions msg"); - freez(res.claim_id); - freez(res.node_id); - return 1; - } - aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id); - freez(res.claim_id); - freez(res.node_id); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete StreamChartsAndDimensionsAck msg"); return 0; } int update_chart_configs(const char *msg, size_t msg_len) { - struct update_chart_config res = parse_update_chart_config(msg, msg_len); - if (!res.claim_id || !res.node_id || !res.hashes) - error("Error parsing UpdateChartConfigs msg"); - else - aclk_get_chart_config(res.hashes); - destroy_update_chart_config(&res); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete UpdateChartConfigs msg"); return 0; } @@ -527,7 +504,7 @@ unsigned int aclk_init_rx_msg_handlers(void) return i; } -void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic) +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic __maybe_unused) { if (aclk_stats_enabled) { ACLK_STATS_LOCK; @@ -546,15 +523,16 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t return; } -#ifdef NETDATA_INTERNAL_CHECKS - if (!strncmp(message_type, "cmd", strlen("cmd"))) { - log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name); - } else { - char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name); - log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name); - freez(json); + + if (aclklog_enabled) { + if (!strncmp(message_type, "cmd", strlen("cmd"))) { + log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name); + } else { + char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name); + log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name); + freez(json); + } } -#endif if (aclk_stats_enabled) { ACLK_STATS_LOCK; |