summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_query.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r--aclk/aclk_query.c28
1 files changed, 16 insertions, 12 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 5301c281f..9eced0811 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -4,8 +4,6 @@
#include "aclk_stats.h"
#include "aclk_tx_msgs.h"
-#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
-
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
@@ -64,19 +62,19 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
int retval = 0;
usec_t t;
BUFFER *local_buffer = NULL;
- BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE);
+ BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE, &netdata_buffers_statistics.buffers_aclk);
RRDHOST *query_host = localhost;
#ifdef NETDATA_WITH_ZLIB
int z_ret;
- BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
char *start, *end;
#endif
struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
- w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE, &netdata_buffers_statistics.buffers_aclk);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE, &netdata_buffers_statistics.buffers_aclk);
strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
@@ -193,7 +191,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w);
- local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
local_buffer->contenttype = CT_APPLICATION_JSON;
buffer_strcat(local_buffer, w->response.header_output->buffer);
@@ -327,6 +325,11 @@ static void worker_aclk_register(void) {
}
}
+static void aclk_query_request_cancel(void *data)
+{
+ pthread_cond_broadcast((pthread_cond_t *) data);
+}
+
/**
* Main query processing thread
*/
@@ -336,7 +339,9 @@ void *aclk_query_main_thread(void *ptr)
struct aclk_query_thread *query_thr = ptr;
- while (!netdata_exit) {
+ service_register(SERVICE_THREAD_TYPE_NETDATA, aclk_query_request_cancel, NULL, &query_cond_wait, false);
+
+ while (service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) {
aclk_query_process_msgs(query_thr);
worker_is_idle();
@@ -359,14 +364,13 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss
query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
for (int i = 0; i < query_threads->count; i++) {
query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
+ query_threads->thread_list[i].client = client;
- if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
+ if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "ACLK_QRY[%d]", i) < 0))
error("snprintf encoding error");
netdata_thread_create(
&query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
&query_threads->thread_list[i]);
-
- query_threads->thread_list[i].client = client;
}
}