From e970e0b37b8bd7f246feb3f70c4136418225e434 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 1 Dec 2021 07:15:04 +0100 Subject: Adding upstream version 1.32.0. Signed-off-by: Daniel Baumann --- aclk/legacy/aclk_rx_msgs.c | 75 +++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 37 deletions(-) (limited to 'aclk/legacy/aclk_rx_msgs.c') diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c index 68dad81e0..d4778bbcf 100644 --- a/aclk/legacy/aclk_rx_msgs.c +++ b/aclk/legacy/aclk_rx_msgs.c @@ -4,6 +4,7 @@ #include "aclk_common.h" #include "aclk_stats.h" #include "aclk_query.h" +#include "agent_cloud_link.h" #ifndef UUID_STR_LEN #define UUID_STR_LEN 37 @@ -107,7 +108,7 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha error( "Received \"http\" message from Cloud with version %d, but ACLK version %d is used", cloud_to_agent->version, - aclk_shared_state.version_neg); + legacy_aclk_shared_state.version_neg); return 1; } @@ -126,14 +127,14 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha return 1; } - if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) + if (unlikely(legacy_aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_v1++; - aclk_metrics_per_sample.cloud_req_ok++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_v1++; + legacy_aclk_metrics_per_sample.cloud_req_ok++; + LEGACY_ACLK_STATS_UNLOCK; } return 0; @@ -181,11 +182,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha } // we do this here due to cloud_req being taken over by query thread - // which if crazy quick can free it after aclk_queue_query + // which if crazy quick can free it after legacy_aclk_queue_query stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint); - // aclk_queue_query takes ownership of data pointer - if (unlikely(aclk_queue_query( + // legacy_aclk_queue_query takes ownership of data pointer + if (unlikely(legacy_aclk_queue_query( cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD_QUERY_2))) { error("ACLK failed to queue incoming \"http\" v2 message"); @@ -193,11 +194,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha } if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_v2++; - aclk_metrics_per_sample.cloud_req_ok++; - aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_v2++; + legacy_aclk_metrics_per_sample.cloud_req_ok++; + legacy_aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++; + LEGACY_ACLK_STATS_UNLOCK; } return 0; @@ -258,19 +259,19 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); - ACLK_SHARED_STATE_LOCK; - if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { + legacy_aclk_shared_state_LOCK; + if (unlikely(now_monotonic_usec() > legacy_aclk_shared_state.version_neg_wait_till)) { errno = 0; error("The \"version\" message came too late ignoring."); goto err_cleanup; } - if (unlikely(aclk_shared_state.version_neg)) { + if (unlikely(legacy_aclk_shared_state.version_neg)) { errno = 0; - error("Version has already been set to %d", aclk_shared_state.version_neg); + error("Version has already been set to %d", legacy_aclk_shared_state.version_neg); goto err_cleanup; } - aclk_shared_state.version_neg = version; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state.version_neg = version; + legacy_aclk_shared_state_UNLOCK; info("Choosing version %d of ACLK", version); @@ -279,7 +280,7 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha return 0; err_cleanup: - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return 1; } @@ -288,31 +289,31 @@ typedef struct aclk_incoming_msg_type{ int(*fnc)(struct aclk_request *, char *); }aclk_incoming_msg_type; -aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = { +aclk_incoming_msg_type legacy_aclk_incoming_msg_types_v1[] = { { .name = "http", .fnc = aclk_handle_cloud_request_v1 }, { .name = "version", .fnc = aclk_handle_version_response }, { .name = NULL, .fnc = NULL } }; -aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { +aclk_incoming_msg_type legacy_aclk_incoming_msg_types_compression[] = { { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, { .name = "version", .fnc = aclk_handle_version_response }, { .name = NULL, .fnc = NULL } }; -struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1; +struct aclk_incoming_msg_type *legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1; void aclk_set_rx_handlers(int version) { if(version >= ACLK_V_COMPRESSION) { - aclk_incoming_msg_types = aclk_incoming_msg_types_compression; + legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_compression; return; } - aclk_incoming_msg_types = aclk_incoming_msg_types_v1; + legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1; } -int aclk_handle_cloud_message(char *payload) +int legacy_aclk_handle_cloud_message(char *payload) { struct aclk_request cloud_to_agent; memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); @@ -325,7 +326,7 @@ int aclk_handle_cloud_message(char *payload) debug(D_ACLK, "ACLK incoming message (%s)", payload); - int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + int rc = json_parse(payload, &cloud_to_agent, legacy_cloud_to_agent_parse); if (unlikely(rc != JSON_OK)) { errno = 0; @@ -339,22 +340,22 @@ int aclk_handle_cloud_message(char *payload) goto err_cleanup; } - if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { + if (!legacy_aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); goto err_cleanup; } - for (int i = 0; aclk_incoming_msg_types[i].name; i++) { - if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { - if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { + for (int i = 0; legacy_aclk_incoming_msg_types[i].name; i++) { + if (strcmp(cloud_to_agent.type_id, legacy_aclk_incoming_msg_types[i].name) == 0) { + if (likely(!legacy_aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { // in case of success handler is supposed to clean up after itself // or as in the case of aclk_handle_cloud_request take // ownership of the pointers (done to avoid copying) - // see what `aclk_queue_query` parameter `internal` does + // see what `legacy_aclk_queue_query` parameter `internal` does // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! // msg handlers (namely aclk_handle_version_response) - // can freely change what aclk_incoming_msg_types points to + // can freely change what legacy_aclk_incoming_msg_types points to // so either exit or restart this for loop freez(cloud_to_agent.type_id); return 0; @@ -378,9 +379,9 @@ err_cleanup: err_cleanup_nojson: if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_err++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_err++; + LEGACY_ACLK_STATS_UNLOCK; } return 1; -- cgit v1.2.3