diff options
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r-- | aclk/aclk_query.c | 64 |
1 files changed, 40 insertions, 24 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index ae5659310..de970fc3d 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -111,6 +111,18 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->tv_in = query->created_tv; now_realtime_timeval(&w->tv_ready); + if (query->timeout) { + int in_queue = (int) (dt_usec(&w->tv_in, &w->tv_ready) / 1000); + if (in_queue > query->timeout) { + log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %d ms (LIMIT %d ms)", in_queue, query->timeout); + retval = 1; + w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0); + goto cleanup; + } + } + + RRDHOST *temp_host = NULL; if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); char nodeid[UUID_STR_LEN]; @@ -125,11 +137,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) query_host = node_id_2_rrdhost(nodeid); if (!query_host) { - error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); - retval = 1; - w->response.code = 404; - aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); - goto cleanup; + temp_host = sql_create_host_by_uuid(nodeid); + if (!temp_host) { + error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); + retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); + goto cleanup; + } } } @@ -150,7 +165,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } // execute the query - t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); + t = aclk_web_api_v1_request(query_host ? query_host : temp_host, w, mysep ? mysep + 1 : "noop"); + free_temporary_host(temp_host); size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; sent = size; @@ -276,22 +292,6 @@ static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_qu } #ifdef ENABLE_NEW_CLOUD_PROTOCOL -static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) { - // TODO create a pending registrations list - // with some timeouts to detect registration requests that - // go unanswered from the cloud - aclk_generate_node_registration(query_thr->client, &query->data.node_creation); - return 0; -} - -static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t query) { - // TODO create a pending registrations list - // with some timeouts to detect registration requests that - // go unanswered from the cloud - aclk_generate_node_state_update(query_thr->client, &query->data.node_update); - return 0; -} - static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { // this will be simplified when legacy support is removed @@ -308,8 +308,8 @@ aclk_query_handler aclk_query_handlers[] = { { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query }, { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata }, #ifdef ENABLE_NEW_CLOUD_PROTOCOL - { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node }, - { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update }, + { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg }, + { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg }, { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg }, { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg }, { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg }, @@ -337,6 +337,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que { for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { if (aclk_query_handlers[i].type == query->type) { + worker_is_busy(i); + debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); aclk_query_handlers[i].fnc(query_thr, query); if (aclk_stats_enabled) { @@ -347,6 +349,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que ACLK_STATS_UNLOCK; } aclk_query_free(query); + + worker_is_idle(); return; } } @@ -364,21 +368,33 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr) return 0; } +static void worker_aclk_register(void) { + worker_register("ACLKQUERY"); + for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { + worker_register_job_name(i, aclk_query_handlers[i].name); + } +} + /** * Main query processing thread */ void *aclk_query_main_thread(void *ptr) { + worker_aclk_register(); + struct aclk_query_thread *query_thr = ptr; while (!netdata_exit) { aclk_query_process_msgs(query_thr); + worker_is_idle(); QUERY_THREAD_LOCK; if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) sleep_usec(USEC_PER_SEC * 1); QUERY_THREAD_UNLOCK; } + + worker_unregister(); return NULL; } |