summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_query.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r--aclk/aclk_query.c81
1 files changed, 53 insertions, 28 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 001c1ba02..ae5659310 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -2,7 +2,6 @@
#include "aclk_query.h"
#include "aclk_stats.h"
-#include "aclk_query_queue.h"
#include "aclk_tx_msgs.h"
#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
@@ -59,6 +58,13 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id)
{
int res;
uuid_t node_id_bin, host_id_bin;
+
+ rrd_rdlock();
+ RRDHOST *host = find_host_by_node_id((char *) node_id);
+ rrd_unlock();
+ if (host)
+ return host;
+
char host_id[UUID_STR_LEN];
if (uuid_parse(node_id, node_id_bin)) {
error("Couldn't parse UUID %s", node_id);
@@ -99,26 +105,34 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
w->acl = 0x1f;
+ buffer_strcat(log_buffer, query->data.http_api_v2.query);
+ size_t size = 0;
+ size_t sent = 0;
+ w->tv_in = query->created_tv;
+ now_realtime_timeval(&w->tv_ready);
+
if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
char nodeid[UUID_STR_LEN];
if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
- error("URL requests node_id but there is not enough chars following");
+ error_report(CLOUD_EMSG_MALFORMED_NODE_ID);
retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_MALFORMED_NODE_ID, CLOUD_EMSG_MALFORMED_NODE_ID, NULL, 0);
goto cleanup;
}
strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1);
query_host = node_id_2_rrdhost(nodeid);
if (!query_host) {
- error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid);
+ error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
goto cleanup;
}
}
- buffer_strcat(log_buffer, query->data.http_api_v2.query);
-
char *mysep = strchr(query->data.http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
@@ -136,11 +150,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
// execute the query
- w->tv_in = query->created_tv;
- now_realtime_timeval(&w->tv_ready);
t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
- size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
- size_t sent = size;
+ size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
+ sent = size;
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
@@ -174,6 +186,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
else
error("Unknown error during zlib compression.");
retval = 1;
+ w->response.code = 500;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_ZLIB_ERROR, CLOUD_EMSG_ZLIB_ERROR, NULL, 0);
goto cleanup;
}
int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
@@ -214,8 +228,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
// send msg.
aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
- // log.
struct timeval tv;
+
+cleanup:
now_realtime_timeval(&tv);
log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
w->id
@@ -232,7 +247,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
, strip_control_characters((char *)buffer_tostring(log_buffer))
);
-cleanup:
#ifdef NETDATA_WITH_ZLIB
if(w->response.zinitialized)
deflateEnd(&w->response.zstream);
@@ -287,27 +301,37 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
#endif
aclk_query_handler aclk_query_handlers[] = {
- { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
- { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
- { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata },
- { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
- { .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
- { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
+ { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 },
+ { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query },
+ { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata },
+ { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata },
+ { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query },
+ { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata },
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- { .type = REGISTER_NODE, .name = "register node", .fnc = register_node },
- { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update },
- { .type = CHART_DIMS_UPDATE, .name = "chart and dim update bin", .fnc = send_bin_msg },
- { .type = CHART_CONFIG_UPDATED, .name = "chart config updated", .fnc = send_bin_msg },
- { .type = CHART_RESET, .name = "reset chart messages", .fnc = send_bin_msg },
- { .type = RETENTION_UPDATED, .name = "update retention info", .fnc = send_bin_msg },
- { .type = UPDATE_NODE_INFO, .name = "update node info", .fnc = send_bin_msg },
- { .type = ALARM_LOG_HEALTH, .name = "alarm log health", .fnc = send_bin_msg },
- { .type = ALARM_PROVIDE_CFG, .name = "provide alarm config", .fnc = send_bin_msg },
- { .type = ALARM_SNAPSHOT, .name = "alarm snapshot", .fnc = send_bin_msg },
+ { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node },
+ { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update },
+ { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg },
+ { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg },
+ { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg },
+ { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg },
+ { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg },
+ { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg },
+ { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg },
+ { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg },
#endif
{ .type = UNKNOWN, .name = NULL, .fnc = NULL }
};
+const char *aclk_query_get_name(aclk_query_type_t qt)
+{
+ aclk_query_handler *ptr = aclk_query_handlers;
+ while (ptr->type != UNKNOWN) {
+ if (ptr->type == qt)
+ return ptr->name;
+ ptr++;
+ }
+ return "unknown";
+}
static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
@@ -315,13 +339,14 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que
if (aclk_query_handlers[i].type == query->type) {
debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
aclk_query_handlers[i].fnc(query_thr, query);
- aclk_query_free(query);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[query_thr->idx]++;
+ aclk_metrics_per_sample.queries_per_type[query->type]++;
ACLK_STATS_UNLOCK;
}
+ aclk_query_free(query);
return;
}
}