diff options
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r-- | aclk/aclk_query.c | 81 |
1 files changed, 53 insertions, 28 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 001c1ba02..ae5659310 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -2,7 +2,6 @@ #include "aclk_query.h" #include "aclk_stats.h" -#include "aclk_query_queue.h" #include "aclk_tx_msgs.h" #define ACLK_QUERY_THREAD_NAME "ACLK_Query" @@ -59,6 +58,13 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id) { int res; uuid_t node_id_bin, host_id_bin; + + rrd_rdlock(); + RRDHOST *host = find_host_by_node_id((char *) node_id); + rrd_unlock(); + if (host) + return host; + char host_id[UUID_STR_LEN]; if (uuid_parse(node_id, node_id_bin)) { error("Couldn't parse UUID %s", node_id); @@ -99,26 +105,34 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; + buffer_strcat(log_buffer, query->data.http_api_v2.query); + size_t size = 0; + size_t sent = 0; + w->tv_in = query->created_tv; + now_realtime_timeval(&w->tv_ready); + if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); char nodeid[UUID_STR_LEN]; if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { - error("URL requests node_id but there is not enough chars following"); + error_report(CLOUD_EMSG_MALFORMED_NODE_ID); retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_MALFORMED_NODE_ID, CLOUD_EMSG_MALFORMED_NODE_ID, NULL, 0); goto cleanup; } strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1); query_host = node_id_2_rrdhost(nodeid); if (!query_host) { - error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid); + error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); goto cleanup; } } - buffer_strcat(log_buffer, query->data.http_api_v2.query); - char *mysep = strchr(query->data.http_api_v2.query, '?'); if (mysep) { url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); @@ -136,11 +150,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } // execute the query - w->tv_in = query->created_tv; - now_realtime_timeval(&w->tv_ready); t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); - size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; - size_t sent = size; + size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; + sent = size; #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used @@ -174,6 +186,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) else error("Unknown error during zlib compression."); retval = 1; + w->response.code = 500; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_ZLIB_ERROR, CLOUD_EMSG_ZLIB_ERROR, NULL, 0); goto cleanup; } int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; @@ -214,8 +228,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) // send msg. aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); - // log. struct timeval tv; + +cleanup: now_realtime_timeval(&tv); log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", w->id @@ -232,7 +247,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) , strip_control_characters((char *)buffer_tostring(log_buffer)) ); -cleanup: #ifdef NETDATA_WITH_ZLIB if(w->response.zinitialized) deflateEnd(&w->response.zstream); @@ -287,27 +301,37 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) #endif aclk_query_handler aclk_query_handlers[] = { - { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 }, - { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query }, - { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata }, - { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata }, - { .type = CHART_NEW, .name = "chart new", .fnc = chart_query }, - { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata }, + { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 }, + { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query }, + { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata }, + { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata }, + { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query }, + { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata }, #ifdef ENABLE_NEW_CLOUD_PROTOCOL - { .type = REGISTER_NODE, .name = "register node", .fnc = register_node }, - { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update }, - { .type = CHART_DIMS_UPDATE, .name = "chart and dim update bin", .fnc = send_bin_msg }, - { .type = CHART_CONFIG_UPDATED, .name = "chart config updated", .fnc = send_bin_msg }, - { .type = CHART_RESET, .name = "reset chart messages", .fnc = send_bin_msg }, - { .type = RETENTION_UPDATED, .name = "update retention info", .fnc = send_bin_msg }, - { .type = UPDATE_NODE_INFO, .name = "update node info", .fnc = send_bin_msg }, - { .type = ALARM_LOG_HEALTH, .name = "alarm log health", .fnc = send_bin_msg }, - { .type = ALARM_PROVIDE_CFG, .name = "provide alarm config", .fnc = send_bin_msg }, - { .type = ALARM_SNAPSHOT, .name = "alarm snapshot", .fnc = send_bin_msg }, + { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node }, + { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update }, + { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg }, + { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg }, + { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg }, + { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg }, + { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg }, + { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg }, + { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg }, + { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg }, #endif { .type = UNKNOWN, .name = NULL, .fnc = NULL } }; +const char *aclk_query_get_name(aclk_query_type_t qt) +{ + aclk_query_handler *ptr = aclk_query_handlers; + while (ptr->type != UNKNOWN) { + if (ptr->type == qt) + return ptr->name; + ptr++; + } + return "unknown"; +} static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { @@ -315,13 +339,14 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que if (aclk_query_handlers[i].type == query->type) { debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); aclk_query_handlers[i].fnc(query_thr, query); - aclk_query_free(query); if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_dispatched++; aclk_queries_per_thread[query_thr->idx]++; + aclk_metrics_per_sample.queries_per_type[query->type]++; ACLK_STATS_UNLOCK; } + aclk_query_free(query); return; } } |