summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_rx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_rx_msgs.c')
-rw-r--r--aclk/aclk_rx_msgs.c468
1 files changed, 265 insertions, 203 deletions
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index e7ce932ea..ecb2b4179 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -119,7 +119,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
}\
ACLK_SHARED_STATE_UNLOCK;
-static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
+static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
if (!aclk_use_new_cloud_arch) {
HTTP_CHECK_AGENT_INITIALIZED();
@@ -172,73 +172,43 @@ error:
return 1;
}
-typedef struct aclk_incoming_msg_type{
- char *name;
- int(*fnc)(struct aclk_request *, char *);
-}aclk_incoming_msg_type;
-
-aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
- { .name = "http", .fnc = aclk_handle_cloud_request_v2 },
- { .name = NULL, .fnc = NULL }
-};
-
-struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
-
-int aclk_handle_cloud_message(char *payload)
+int aclk_handle_cloud_cmd_message(char *payload)
{
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_recvd++;
- ACLK_STATS_UNLOCK;
- }
-
if (unlikely(!payload)) {
- errno = 0;
- error("ACLK incoming message is empty");
- goto err_cleanup_nojson;
+ error_report("ACLK incoming 'cmd' message is empty");
+ return 1;
}
- debug(D_ACLK, "ACLK incoming message (%s)", payload);
+ debug(D_ACLK, "ACLK incoming 'cmd' message (%s)", payload);
int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
if (unlikely(rc != JSON_OK)) {
- errno = 0;
- error("Malformed json request (%s)", payload);
+ error_report("Malformed json request (%s)", payload);
goto err_cleanup;
}
if (!cloud_to_agent.type_id) {
- errno = 0;
- error("Cloud message is missing compulsory key \"type\"");
+ error_report("Cloud message is missing compulsory key \"type\"");
goto err_cleanup;
}
-
- for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
- if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
- if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
- // in case of success handler is supposed to clean up after itself
- // or as in the case of aclk_handle_cloud_request take
- // ownership of the pointers (done to avoid copying)
- // see what `aclk_queue_query` parameter `internal` does
-
- // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
- // msg handlers (namely aclk_handle_version_responce)
- // can freely change what aclk_incoming_msg_types points to
- // so either exit or restart this for loop
- freez(cloud_to_agent.type_id);
- return 0;
- }
- goto err_cleanup;
- }
+ // Originally we were expecting to have multiple types of 'cmd' message,
+ // but after the new protocol was designed we will ever only have 'http'
+ if (strcmp(cloud_to_agent.type_id, "http")) {
+ error_report("Only 'http' cmd message is supported");
+ goto err_cleanup;
}
- errno = 0;
- error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
+ if (likely(!aclk_handle_cloud_http_request_v2(&cloud_to_agent, payload))) {
+ // aclk_handle_cloud_request takes ownership of the pointers
+ // (to avoid copying) in case of success
+ freez(cloud_to_agent.type_id);
+ return 0;
+ }
err_cleanup:
if (cloud_to_agent.payload)
@@ -250,191 +220,283 @@ err_cleanup:
if (cloud_to_agent.callback_topic)
freez(cloud_to_agent.callback_topic);
-err_cleanup_nojson:
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_err++;
- ACLK_STATS_UNLOCK;
- }
-
return 1;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+typedef uint32_t simple_hash_t;
+typedef int(*rx_msg_handler)(const char *msg, size_t msg_len);
+
+int handle_old_proto_cmd(const char *msg, size_t msg_len)
{
- // TODO do the look up table with hashes to optimize when there are more
- // than few
- if (!strcmp(message_type, "cmd")) {
- // msg is binary payload in all other cases
- // however in this message from old legacy cloud
- // we have to convert it to C string
- char *str = mallocz(msg_len+1);
- memcpy(str, msg, msg_len);
- str[msg_len] = 0;
- aclk_handle_cloud_message(str);
+ // msg is binary payload in all other cases
+ // however in this message from old legacy cloud
+ // we have to convert it to C string
+ char *str = mallocz(msg_len+1);
+ memcpy(str, msg, msg_len);
+ str[msg_len] = 0;
+ if (aclk_handle_cloud_cmd_message(str)) {
freez(str);
- return;
+ return 1;
}
- if (!strcmp(message_type, "CreateNodeInstanceResult")) {
- node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
- if (!res.machine_guid || !res.node_id) {
- error_report("Error parsing CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
+ freez(str);
+ return 0;
+}
- debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
+int create_node_instance_result(const char *msg, size_t msg_len)
+{
+ node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
+ if (!res.machine_guid || !res.node_id) {
+ error_report("Error parsing CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
+ }
- uuid_t host_id, node_id;
- if (uuid_parse(res.machine_guid, host_id)) {
- error("Error parsing machine_guid provided by CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
- if (uuid_parse(res.node_id, node_id)) {
- error("Error parsing node_id provided by CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
- update_node_id(&host_id, &node_id);
-
- aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
- rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
- rrdhost_aclk_state_unlock(localhost);
-
- RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
- query->data.node_update.live = 0;
-
- if (host) {
- // not all host must have RRDHOST struct created for them
- // if they never connected during runtime of agent
- if (host == localhost) {
- query->data.node_update.live = 1;
- query->data.node_update.hops = 0;
- } else {
- netdata_mutex_lock(&host->receiver_lock);
- query->data.node_update.live = (host->receiver != NULL);
- netdata_mutex_unlock(&host->receiver_lock);
- query->data.node_update.hops = host->system_info->hops;
- }
- }
+ debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
- query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- aclk_queue_query(query);
+ uuid_t host_id, node_id;
+ if (uuid_parse(res.machine_guid, host_id)) {
+ error("Error parsing machine_guid provided by CreateNodeInstanceResult");
freez(res.machine_guid);
- return;
+ freez(res.node_id);
+ return 1;
}
- if (!strcmp(message_type, "SendNodeInstances")) {
- debug(D_ACLK, "Got SendNodeInstances");
- aclk_send_node_instances();
- return;
+ if (uuid_parse(res.node_id, node_id)) {
+ error("Error parsing node_id provided by CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
}
-
- if (!strcmp(message_type, "StreamChartsAndDimensions")) {
- stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
- if (!res.claim_id || !res.node_id) {
- error("Error parsing StreamChartsAndDimensions msg");
- freez(res.claim_id);
- freez(res.node_id);
- return;
+ update_node_id(&host_id, &node_id);
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+ rrdhost_aclk_state_lock(localhost);
+ query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+
+ RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
+ query->data.node_update.live = 0;
+
+ if (host) {
+ // not all host must have RRDHOST struct created for them
+ // if they never connected during runtime of agent
+ if (host == localhost) {
+ query->data.node_update.live = 1;
+ query->data.node_update.hops = 0;
+ } else {
+ netdata_mutex_lock(&host->receiver_lock);
+ query->data.node_update.live = (host->receiver != NULL);
+ netdata_mutex_unlock(&host->receiver_lock);
+ query->data.node_update.hops = host->system_info->hops;
}
- chart_batch_id = res.batch_id;
- aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+ }
+
+ query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
+ query->data.node_update.queryable = 1;
+ query->data.node_update.session_id = aclk_session_newarch;
+ aclk_queue_query(query);
+ freez(res.machine_guid);
+ return 0;
+}
+
+int send_node_instances(const char *msg, size_t msg_len)
+{
+ UNUSED(msg);
+ UNUSED(msg_len);
+ aclk_send_node_instances();
+ return 0;
+}
+
+int stream_charts_and_dimensions(const char *msg, size_t msg_len)
+{
+ stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
freez(res.claim_id);
freez(res.node_id);
- return;
+ return 1;
}
- if (!strcmp(message_type, "ChartsAndDimensionsAck")) {
- chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
- if (!res.claim_id || !res.node_id) {
- error("Error parsing StreamChartsAndDimensions msg");
- freez(res.claim_id);
- freez(res.node_id);
- return;
- }
- aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+ chart_batch_id = res.batch_id;
+ aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int charts_and_dimensions_ack(const char *msg, size_t msg_len)
+{
+ chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
freez(res.claim_id);
freez(res.node_id);
- return;
- }
- if (!strcmp(message_type, "UpdateChartConfigs")) {
- struct update_chart_config res = parse_update_chart_config(msg, msg_len);
- if (!res.claim_id || !res.node_id || !res.hashes)
- error("Error parsing UpdateChartConfigs msg");
- else
- aclk_get_chart_config(res.hashes);
- destroy_update_chart_config(&res);
- return;
+ return 1;
}
- if (!strcmp(message_type, "StartAlarmStreaming")) {
- struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
- if (!res.node_id || !res.batch_id) {
- error("Error parsing StartAlarmStreaming");
- freez(res.node_id);
- return;
- }
- aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int update_chart_configs(const char *msg, size_t msg_len)
+{
+ struct update_chart_config res = parse_update_chart_config(msg, msg_len);
+ if (!res.claim_id || !res.node_id || !res.hashes)
+ error("Error parsing UpdateChartConfigs msg");
+ else
+ aclk_get_chart_config(res.hashes);
+ destroy_update_chart_config(&res);
+ return 0;
+}
+
+int start_alarm_streaming(const char *msg, size_t msg_len)
+{
+ struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
+ if (!res.node_id || !res.batch_id) {
+ error("Error parsing StartAlarmStreaming");
freez(res.node_id);
- return;
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmLogHealth")) {
- char *node_id = parse_send_alarm_log_health(msg, msg_len);
- if (!node_id) {
- error("Error parsing SendAlarmLogHealth");
- return;
- }
- aclk_send_alarm_health_log(node_id);
- freez(node_id);
- return;
+ aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int send_alarm_log_health(const char *msg, size_t msg_len)
+{
+ char *node_id = parse_send_alarm_log_health(msg, msg_len);
+ if (!node_id) {
+ error("Error parsing SendAlarmLogHealth");
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmConfiguration")) {
- char *config_hash = parse_send_alarm_configuration(msg, msg_len);
- if (!config_hash || !*config_hash) {
- error("Error parsing SendAlarmConfiguration");
- freez(config_hash);
- return;
- }
- aclk_send_alarm_configuration(config_hash);
+ aclk_send_alarm_health_log(node_id);
+ freez(node_id);
+ return 0;
+}
+
+int send_alarm_configuration(const char *msg, size_t msg_len)
+{
+ char *config_hash = parse_send_alarm_configuration(msg, msg_len);
+ if (!config_hash || !*config_hash) {
+ error("Error parsing SendAlarmConfiguration");
freez(config_hash);
- return;
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmSnapshot")) {
- struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
- if (!sas->node_id || !sas->claim_id) {
- error("Error parsing SendAlarmSnapshot");
- destroy_send_alarm_snapshot(sas);
- return;
- }
- aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ aclk_send_alarm_configuration(config_hash);
+ freez(config_hash);
+ return 0;
+}
+
+int send_alarm_snapshot(const char *msg, size_t msg_len)
+{
+ struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
+ if (!sas->node_id || !sas->claim_id) {
+ error("Error parsing SendAlarmSnapshot");
destroy_send_alarm_snapshot(sas);
- return;
+ return 1;
+ }
+ aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ destroy_send_alarm_snapshot(sas);
+ return 0;
+}
+
+int handle_disconnect_req(const char *msg, size_t msg_len)
+{
+ struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
+ if (!cmd)
+ return 1;
+ if (cmd->permaban) {
+ error("Cloud Banned This Agent!");
+ aclk_disable_runtime = 1;
+ }
+ info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
+ if (cmd->reconnect_after_s > 0) {
+ aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
+ info(
+ "Cloud asks not to reconnect for %u seconds. We shall honor that request",
+ (unsigned int)cmd->reconnect_after_s);
+ }
+ disconnect_req = 1;
+ freez(cmd->error_description);
+ freez(cmd);
+ return 0;
+}
+
+typedef struct {
+ const char *name;
+ simple_hash_t name_hash;
+ rx_msg_handler fnc;
+} new_cloud_rx_msg_t;
+
+new_cloud_rx_msg_t rx_msgs[] = {
+ { .name = "cmd", .name_hash = 0, .fnc = handle_old_proto_cmd },
+ { .name = "CreateNodeInstanceResult", .name_hash = 0, .fnc = create_node_instance_result },
+ { .name = "SendNodeInstances", .name_hash = 0, .fnc = send_node_instances },
+ { .name = "StreamChartsAndDimensions", .name_hash = 0, .fnc = stream_charts_and_dimensions },
+ { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack },
+ { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs },
+ { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming },
+ { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health },
+ { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
+ { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
+ { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
+ { .name = NULL, .name_hash = 0, .fnc = NULL },
+};
+
+new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash)
+{
+ // we can afford to not compare strings after hash match
+ // because we check for collisions at initialization in
+ // aclk_init_rx_msg_handlers()
+ for (int i = 0; rx_msgs[i].fnc; i++) {
+ if (rx_msgs[i].name_hash == hash)
+ return &rx_msgs[i];
}
- if (!strcmp(message_type, "DisconnectReq")) {
- struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
- if (!cmd)
- return;
- if (cmd->permaban) {
- error ("Cloud Banned This Agent!");
- aclk_disable_runtime = 1;
+ return NULL;
+}
+
+void aclk_init_rx_msg_handlers(void)
+{
+ for (int i = 0; rx_msgs[i].fnc; i++) {
+ simple_hash_t hash = simple_hash(rx_msgs[i].name);
+ new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash);
+ if (unlikely(hdl)) {
+ // the list of message names changes only by changing
+ // the source code, therefore fatal is appropriate
+ fatal("Hash collision. Choose better hash. Added '%s' clashes with existing '%s'", rx_msgs[i].name, hdl->name);
+ }
+ rx_msgs[i].name_hash = hash;
+ }
+}
+
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+{
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_recvd++;
+ ACLK_STATS_UNLOCK;
+ }
+ new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type));
+ debug(D_ACLK, "Got message named '%s' from cloud", message_type);
+ if (unlikely(!msg_descriptor)) {
+ error("Do not know how to handle message of type '%s'. Ignoring", message_type);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
}
- info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
- if (cmd->reconnect_after_s > 0) {
- aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
- info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s);
+ return;
+ }
+ if (msg_descriptor->fnc(msg, msg_len)) {
+ error("Error processing message of type '%s'", message_type);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
}
- disconnect_req = 1;
- freez(cmd->error_description);
- freez(cmd);
return;
}
- error ("Unknown new cloud arch message type received \"%s\"", message_type);
}
#endif