diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-01-26 18:05:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-01-26 18:05:10 +0000 |
commit | 34a0b66bc2d48223748ed1cf5bc1b305c396bd74 (patch) | |
tree | fbd36be86cc6bc4288fe627f2b5beada569848bb /aclk/aclk_rx_msgs.c | |
parent | Adding upstream version 1.32.1. (diff) | |
download | netdata-34a0b66bc2d48223748ed1cf5bc1b305c396bd74.tar.xz netdata-34a0b66bc2d48223748ed1cf5bc1b305c396bd74.zip |
Adding upstream version 1.33.0.upstream/1.33.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r-- | aclk/aclk_rx_msgs.c | 468 |
1 files changed, 265 insertions, 203 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index e7ce932ea..ecb2b4179 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -119,7 +119,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur }\ ACLK_SHARED_STATE_UNLOCK; -static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) +static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) { if (!aclk_use_new_cloud_arch) { HTTP_CHECK_AGENT_INITIALIZED(); @@ -172,73 +172,43 @@ error: return 1; } -typedef struct aclk_incoming_msg_type{ - char *name; - int(*fnc)(struct aclk_request *, char *); -}aclk_incoming_msg_type; - -aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { - { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, - { .name = NULL, .fnc = NULL } -}; - -struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression; - -int aclk_handle_cloud_message(char *payload) +int aclk_handle_cloud_cmd_message(char *payload) { struct aclk_request cloud_to_agent; memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_recvd++; - ACLK_STATS_UNLOCK; - } - if (unlikely(!payload)) { - errno = 0; - error("ACLK incoming message is empty"); - goto err_cleanup_nojson; + error_report("ACLK incoming 'cmd' message is empty"); + return 1; } - debug(D_ACLK, "ACLK incoming message (%s)", payload); + debug(D_ACLK, "ACLK incoming 'cmd' message (%s)", payload); int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); if (unlikely(rc != JSON_OK)) { - errno = 0; - error("Malformed json request (%s)", payload); + error_report("Malformed json request (%s)", payload); goto err_cleanup; } if (!cloud_to_agent.type_id) { - errno = 0; - error("Cloud message is missing compulsory key \"type\""); + error_report("Cloud message is missing compulsory key \"type\""); goto err_cleanup; } - - for (int i = 0; aclk_incoming_msg_types[i].name; i++) { - if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { - if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { - // in case of success handler is supposed to clean up after itself - // or as in the case of aclk_handle_cloud_request take - // ownership of the pointers (done to avoid copying) - // see what `aclk_queue_query` parameter `internal` does - - // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! - // msg handlers (namely aclk_handle_version_responce) - // can freely change what aclk_incoming_msg_types points to - // so either exit or restart this for loop - freez(cloud_to_agent.type_id); - return 0; - } - goto err_cleanup; - } + // Originally we were expecting to have multiple types of 'cmd' message, + // but after the new protocol was designed we will ever only have 'http' + if (strcmp(cloud_to_agent.type_id, "http")) { + error_report("Only 'http' cmd message is supported"); + goto err_cleanup; } - errno = 0; - error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id); + if (likely(!aclk_handle_cloud_http_request_v2(&cloud_to_agent, payload))) { + // aclk_handle_cloud_request takes ownership of the pointers + // (to avoid copying) in case of success + freez(cloud_to_agent.type_id); + return 0; + } err_cleanup: if (cloud_to_agent.payload) @@ -250,191 +220,283 @@ err_cleanup: if (cloud_to_agent.callback_topic) freez(cloud_to_agent.callback_topic); -err_cleanup_nojson: - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_err++; - ACLK_STATS_UNLOCK; - } - return 1; } #ifdef ENABLE_NEW_CLOUD_PROTOCOL -void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) +typedef uint32_t simple_hash_t; +typedef int(*rx_msg_handler)(const char *msg, size_t msg_len); + +int handle_old_proto_cmd(const char *msg, size_t msg_len) { - // TODO do the look up table with hashes to optimize when there are more - // than few - if (!strcmp(message_type, "cmd")) { - // msg is binary payload in all other cases - // however in this message from old legacy cloud - // we have to convert it to C string - char *str = mallocz(msg_len+1); - memcpy(str, msg, msg_len); - str[msg_len] = 0; - aclk_handle_cloud_message(str); + // msg is binary payload in all other cases + // however in this message from old legacy cloud + // we have to convert it to C string + char *str = mallocz(msg_len+1); + memcpy(str, msg, msg_len); + str[msg_len] = 0; + if (aclk_handle_cloud_cmd_message(str)) { freez(str); - return; + return 1; } - if (!strcmp(message_type, "CreateNodeInstanceResult")) { - node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len); - if (!res.machine_guid || !res.node_id) { - error_report("Error parsing CreateNodeInstanceResult"); - freez(res.machine_guid); - freez(res.node_id); - return; - } + freez(str); + return 0; +} - debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id); +int create_node_instance_result(const char *msg, size_t msg_len) +{ + node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len); + if (!res.machine_guid || !res.node_id) { + error_report("Error parsing CreateNodeInstanceResult"); + freez(res.machine_guid); + freez(res.node_id); + return 1; + } - uuid_t host_id, node_id; - if (uuid_parse(res.machine_guid, host_id)) { - error("Error parsing machine_guid provided by CreateNodeInstanceResult"); - freez(res.machine_guid); - freez(res.node_id); - return; - } - if (uuid_parse(res.node_id, node_id)) { - error("Error parsing node_id provided by CreateNodeInstanceResult"); - freez(res.machine_guid); - freez(res.node_id); - return; - } - update_node_id(&host_id, &node_id); - - aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); - query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded - rrdhost_aclk_state_lock(localhost); - query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); - rrdhost_aclk_state_unlock(localhost); - - RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); - query->data.node_update.live = 0; - - if (host) { - // not all host must have RRDHOST struct created for them - // if they never connected during runtime of agent - if (host == localhost) { - query->data.node_update.live = 1; - query->data.node_update.hops = 0; - } else { - netdata_mutex_lock(&host->receiver_lock); - query->data.node_update.live = (host->receiver != NULL); - netdata_mutex_unlock(&host->receiver_lock); - query->data.node_update.hops = host->system_info->hops; - } - } + debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id); - query->data.node_update.node_id = res.node_id; // aclk_query_free will free it - query->data.node_update.queryable = 1; - query->data.node_update.session_id = aclk_session_newarch; - aclk_queue_query(query); + uuid_t host_id, node_id; + if (uuid_parse(res.machine_guid, host_id)) { + error("Error parsing machine_guid provided by CreateNodeInstanceResult"); freez(res.machine_guid); - return; + freez(res.node_id); + return 1; } - if (!strcmp(message_type, "SendNodeInstances")) { - debug(D_ACLK, "Got SendNodeInstances"); - aclk_send_node_instances(); - return; + if (uuid_parse(res.node_id, node_id)) { + error("Error parsing node_id provided by CreateNodeInstanceResult"); + freez(res.machine_guid); + freez(res.node_id); + return 1; } - - if (!strcmp(message_type, "StreamChartsAndDimensions")) { - stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); - if (!res.claim_id || !res.node_id) { - error("Error parsing StreamChartsAndDimensions msg"); - freez(res.claim_id); - freez(res.node_id); - return; + update_node_id(&host_id, &node_id); + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + + RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); + query->data.node_update.live = 0; + + if (host) { + // not all host must have RRDHOST struct created for them + // if they never connected during runtime of agent + if (host == localhost) { + query->data.node_update.live = 1; + query->data.node_update.hops = 0; + } else { + netdata_mutex_lock(&host->receiver_lock); + query->data.node_update.live = (host->receiver != NULL); + netdata_mutex_unlock(&host->receiver_lock); + query->data.node_update.hops = host->system_info->hops; } - chart_batch_id = res.batch_id; - aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id); + } + + query->data.node_update.node_id = res.node_id; // aclk_query_free will free it + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + aclk_queue_query(query); + freez(res.machine_guid); + return 0; +} + +int send_node_instances(const char *msg, size_t msg_len) +{ + UNUSED(msg); + UNUSED(msg_len); + aclk_send_node_instances(); + return 0; +} + +int stream_charts_and_dimensions(const char *msg, size_t msg_len) +{ + stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); + if (!res.claim_id || !res.node_id) { + error("Error parsing StreamChartsAndDimensions msg"); freez(res.claim_id); freez(res.node_id); - return; + return 1; } - if (!strcmp(message_type, "ChartsAndDimensionsAck")) { - chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len); - if (!res.claim_id || !res.node_id) { - error("Error parsing StreamChartsAndDimensions msg"); - freez(res.claim_id); - freez(res.node_id); - return; - } - aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id); + chart_batch_id = res.batch_id; + aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id); + freez(res.claim_id); + freez(res.node_id); + return 0; +} + +int charts_and_dimensions_ack(const char *msg, size_t msg_len) +{ + chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len); + if (!res.claim_id || !res.node_id) { + error("Error parsing StreamChartsAndDimensions msg"); freez(res.claim_id); freez(res.node_id); - return; - } - if (!strcmp(message_type, "UpdateChartConfigs")) { - struct update_chart_config res = parse_update_chart_config(msg, msg_len); - if (!res.claim_id || !res.node_id || !res.hashes) - error("Error parsing UpdateChartConfigs msg"); - else - aclk_get_chart_config(res.hashes); - destroy_update_chart_config(&res); - return; + return 1; } - if (!strcmp(message_type, "StartAlarmStreaming")) { - struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); - if (!res.node_id || !res.batch_id) { - error("Error parsing StartAlarmStreaming"); - freez(res.node_id); - return; - } - aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id); + aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id); + freez(res.claim_id); + freez(res.node_id); + return 0; +} + +int update_chart_configs(const char *msg, size_t msg_len) +{ + struct update_chart_config res = parse_update_chart_config(msg, msg_len); + if (!res.claim_id || !res.node_id || !res.hashes) + error("Error parsing UpdateChartConfigs msg"); + else + aclk_get_chart_config(res.hashes); + destroy_update_chart_config(&res); + return 0; +} + +int start_alarm_streaming(const char *msg, size_t msg_len) +{ + struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); + if (!res.node_id || !res.batch_id) { + error("Error parsing StartAlarmStreaming"); freez(res.node_id); - return; + return 1; } - if (!strcmp(message_type, "SendAlarmLogHealth")) { - char *node_id = parse_send_alarm_log_health(msg, msg_len); - if (!node_id) { - error("Error parsing SendAlarmLogHealth"); - return; - } - aclk_send_alarm_health_log(node_id); - freez(node_id); - return; + aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id); + freez(res.node_id); + return 0; +} + +int send_alarm_log_health(const char *msg, size_t msg_len) +{ + char *node_id = parse_send_alarm_log_health(msg, msg_len); + if (!node_id) { + error("Error parsing SendAlarmLogHealth"); + return 1; } - if (!strcmp(message_type, "SendAlarmConfiguration")) { - char *config_hash = parse_send_alarm_configuration(msg, msg_len); - if (!config_hash || !*config_hash) { - error("Error parsing SendAlarmConfiguration"); - freez(config_hash); - return; - } - aclk_send_alarm_configuration(config_hash); + aclk_send_alarm_health_log(node_id); + freez(node_id); + return 0; +} + +int send_alarm_configuration(const char *msg, size_t msg_len) +{ + char *config_hash = parse_send_alarm_configuration(msg, msg_len); + if (!config_hash || !*config_hash) { + error("Error parsing SendAlarmConfiguration"); freez(config_hash); - return; + return 1; } - if (!strcmp(message_type, "SendAlarmSnapshot")) { - struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); - if (!sas->node_id || !sas->claim_id) { - error("Error parsing SendAlarmSnapshot"); - destroy_send_alarm_snapshot(sas); - return; - } - aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id); + aclk_send_alarm_configuration(config_hash); + freez(config_hash); + return 0; +} + +int send_alarm_snapshot(const char *msg, size_t msg_len) +{ + struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); + if (!sas->node_id || !sas->claim_id) { + error("Error parsing SendAlarmSnapshot"); destroy_send_alarm_snapshot(sas); - return; + return 1; + } + aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id); + destroy_send_alarm_snapshot(sas); + return 0; +} + +int handle_disconnect_req(const char *msg, size_t msg_len) +{ + struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len); + if (!cmd) + return 1; + if (cmd->permaban) { + error("Cloud Banned This Agent!"); + aclk_disable_runtime = 1; + } + info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description); + if (cmd->reconnect_after_s > 0) { + aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s; + info( + "Cloud asks not to reconnect for %u seconds. We shall honor that request", + (unsigned int)cmd->reconnect_after_s); + } + disconnect_req = 1; + freez(cmd->error_description); + freez(cmd); + return 0; +} + +typedef struct { + const char *name; + simple_hash_t name_hash; + rx_msg_handler fnc; +} new_cloud_rx_msg_t; + +new_cloud_rx_msg_t rx_msgs[] = { + { .name = "cmd", .name_hash = 0, .fnc = handle_old_proto_cmd }, + { .name = "CreateNodeInstanceResult", .name_hash = 0, .fnc = create_node_instance_result }, + { .name = "SendNodeInstances", .name_hash = 0, .fnc = send_node_instances }, + { .name = "StreamChartsAndDimensions", .name_hash = 0, .fnc = stream_charts_and_dimensions }, + { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack }, + { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs }, + { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming }, + { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health }, + { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, + { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, + { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, + { .name = NULL, .name_hash = 0, .fnc = NULL }, +}; + +new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash) +{ + // we can afford to not compare strings after hash match + // because we check for collisions at initialization in + // aclk_init_rx_msg_handlers() + for (int i = 0; rx_msgs[i].fnc; i++) { + if (rx_msgs[i].name_hash == hash) + return &rx_msgs[i]; } - if (!strcmp(message_type, "DisconnectReq")) { - struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len); - if (!cmd) - return; - if (cmd->permaban) { - error ("Cloud Banned This Agent!"); - aclk_disable_runtime = 1; + return NULL; +} + +void aclk_init_rx_msg_handlers(void) +{ + for (int i = 0; rx_msgs[i].fnc; i++) { + simple_hash_t hash = simple_hash(rx_msgs[i].name); + new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash); + if (unlikely(hdl)) { + // the list of message names changes only by changing + // the source code, therefore fatal is appropriate + fatal("Hash collision. Choose better hash. Added '%s' clashes with existing '%s'", rx_msgs[i].name, hdl->name); + } + rx_msgs[i].name_hash = hash; + } +} + +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) +{ + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_recvd++; + ACLK_STATS_UNLOCK; + } + new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type)); + debug(D_ACLK, "Got message named '%s' from cloud", message_type); + if (unlikely(!msg_descriptor)) { + error("Do not know how to handle message of type '%s'. Ignoring", message_type); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; } - info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description); - if (cmd->reconnect_after_s > 0) { - aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s; - info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s); + return; + } + if (msg_descriptor->fnc(msg, msg_len)) { + error("Error processing message of type '%s'", message_type); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; } - disconnect_req = 1; - freez(cmd->error_description); - freez(cmd); return; } - error ("Unknown new cloud arch message type received \"%s\"", message_type); } #endif |