summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_query.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r--aclk/aclk_query.c64
1 files changed, 40 insertions, 24 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index ae5659310..de970fc3d 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -111,6 +111,18 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->tv_in = query->created_tv;
now_realtime_timeval(&w->tv_ready);
+ if (query->timeout) {
+ int in_queue = (int) (dt_usec(&w->tv_in, &w->tv_ready) / 1000);
+ if (in_queue > query->timeout) {
+ log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %d ms (LIMIT %d ms)", in_queue, query->timeout);
+ retval = 1;
+ w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0);
+ goto cleanup;
+ }
+ }
+
+ RRDHOST *temp_host = NULL;
if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
char nodeid[UUID_STR_LEN];
@@ -125,11 +137,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
query_host = node_id_2_rrdhost(nodeid);
if (!query_host) {
- error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
- retval = 1;
- w->response.code = 404;
- aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
- goto cleanup;
+ temp_host = sql_create_host_by_uuid(nodeid);
+ if (!temp_host) {
+ error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
+ retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
+ goto cleanup;
+ }
}
}
@@ -150,7 +165,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
// execute the query
- t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
+ t = aclk_web_api_v1_request(query_host ? query_host : temp_host, w, mysep ? mysep + 1 : "noop");
+ free_temporary_host(temp_host);
size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
sent = size;
@@ -276,22 +292,6 @@ static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_qu
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) {
- // TODO create a pending registrations list
- // with some timeouts to detect registration requests that
- // go unanswered from the cloud
- aclk_generate_node_registration(query_thr->client, &query->data.node_creation);
- return 0;
-}
-
-static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t query) {
- // TODO create a pending registrations list
- // with some timeouts to detect registration requests that
- // go unanswered from the cloud
- aclk_generate_node_state_update(query_thr->client, &query->data.node_update);
- return 0;
-}
-
static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
// this will be simplified when legacy support is removed
@@ -308,8 +308,8 @@ aclk_query_handler aclk_query_handlers[] = {
{ .type = CHART_NEW, .name = "chart_new", .fnc = chart_query },
{ .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata },
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node },
- { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update },
+ { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg },
+ { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg },
{ .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg },
{ .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg },
{ .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg },
@@ -337,6 +337,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que
{
for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
if (aclk_query_handlers[i].type == query->type) {
+ worker_is_busy(i);
+
debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
aclk_query_handlers[i].fnc(query_thr, query);
if (aclk_stats_enabled) {
@@ -347,6 +349,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que
ACLK_STATS_UNLOCK;
}
aclk_query_free(query);
+
+ worker_is_idle();
return;
}
}
@@ -364,21 +368,33 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
return 0;
}
+static void worker_aclk_register(void) {
+ worker_register("ACLKQUERY");
+ for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
+ worker_register_job_name(i, aclk_query_handlers[i].name);
+ }
+}
+
/**
* Main query processing thread
*/
void *aclk_query_main_thread(void *ptr)
{
+ worker_aclk_register();
+
struct aclk_query_thread *query_thr = ptr;
while (!netdata_exit) {
aclk_query_process_msgs(query_thr);
+ worker_is_idle();
QUERY_THREAD_LOCK;
if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
sleep_usec(USEC_PER_SEC * 1);
QUERY_THREAD_UNLOCK;
}
+
+ worker_unregister();
return NULL;
}