diff options
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r-- | aclk/aclk_query.c | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 5301c281f..9eced0811 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -4,8 +4,6 @@ #include "aclk_stats.h" #include "aclk_tx_msgs.h" -#define ACLK_QUERY_THREAD_NAME "ACLK_Query" - #define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; @@ -64,19 +62,19 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) int retval = 0; usec_t t; BUFFER *local_buffer = NULL; - BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE); + BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE, &netdata_buffers_statistics.buffers_aclk); RRDHOST *query_host = localhost; #ifdef NETDATA_WITH_ZLIB int z_ret; - BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk); char *start, *end; #endif struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); - w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE, &netdata_buffers_statistics.buffers_aclk); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE, &netdata_buffers_statistics.buffers_aclk); strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() w->cookie1[0] = 0; // Simulate web_client_create_on_fd() w->cookie2[0] = 0; // Simulate web_client_create_on_fd() @@ -193,7 +191,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk); local_buffer->contenttype = CT_APPLICATION_JSON; buffer_strcat(local_buffer, w->response.header_output->buffer); @@ -327,6 +325,11 @@ static void worker_aclk_register(void) { } } +static void aclk_query_request_cancel(void *data) +{ + pthread_cond_broadcast((pthread_cond_t *) data); +} + /** * Main query processing thread */ @@ -336,7 +339,9 @@ void *aclk_query_main_thread(void *ptr) struct aclk_query_thread *query_thr = ptr; - while (!netdata_exit) { + service_register(SERVICE_THREAD_TYPE_NETDATA, aclk_query_request_cancel, NULL, &query_cond_wait, false); + + while (service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) { aclk_query_process_msgs(query_thr); worker_is_idle(); @@ -359,14 +364,13 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); for (int i = 0; i < query_threads->count; i++) { query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics + query_threads->thread_list[i].client = client; - if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0)) + if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "ACLK_QRY[%d]", i) < 0)) error("snprintf encoding error"); netdata_thread_create( &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, &query_threads->thread_list[i]); - - query_threads->thread_list[i].client = client; } } |