From bb8713bbc1c4594366fc735c04910edbf4c61aab Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 31 Mar 2021 14:59:21 +0200 Subject: Merging upstream version 1.30.0. Signed-off-by: Daniel Baumann --- aclk/legacy/aclk_query.c | 60 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 3 deletions(-) (limited to 'aclk/legacy/aclk_query.c') diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c index 7ab534f16..27ad9ac16 100644 --- a/aclk/legacy/aclk_query.c +++ b/aclk/legacy/aclk_query.c @@ -22,6 +22,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; struct aclk_query { usec_t created; + struct timeval tv_in; usec_t created_boot_time; time_t run_after; // Delay run until after this time ACLK_CMD cmd; // What command is this @@ -30,6 +31,7 @@ struct aclk_query { char *msg_id; // msg_id generated by the cloud (NULL if internal) char *query; // The actual query u_char deleted; // Mark deleted for garbage collect + int idx; // index of query thread struct aclk_query *next; }; @@ -62,6 +64,7 @@ static void aclk_query_free(struct aclk_query *this_query) freez(this_query->query); if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) { struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data; + freez(del->query_endpoint); freez(del->data); freez(del); } @@ -236,7 +239,8 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run new_query->data = data; new_query->next = NULL; - new_query->created = now_realtime_usec(); + now_realtime_timeval(&new_query->tv_in); + new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec; new_query->created_boot_time = now_boottime_usec(); new_query->run_after = run_after; @@ -324,6 +328,7 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli #pragma region ACLK_QUERY #endif + static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) { usec_t t = now_boottime_usec(); @@ -359,8 +364,11 @@ static int aclk_execute_query(struct aclk_query *this_query) mysep = strrchr(this_query->query, '/'); // TODO: handle bad response perhaps in a different way. For now it does to the payload - aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + w->tv_in = this_query->tv_in; now_realtime_timeval(&w->tv_ready); + aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + size_t size = w->response.data->len; + size_t sent = size; w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -382,6 +390,24 @@ static int aclk_execute_query(struct aclk_query *this_query) aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); + struct timeval tv; + now_realtime_timeval(&tv); + + log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", + w->id + , gettid() + , this_query->idx + , "DATA" + , sent + , size + , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0) + , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 + , dt_usec(&tv, &w->tv_ready) / 1000.0 + , dt_usec(&tv, &w->tv_in) / 1000.0 + , w->response.code + , strip_control_characters(this_query->query) + ); + buffer_free(w->response.data); buffer_free(w->response.header); buffer_free(w->response.header_output); @@ -426,7 +452,11 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) mysep = strrchr(this_query->query, '/'); // execute the query + w->tv_in = this_query->tv_in; + now_realtime_timeval(&w->tv_ready); t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len; + size_t sent = size; #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used @@ -475,7 +505,6 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) } #endif - now_realtime_timeval(&w->tv_ready); w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -492,6 +521,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) buffer_need_bytes(local_buffer, w->response.data->len); memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); local_buffer->len += w->response.data->len; + sent = sent - size + w->response.data->len; } else { #endif buffer_strcat(local_buffer, w->response.data->buffer); @@ -502,6 +532,23 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id); + struct timeval tv; + now_realtime_timeval(&tv); + + log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", + w->id + , gettid() + , this_query->idx + , "DATA" + , sent + , size + , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0) + , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 + , dt_usec(&tv, &w->tv_ready) / 1000.0 + , dt_usec(&tv, &w->tv_in) / 1000.0 + , w->response.code + , strip_control_characters(this_query->query) + ); cleanup: #ifdef NETDATA_WITH_ZLIB if(w->response.zinitialized) @@ -550,6 +597,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info) query_count++; host = (RRDHOST*)this_query->data; + this_query->idx = t_info->idx; debug( D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic, @@ -629,6 +677,12 @@ static int aclk_process_query(struct aclk_query_thread *t_info) aclk_metrics_per_sample.queries_dispatched++; aclk_queries_per_thread[t_info->idx]++; ACLK_STATS_UNLOCK; + + if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) { + getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]); + getrusage_called_this_tick[t_info->idx]++; + } + } aclk_query_free(this_query); -- cgit v1.2.3