summaryrefslogtreecommitdiffstats
path: root/aclk/legacy/aclk_rx_msgs.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/legacy/aclk_rx_msgs.c')
-rw-r--r--aclk/legacy/aclk_rx_msgs.c36
1 files changed, 29 insertions, 7 deletions
diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c
index 99fa9d987..2681445b4 100644
--- a/aclk/legacy/aclk_rx_msgs.c
+++ b/aclk/legacy/aclk_rx_msgs.c
@@ -25,7 +25,7 @@ static inline int aclk_extract_v2_data(char *payload, char **data)
#define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref))
static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req)
{
- const char *start, *end, *ptr;
+ const char *start, *end, *ptr, *query_type;
char uuid_str[UUID_STR_LEN];
uuid_t uuid;
@@ -66,6 +66,8 @@ static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req,
error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
return 1;
}
+ ptr += strlen(ACLK_CLOUD_REQ_V2_PREFIX);
+ query_type = ptr;
if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) {
errno = 0;
@@ -73,6 +75,11 @@ static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req,
return 1;
}
+ if(!(ptr = strchr(ptr, '?')) || ptr > end)
+ ptr = end;
+ cloud_req->query_endpoint = mallocz((ptr - query_type) + 1);
+ strncpyz(cloud_req->query_endpoint, query_type, ptr - query_type);
+
req->payload = mallocz((end - start) + 1);
strncpyz(req->payload, start, end - start);
@@ -122,6 +129,13 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_v1++;
+ aclk_metrics_per_sample.cloud_req_ok++;
+ ACLK_STATS_UNLOCK;
+ }
+
return 0;
}
@@ -131,6 +145,7 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
struct aclk_cloud_req_v2 *cloud_req;
char *data;
+ int stat_idx;
errno = 0;
if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
@@ -165,6 +180,10 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
goto cleanup;
}
+ // we do this here due to cloud_req being taken over by query thread
+ // which if crazy quick can free it after aclk_queue_query
+ stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint);
+
// aclk_queue_query takes ownership of data pointer
if (unlikely(aclk_queue_query(
cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
@@ -173,8 +192,17 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
goto cleanup;
}
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_v2++;
+ aclk_metrics_per_sample.cloud_req_ok++;
+ aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
+ ACLK_STATS_UNLOCK;
+ }
+
return 0;
cleanup:
+ freez(cloud_req->query_endpoint);
freez(cloud_req->data);
freez(cloud_req);
return 1;
@@ -289,12 +317,6 @@ int aclk_handle_cloud_message(char *payload)
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_recvd++;
- ACLK_STATS_UNLOCK;
- }
-
if (unlikely(!payload)) {
errno = 0;
error("ACLK incoming message is empty");