diff options
Diffstat (limited to 'aclk/legacy/aclk_query.c')
-rw-r--r-- | aclk/legacy/aclk_query.c | 843 |
1 files changed, 0 insertions, 843 deletions
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c deleted file mode 100644 index 21eae11f..00000000 --- a/aclk/legacy/aclk_query.c +++ /dev/null @@ -1,843 +0,0 @@ -#include "aclk_common.h" -#include "aclk_query.h" -#include "aclk_stats.h" -#include "aclk_rx_msgs.h" -#include "agent_cloud_link.h" - -#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" - -#define ACLK_QUERY_THREAD_NAME "ACLK_Query" - -pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER; -pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER; -#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait) -#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait) - -#ifndef __GNUC__ -#pragma region ACLK_QUEUE -#endif - -static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; -#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex) -#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex) - -struct aclk_query { - usec_t created; - struct timeval tv_in; - usec_t created_boot_time; - time_t run_after; // Delay run until after this time - ACLK_CMD cmd; // What command is this - char *topic; // Topic to respond to - char *data; // Internal data (NULL if request from the cloud) - char *msg_id; // msg_id generated by the cloud (NULL if internal) - char *query; // The actual query - u_char deleted; // Mark deleted for garbage collect - int idx; // index of query thread - struct aclk_query *next; -}; - -struct aclk_query_queue { - struct aclk_query *aclk_query_head; - struct aclk_query *aclk_query_tail; - unsigned int count; -} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; - - -unsigned int aclk_query_size() -{ - int r; - ACLK_QUEUE_LOCK; - r = aclk_queue.count; - ACLK_QUEUE_UNLOCK; - return r; -} - -/* - * Free a query structure when done - */ -static void aclk_query_free(struct aclk_query *this_query) -{ - if (unlikely(!this_query)) - return; - - freez(this_query->topic); - if (likely(this_query->query)) - freez(this_query->query); - if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) { - struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data; - freez(del->query_endpoint); - freez(del->data); - freez(del); - } - if (likely(this_query->msg_id)) - freez(this_query->msg_id); - freez(this_query); -} - -/* - * Get the next query to process - NULL if nothing there - * The caller needs to free memory by calling aclk_query_free() - * - * topic - * query - * The structure itself - * - */ -static struct aclk_query *aclk_queue_pop() -{ - struct aclk_query *this_query; - - ACLK_QUEUE_LOCK; - - if (likely(!aclk_queue.aclk_query_head)) { - ACLK_QUEUE_UNLOCK; - return NULL; - } - - this_query = aclk_queue.aclk_query_head; - - // Get rid of the deleted entries - while (this_query && this_query->deleted) { - aclk_queue.count--; - - aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; - - if (likely(!aclk_queue.aclk_query_head)) { - aclk_queue.aclk_query_tail = NULL; - } - - aclk_query_free(this_query); - - this_query = aclk_queue.aclk_query_head; - } - - if (likely(!this_query)) { - ACLK_QUEUE_UNLOCK; - return NULL; - } - - if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { - info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); - ACLK_QUEUE_UNLOCK; - return NULL; - } - - aclk_queue.count--; - aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; - - if (likely(!aclk_queue.aclk_query_head)) { - aclk_queue.aclk_query_tail = NULL; - } - - ACLK_QUEUE_UNLOCK; - return this_query; -} - -// Returns the entry after which we need to create a new entry to run at the specified time -// If NULL is returned we need to add to HEAD -// Need to have a QUERY lock before calling this - -static struct aclk_query *aclk_query_find_position(time_t time_to_run) -{ - struct aclk_query *tmp_query, *last_query; - - // Quick check if we will add to the end - if (likely(aclk_queue.aclk_query_tail)) { - if (aclk_queue.aclk_query_tail->run_after <= time_to_run) - return aclk_queue.aclk_query_tail; - } - - last_query = NULL; - tmp_query = aclk_queue.aclk_query_head; - - while (tmp_query) { - if (tmp_query->run_after > time_to_run) - return last_query; - last_query = tmp_query; - tmp_query = tmp_query->next; - } - return last_query; -} - -// Need to have a QUERY lock before calling this -static struct aclk_query * -aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) -{ - struct aclk_query *tmp_query, *prev_query; - UNUSED(cmd); - - tmp_query = aclk_queue.aclk_query_head; - prev_query = NULL; - while (tmp_query) { - if (likely(!tmp_query->deleted)) { - if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { - if ((!data || data == tmp_query->data) && - (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { - if (likely(last_query)) - *last_query = prev_query; - return tmp_query; - } - } - } - prev_query = tmp_query; - tmp_query = tmp_query->next; - } - return NULL; -} - -/* - * Add a query to execute, the result will be send to the specified topic - */ - -int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) -{ - struct aclk_query *new_query, *tmp_query; - - // Ignore all commands while we wait for the agent to initialize - if (unlikely(!aclk_connected)) - return 1; - - run_after = now_realtime_sec() + run_after; - - ACLK_QUEUE_LOCK; - struct aclk_query *last_query = NULL; - - tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); - if (unlikely(tmp_query)) { - if (tmp_query->run_after == run_after) { - ACLK_QUEUE_UNLOCK; - LEGACY_QUERY_THREAD_WAKEUP; - return 0; - } - - if (last_query) - last_query->next = tmp_query->next; - else - aclk_queue.aclk_query_head = tmp_query->next; - - debug(D_ACLK, "Removing double entry"); - aclk_query_free(tmp_query); - aclk_queue.count--; - } - - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.queries_queued++; - LEGACY_ACLK_STATS_UNLOCK; - } - - new_query = callocz(1, sizeof(struct aclk_query)); - new_query->cmd = aclk_cmd; - if (internal) { - new_query->topic = strdupz(topic); - if (likely(query)) - new_query->query = strdupz(query); - } else { - new_query->topic = topic; - new_query->query = query; - new_query->msg_id = msg_id; - } - - new_query->data = data; - new_query->next = NULL; - now_realtime_timeval(&new_query->tv_in); - new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec; - new_query->created_boot_time = now_boottime_usec(); - new_query->run_after = run_after; - - debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : ""); - - tmp_query = aclk_query_find_position(run_after); - - if (tmp_query) { - new_query->next = tmp_query->next; - tmp_query->next = new_query; - if (tmp_query == aclk_queue.aclk_query_tail) - aclk_queue.aclk_query_tail = new_query; - aclk_queue.count++; - ACLK_QUEUE_UNLOCK; - LEGACY_QUERY_THREAD_WAKEUP; - return 0; - } - - new_query->next = aclk_queue.aclk_query_head; - aclk_queue.aclk_query_head = new_query; - aclk_queue.count++; - - ACLK_QUEUE_UNLOCK; - LEGACY_QUERY_THREAD_WAKEUP; - return 0; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - -#ifndef __GNUC__ -#pragma region Helper Functions -#endif - -/* - * Take a buffer, encode it and rewrite it - * - */ - -static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines) -{ - char *tmp_buffer = mallocz(content_size * 2); - char *dst = tmp_buffer; - while (content_size > 0) { - switch (*src) { - case '\n': - if (keep_newlines) - { - *dst++ = '\\'; - *dst++ = 'n'; - } - break; - case '\t': - break; - case 0x01 ... 0x08: - case 0x0b ... 0x1F: - *dst++ = '\\'; - *dst++ = 'u'; - *dst++ = '0'; - *dst++ = '0'; - *dst++ = (*src < 0x0F) ? '0' : '1'; - *dst++ = to_hex(*src); - break; - case '\"': - *dst++ = '\\'; - *dst++ = *src; - break; - default: - *dst++ = *src; - } - src++; - content_size--; - } - *dst = '\0'; - - return tmp_buffer; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - -#ifndef __GNUC__ -#pragma region ACLK_QUERY -#endif - - -static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) -{ - usec_t t = now_boottime_usec(); - legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); - - w->response.code = web_client_api_request_v1(host, w, url); - t = now_boottime_usec() - t; - - legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t); - - return t; -} - -static int aclk_execute_query(struct aclk_query *this_query) -{ - if (strncmp(this_query->query, "/api/v1/", 8) == 0) { - struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); - w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() - w->cookie1[0] = 0; // Simulate web_client_create_on_fd() - w->cookie2[0] = 0; // Simulate web_client_create_on_fd() - w->acl = 0x1f; - - char *mysep = strchr(this_query->query, '?'); - if (mysep) { - strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); - *mysep = '\0'; - } else - strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); - - mysep = strrchr(this_query->query, '/'); - - // TODO: handle bad response perhaps in a different way. For now it does to the payload - w->tv_in = this_query->tv_in; - now_realtime_timeval(&w->tv_ready); - aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); - size_t size = w->response.data->len; - size_t sent = size; - w->response.data->date = w->tv_ready.tv_sec; - web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready - BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); - char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); - - buffer_sprintf( - local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}", - w->response.code, encoded_response, encoded_header); - - buffer_sprintf(local_buffer, "\n}"); - - debug(D_ACLK, "Response:%s", encoded_header); - - aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); - - struct timeval tv; - now_realtime_timeval(&tv); - - log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", - w->id - , gettid() - , this_query->idx - , "DATA" - , sent - , size - , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0) - , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 - , dt_usec(&tv, &w->tv_ready) / 1000.0 - , dt_usec(&tv, &w->tv_in) / 1000.0 - , w->response.code - , strip_control_characters(this_query->query) - ); - - buffer_free(w->response.data); - buffer_free(w->response.header); - buffer_free(w->response.header_output); - freez(w); - buffer_free(local_buffer); - freez(encoded_response); - freez(encoded_header); - return 0; - } - return 1; -} - -static int aclk_execute_query_v2(struct aclk_query *this_query) -{ - int retval = 0; - usec_t t; - BUFFER *local_buffer = NULL; - struct aclk_cloud_req_v2 *cloud_req = (struct aclk_cloud_req_v2 *)this_query->data; - -#ifdef NETDATA_WITH_ZLIB - int z_ret; - BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - char *start, *end; -#endif - - struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); - w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() - w->cookie1[0] = 0; // Simulate web_client_create_on_fd() - w->cookie2[0] = 0; // Simulate web_client_create_on_fd() - w->acl = 0x1f; - - char *mysep = strchr(this_query->query, '?'); - if (mysep) { - url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); - *mysep = '\0'; - } else - url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1); - - mysep = strrchr(this_query->query, '/'); - - // execute the query - w->tv_in = this_query->tv_in; - now_realtime_timeval(&w->tv_ready); - t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); - size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len; - size_t sent = size; - -#ifdef NETDATA_WITH_ZLIB - // check if gzip encoding can and should be used - if ((start = strstr(cloud_req->data, WEB_HDR_ACCEPT_ENC))) { - start += strlen(WEB_HDR_ACCEPT_ENC); - end = strstr(start, "\x0D\x0A"); - start = strstr(start, "gzip"); - - if (start && start < end) { - w->response.zstream.zalloc = Z_NULL; - w->response.zstream.zfree = Z_NULL; - w->response.zstream.opaque = Z_NULL; - if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) { - w->response.zinitialized = 1; - w->response.zoutput = 1; - } else - error("Failed to initialize zlib. Proceeding without compression."); - } - } - - if (w->response.data->len && w->response.zinitialized) { - w->response.zstream.next_in = (Bytef *)w->response.data->buffer; - w->response.zstream.avail_in = w->response.data->len; - do { - w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE; - w->response.zstream.next_out = w->response.zbuffer; - z_ret = deflate(&w->response.zstream, Z_FINISH); - if(z_ret < 0) { - if(w->response.zstream.msg) - error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); - else - error("Unknown error during zlib compression."); - retval = 1; - goto cleanup; - } - int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; - buffer_need_bytes(z_buffer, bytes_to_cpy); - memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy); - z_buffer->len += bytes_to_cpy; - } while(z_ret != Z_STREAM_END); - // so that web_client_build_http_header - // puts correct content length into header - buffer_free(w->response.data); - w->response.data = z_buffer; - z_buffer = NULL; - } -#endif - - w->response.data->date = w->tv_ready.tv_sec; - web_client_build_http_header(w); - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - local_buffer->contenttype = CT_APPLICATION_JSON; - - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code); - buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A"); - buffer_strcat(local_buffer, w->response.header_output->buffer); - - if (w->response.data->len) { -#ifdef NETDATA_WITH_ZLIB - if (w->response.zinitialized) { - buffer_need_bytes(local_buffer, w->response.data->len); - memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); - local_buffer->len += w->response.data->len; - sent = sent - size + w->response.data->len; - } else { -#endif - buffer_strcat(local_buffer, w->response.data->buffer); -#ifdef NETDATA_WITH_ZLIB - } -#endif - } - - aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id); - - struct timeval tv; - now_realtime_timeval(&tv); - - log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", - w->id - , gettid() - , this_query->idx - , "DATA" - , sent - , size - , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0) - , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 - , dt_usec(&tv, &w->tv_ready) / 1000.0 - , dt_usec(&tv, &w->tv_in) / 1000.0 - , w->response.code - , strip_control_characters(this_query->query) - ); -cleanup: -#ifdef NETDATA_WITH_ZLIB - if(w->response.zinitialized) - deflateEnd(&w->response.zstream); - buffer_free(z_buffer); -#endif - buffer_free(w->response.data); - buffer_free(w->response.header); - buffer_free(w->response.header_output); - freez(w); - buffer_free(local_buffer); - return retval; -} - -#define ACLK_HOST_PTR_COMPULSORY(x) \ - if (unlikely(!host)) { \ - errno = 0; \ - error(x " needs host pointer"); \ - break; \ - } - -/* - * This function will fetch the next pending command and process it - * - */ -static int aclk_process_query(struct aclk_query_thread *t_info) -{ - struct aclk_query *this_query; - static long int query_count = 0; - ACLK_METADATA_STATE meta_state; - RRDHOST *host; - - if (!aclk_connected) - return 0; - - this_query = aclk_queue_pop(); - if (likely(!this_query)) { - return 0; - } - - if (unlikely(this_query->deleted)) { - debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query); - aclk_query_free(this_query); - return 1; - } - query_count++; - - host = (RRDHOST*)this_query->data; - this_query->idx = t_info->idx; - - debug( - D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic, - this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS); - - switch (this_query->cmd) { - case ACLK_CMD_ONCONNECT: - ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT"); -#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE - if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { - error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE); - break; - } -#else -#warning "This check became unnecessary. Remove" -#endif - - debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"", - host->hostname, - host->machine_guid); - - rrdhost_aclk_state_lock(host); - meta_state = host->aclk_state.metadata; - host->aclk_state.metadata = ACLK_METADATA_SENT; - rrdhost_aclk_state_unlock(host); - aclk_send_metadata(meta_state, host); - break; - - case ACLK_CMD_CHART: - ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART"); - - debug(D_ACLK, "EXECUTING a chart update command"); - aclk_send_single_chart(host, this_query->query); - break; - - case ACLK_CMD_CHARTDEL: - ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL"); - - debug(D_ACLK, "EXECUTING a chart delete command"); - //TODO: This send the info metadata for now - legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host); - break; - - case ACLK_CMD_ALARM: - debug(D_ACLK, "EXECUTING an alarm update command"); - aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); - break; - - case ACLK_CMD_CLOUD: - debug(D_ACLK, "EXECUTING a cloud command"); - aclk_execute_query(this_query); - break; - case ACLK_CMD_CLOUD_QUERY_2: - debug(D_ACLK, "EXECUTING Cloud Query v2"); - aclk_execute_query_v2(this_query); - break; - - case ACLK_CMD_CHILD_CONNECT: - case ACLK_CMD_CHILD_DISCONNECT: - ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT"); - - debug( - D_ACLK, "Execution Child %s command", - this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect"); - aclk_send_info_child_connection(host, this_query->cmd); - break; - - default: - errno = 0; - error("Unknown ACLK Query Command"); - break; - } - debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); - - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.queries_dispatched++; - legacy_aclk_queries_per_thread[t_info->idx]++; - LEGACY_ACLK_STATS_UNLOCK; - - if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) { - getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]); - getrusage_called_this_tick[t_info->idx]++; - } - - } - - aclk_query_free(this_query); - - return 1; -} - -void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) -{ - if (query_threads && query_threads->thread_list) { - for (int i = 0; i < query_threads->count; i++) { - netdata_thread_join(query_threads->thread_list[i].thread, NULL); - } - freez(query_threads->thread_list); - } - - struct aclk_query *this_query; - - do { - this_query = aclk_queue_pop(); - aclk_query_free(this_query); - } while (this_query); -} - -#define TASK_LEN_MAX 22 -void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads) -{ - info("Starting %d query threads.", query_threads->count); - - char thread_name[TASK_LEN_MAX]; - query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); - for (int i = 0; i < query_threads->count; i++) { - query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics - - if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0)) - error("snprintf encoding error"); - netdata_thread_create( - &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread, - &query_threads->thread_list[i]); - } -} - -/** - * Checks and updates popcorning state of rrdhost - * returns actual/updated popcorning state - */ - -ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host) -{ - rrdhost_aclk_state_lock(host); - ACLK_AGENT_STATE ret = host->aclk_state.state; - if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ - rrdhost_aclk_state_unlock(host); - return ret; - } - - if (!host->aclk_state.t_last_popcorn_update){ - rrdhost_aclk_state_unlock(host); - return ret; - } - - time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update; - - if (t_diff >= ACLK_STABLE_TIMEOUT) { - host->aclk_state.state = ACLK_HOST_STABLE; - host->aclk_state.t_last_popcorn_update = 0; - rrdhost_aclk_state_unlock(host); - info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff); - return ACLK_HOST_STABLE; - } - - rrdhost_aclk_state_unlock(host); - return ret; -} - -/** - * Main query processing thread - * - * On startup wait for the agent collectors to initialize - * Expect at least a time of ACLK_STABLE_TIMEOUT seconds - * of no new collectors coming in in order to mark the agent - * as stable (set agent_state = AGENT_STABLE) - */ -void *legacy_aclk_query_main_thread(void *ptr) -{ - struct aclk_query_thread *info = ptr; - - while (!netdata_exit) { - if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) { -#ifdef ACLK_DEBUG - _dump_collector_list(); -#endif - break; - } - sleep_usec(USEC_PER_SEC * 1); - } - - while (!netdata_exit) { - if(aclk_disable_runtime) { - sleep(1); - continue; - } - legacy_aclk_shared_state_LOCK; - if (unlikely(!legacy_aclk_shared_state.version_neg)) { - if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { - legacy_aclk_shared_state_UNLOCK; - info("Waiting for ACLK Version Negotiation message from Cloud"); - sleep(1); - continue; - } - info("ACLK version negotiation failed (This is expected). No reply to \"hello\" with \"version\" from cloud in time of %ds." - " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); - legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN; - aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg); - } - legacy_aclk_shared_state_UNLOCK; - - rrdhost_aclk_state_lock(localhost); - if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) { - if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { - rrdhost_aclk_state_unlock(localhost); - errno = 0; - error("ACLK failed to queue on_connect command"); - sleep(1); - continue; - } - localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED; - } - rrdhost_aclk_state_unlock(localhost); - - legacy_aclk_shared_state_LOCK; - if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { - legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); - legacy_aclk_shared_state.next_popcorn_host = NULL; - aclk_update_next_child_to_popcorn(); - } - legacy_aclk_shared_state_UNLOCK; - - while (aclk_process_query(info)) { - // Process all commands - }; - - LEGACY_QUERY_THREAD_LOCK; - - // TODO: Need to check if there are queries awaiting already - if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait))) - sleep_usec(USEC_PER_SEC * 1); - - LEGACY_QUERY_THREAD_UNLOCK; - } - - return NULL; -} - -#ifndef __GNUC__ -#pragma endregion -#endif |