summaryrefslogtreecommitdiffstats
path: root/src/aclk/aclk_rx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk/aclk_rx_msgs.c')
-rw-r--r--src/aclk/aclk_rx_msgs.c43
1 files changed, 7 insertions, 36 deletions
diff --git a/src/aclk/aclk_rx_msgs.c b/src/aclk/aclk_rx_msgs.c
index 8db8e3f1e..36bd3599d 100644
--- a/src/aclk/aclk_rx_msgs.c
+++ b/src/aclk/aclk_rx_msgs.c
@@ -2,7 +2,6 @@
#include "aclk_rx_msgs.h"
-#include "aclk_stats.h"
#include "aclk_query_queue.h"
#include "aclk.h"
#include "aclk_capas.h"
@@ -165,7 +164,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
// it would be strange to get URL from `dedup_id`
query->data.http_api_v2.query = query->dedup_id;
query->msg_id = cloud_to_agent->msg_id;
- aclk_queue_query(query);
+ aclk_execute_query(query);
return 0;
error:
@@ -268,7 +267,7 @@ int create_node_instance_result(const char *msg, size_t msg_len)
freez(res.node_id);
return 1;
}
- update_node_id(&host_id, &node_id);
+ sql_update_node_id(&host_id, &node_id);
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
@@ -292,17 +291,16 @@ int create_node_instance_result(const char *msg, size_t msg_len)
node_state_update.capabilities = aclk_get_node_instance_capas(host);
}
- rrdhost_aclk_state_lock(localhost);
- node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ CLAIM_ID claim_id = claim_id_get();
+ node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
- rrdhost_aclk_state_unlock(localhost);
freez((void *)node_state_update.capabilities);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
- aclk_queue_query(query);
+ aclk_execute_query(query);
freez(res.node_id);
freez(res.machine_guid);
return 0;
@@ -409,7 +407,7 @@ int handle_disconnect_req(const char *msg, size_t msg_len)
"Cloud asks not to reconnect for %u seconds. We shall honor that request",
(unsigned int)cmd->reconnect_after_s);
}
- disconnect_req = 1;
+ disconnect_req = ACLK_CLOUD_DISCONNECT;
freez(cmd->error_description);
freez(cmd);
return 0;
@@ -503,12 +501,7 @@ new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash)
return NULL;
}
-const char *rx_handler_get_name(size_t i)
-{
- return rx_msgs[i].name;
-}
-
-unsigned int aclk_init_rx_msg_handlers(void)
+void aclk_init_rx_msg_handlers(void)
{
int i;
for (i = 0; rx_msgs[i].fnc; i++) {
@@ -521,29 +514,17 @@ unsigned int aclk_init_rx_msg_handlers(void)
}
rx_msgs[i].name_hash = hash;
}
- return i;
}
void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic __maybe_unused)
{
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_recvd++;
- ACLK_STATS_UNLOCK;
- }
new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type));
netdata_log_debug(D_ACLK, "Got message named '%s' from cloud", message_type);
if (unlikely(!msg_descriptor)) {
netdata_log_error("Do not know how to handle message of type '%s'. Ignoring", message_type);
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_err++;
- ACLK_STATS_UNLOCK;
- }
return;
}
-
if (aclklog_enabled) {
if (!strncmp(message_type, "cmd", strlen("cmd"))) {
log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name);
@@ -554,18 +535,8 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
}
}
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++;
- ACLK_STATS_UNLOCK;
- }
if (msg_descriptor->fnc(msg, msg_len)) {
netdata_log_error("Error processing message of type '%s'", message_type);
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_err++;
- ACLK_STATS_UNLOCK;
- }
return;
}
}