summaryrefslogtreecommitdiffstats
path: root/aclk/legacy/aclk_rx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/legacy/aclk_rx_msgs.c')
-rw-r--r--aclk/legacy/aclk_rx_msgs.c75
1 files changed, 38 insertions, 37 deletions
diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c
index 68dad81e0..d4778bbcf 100644
--- a/aclk/legacy/aclk_rx_msgs.c
+++ b/aclk/legacy/aclk_rx_msgs.c
@@ -4,6 +4,7 @@
#include "aclk_common.h"
#include "aclk_stats.h"
#include "aclk_query.h"
+#include "agent_cloud_link.h"
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@@ -107,7 +108,7 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
error(
"Received \"http\" message from Cloud with version %d, but ACLK version %d is used",
cloud_to_agent->version,
- aclk_shared_state.version_neg);
+ legacy_aclk_shared_state.version_neg);
return 1;
}
@@ -126,14 +127,14 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
return 1;
}
- if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
+ if (unlikely(legacy_aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_v1++;
- aclk_metrics_per_sample.cloud_req_ok++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.cloud_req_v1++;
+ legacy_aclk_metrics_per_sample.cloud_req_ok++;
+ LEGACY_ACLK_STATS_UNLOCK;
}
return 0;
@@ -181,11 +182,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
}
// we do this here due to cloud_req being taken over by query thread
- // which if crazy quick can free it after aclk_queue_query
+ // which if crazy quick can free it after legacy_aclk_queue_query
stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint);
- // aclk_queue_query takes ownership of data pointer
- if (unlikely(aclk_queue_query(
+ // legacy_aclk_queue_query takes ownership of data pointer
+ if (unlikely(legacy_aclk_queue_query(
cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
ACLK_CMD_CLOUD_QUERY_2))) {
error("ACLK failed to queue incoming \"http\" v2 message");
@@ -193,11 +194,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
}
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_v2++;
- aclk_metrics_per_sample.cloud_req_ok++;
- aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.cloud_req_v2++;
+ legacy_aclk_metrics_per_sample.cloud_req_ok++;
+ legacy_aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
+ LEGACY_ACLK_STATS_UNLOCK;
}
return 0;
@@ -258,19 +259,19 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha
version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
+ legacy_aclk_shared_state_LOCK;
+ if (unlikely(now_monotonic_usec() > legacy_aclk_shared_state.version_neg_wait_till)) {
errno = 0;
error("The \"version\" message came too late ignoring.");
goto err_cleanup;
}
- if (unlikely(aclk_shared_state.version_neg)) {
+ if (unlikely(legacy_aclk_shared_state.version_neg)) {
errno = 0;
- error("Version has already been set to %d", aclk_shared_state.version_neg);
+ error("Version has already been set to %d", legacy_aclk_shared_state.version_neg);
goto err_cleanup;
}
- aclk_shared_state.version_neg = version;
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state.version_neg = version;
+ legacy_aclk_shared_state_UNLOCK;
info("Choosing version %d of ACLK", version);
@@ -279,7 +280,7 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha
return 0;
err_cleanup:
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return 1;
}
@@ -288,31 +289,31 @@ typedef struct aclk_incoming_msg_type{
int(*fnc)(struct aclk_request *, char *);
}aclk_incoming_msg_type;
-aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = {
+aclk_incoming_msg_type legacy_aclk_incoming_msg_types_v1[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v1 },
{ .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
-aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
+aclk_incoming_msg_type legacy_aclk_incoming_msg_types_compression[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v2 },
{ .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
-struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
+struct aclk_incoming_msg_type *legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1;
void aclk_set_rx_handlers(int version)
{
if(version >= ACLK_V_COMPRESSION) {
- aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
+ legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_compression;
return;
}
- aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
+ legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1;
}
-int aclk_handle_cloud_message(char *payload)
+int legacy_aclk_handle_cloud_message(char *payload)
{
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
@@ -325,7 +326,7 @@ int aclk_handle_cloud_message(char *payload)
debug(D_ACLK, "ACLK incoming message (%s)", payload);
- int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+ int rc = json_parse(payload, &cloud_to_agent, legacy_cloud_to_agent_parse);
if (unlikely(rc != JSON_OK)) {
errno = 0;
@@ -339,22 +340,22 @@ int aclk_handle_cloud_message(char *payload)
goto err_cleanup;
}
- if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
+ if (!legacy_aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
goto err_cleanup;
}
- for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
- if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
- if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
+ for (int i = 0; legacy_aclk_incoming_msg_types[i].name; i++) {
+ if (strcmp(cloud_to_agent.type_id, legacy_aclk_incoming_msg_types[i].name) == 0) {
+ if (likely(!legacy_aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
// in case of success handler is supposed to clean up after itself
// or as in the case of aclk_handle_cloud_request take
// ownership of the pointers (done to avoid copying)
- // see what `aclk_queue_query` parameter `internal` does
+ // see what `legacy_aclk_queue_query` parameter `internal` does
// NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
// msg handlers (namely aclk_handle_version_response)
- // can freely change what aclk_incoming_msg_types points to
+ // can freely change what legacy_aclk_incoming_msg_types points to
// so either exit or restart this for loop
freez(cloud_to_agent.type_id);
return 0;
@@ -378,9 +379,9 @@ err_cleanup:
err_cleanup_nojson:
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_err++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.cloud_req_err++;
+ LEGACY_ACLK_STATS_UNLOCK;
}
return 1;