diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-08-12 07:26:11 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-08-12 07:26:11 +0000 |
commit | 3c315f0fff93aa072472abc10815963ac0035268 (patch) | |
tree | a95f6a96e0e7bd139c010f8dc60b40e5b3062a99 /aclk/aclk_rx_msgs.c | |
parent | Adding upstream version 1.35.1. (diff) | |
download | netdata-3c315f0fff93aa072472abc10815963ac0035268.tar.xz netdata-3c315f0fff93aa072472abc10815963ac0035268.zip |
Adding upstream version 1.36.0.upstream/1.36.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r-- | aclk/aclk_rx_msgs.c | 84 |
1 files changed, 65 insertions, 19 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 27f1bf2dc..e6ed332cc 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -6,6 +6,8 @@ #include "aclk_query_queue.h" #include "aclk.h" +#include "schema-wrappers/proto_2_json.h" + #define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" #define ACLK_CLOUD_REQ_V2_PREFIX "GET /" @@ -55,19 +57,19 @@ static int cloud_to_agent_parse(JSON_ENTRY *e) break; case JSON_NUMBER: if (!strcmp(e->name, "version")) { - data->version = e->data.number; + data->version = (int)e->data.number; break; } if (!strcmp(e->name, "timeout")) { - data->timeout = e->data.number; + data->timeout = (int)e->data.number; break; } if (!strcmp(e->name, "min-version")) { - data->min_version = e->data.number; + data->min_version = (int)e->data.number; break; } if (!strcmp(e->name, "max-version")) { - data->max_version = e->data.number; + data->max_version = (int)e->data.number; break; } @@ -116,20 +118,8 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur return 0; } -#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\ - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\ - debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ - ACLK_SHARED_STATE_UNLOCK;\ - return 1;\ - }\ - ACLK_SHARED_STATE_UNLOCK; - static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) { - if (!aclk_use_new_cloud_arch) { - HTTP_CHECK_AGENT_INITIALIZED(); - } - aclk_query_t query; errno = 0; @@ -229,7 +219,6 @@ err_cleanup: return 1; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL typedef uint32_t simple_hash_t; typedef int(*rx_msg_handler)(const char *msg, size_t msg_len); @@ -300,6 +289,15 @@ int create_node_instance_result(const char *msg, size_t msg_len) } } + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -324,6 +322,7 @@ int send_node_instances(const char *msg, size_t msg_len) int stream_charts_and_dimensions(const char *msg, size_t msg_len) { + aclk_ctx_based = 0; stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); if (!res.claim_id || !res.node_id) { error("Error parsing StreamChartsAndDimensions msg"); @@ -437,6 +436,41 @@ int handle_disconnect_req(const char *msg, size_t msg_len) return 0; } +int contexts_checkpoint(const char *msg, size_t msg_len) +{ + aclk_ctx_based = 1; + + struct ctxs_checkpoint *cmd = parse_ctxs_checkpoint(msg, msg_len); + if (!cmd) + return 1; + + rrdcontext_hub_checkpoint_command(cmd); + + freez(cmd->claim_id); + freez(cmd->node_id); + freez(cmd); + return 0; +} + +int stop_streaming_contexts(const char *msg, size_t msg_len) +{ + if (!aclk_ctx_based) { + error_report("Received StopStreamingContexts message but context based communication was not enabled (Cloud violated the protocol). Ignoring message"); + return 1; + } + + struct stop_streaming_ctxs *cmd = parse_stop_streaming_ctxs(msg, msg_len); + if (!cmd) + return 1; + + rrdcontext_hub_stop_streaming_command(cmd); + + freez(cmd->claim_id); + freez(cmd->node_id); + freez(cmd); + return 0; +} + typedef struct { const char *name; simple_hash_t name_hash; @@ -455,6 +489,8 @@ new_cloud_rx_msg_t rx_msgs[] = { { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, + { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint }, + { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts }, { .name = NULL, .name_hash = 0, .fnc = NULL }, }; @@ -491,7 +527,7 @@ unsigned int aclk_init_rx_msg_handlers(void) return i; } -void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic) { if (aclk_stats_enabled) { ACLK_STATS_LOCK; @@ -509,6 +545,17 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t } return; } + +#ifdef NETDATA_INTERNAL_CHECKS + if (!strncmp(message_type, "cmd", strlen("cmd"))) { + log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name); + } else { + char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name); + log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name); + freez(json); + } +#endif + if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++; @@ -524,4 +571,3 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t return; } } -#endif |