diff options
Diffstat (limited to '')
-rw-r--r-- | aclk/legacy/aclk_query.c | 96 |
1 files changed, 48 insertions, 48 deletions
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c index 040068e87..21eae11fd 100644 --- a/aclk/legacy/aclk_query.c +++ b/aclk/legacy/aclk_query.c @@ -2,15 +2,16 @@ #include "aclk_query.h" #include "aclk_stats.h" #include "aclk_rx_msgs.h" +#include "agent_cloud_link.h" #define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" -pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; -pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; -#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) -#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) +#define ACLK_QUERY_THREAD_NAME "ACLK_Query" -volatile int aclk_connected = 0; +pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER; +#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait) +#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait) #ifndef __GNUC__ #pragma region ACLK_QUEUE @@ -188,7 +189,7 @@ aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd * Add a query to execute, the result will be send to the specified topic */ -int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) +int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) { struct aclk_query *new_query, *tmp_query; @@ -205,7 +206,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run if (unlikely(tmp_query)) { if (tmp_query->run_after == run_after) { ACLK_QUEUE_UNLOCK; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return 0; } @@ -220,9 +221,9 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run } if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_queued++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.queries_queued++; + LEGACY_ACLK_STATS_UNLOCK; } new_query = callocz(1, sizeof(struct aclk_query)); @@ -255,7 +256,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run aclk_queue.aclk_query_tail = new_query; aclk_queue.count++; ACLK_QUEUE_UNLOCK; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return 0; } @@ -264,7 +265,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run aclk_queue.count++; ACLK_QUEUE_UNLOCK; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return 0; } @@ -332,12 +333,12 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) { usec_t t = now_boottime_usec(); - aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); + legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); w->response.code = web_client_api_request_v1(host, w, url); t = now_boottime_usec() - t; - aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t); + legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t); return t; } @@ -375,7 +376,7 @@ static int aclk_execute_query(struct aclk_query *this_query) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); @@ -510,7 +511,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code); buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A"); buffer_strcat(local_buffer, w->response.header_output->buffer); @@ -607,7 +608,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info) case ACLK_CMD_ONCONNECT: ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT"); #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE - if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { + if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE); break; } @@ -638,7 +639,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info) debug(D_ACLK, "EXECUTING a chart delete command"); //TODO: This send the info metadata for now - aclk_send_info_metadata(ACLK_METADATA_SENT, host); + legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host); break; case ACLK_CMD_ALARM: @@ -673,10 +674,10 @@ static int aclk_process_query(struct aclk_query_thread *t_info) debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_dispatched++; - aclk_queries_per_thread[t_info->idx]++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.queries_dispatched++; + legacy_aclk_queries_per_thread[t_info->idx]++; + LEGACY_ACLK_STATS_UNLOCK; if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) { getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]); @@ -690,7 +691,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info) return 1; } -void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) +void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) { if (query_threads && query_threads->thread_list) { for (int i = 0; i < query_threads->count; i++) { @@ -707,8 +708,8 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) } while (this_query); } -#define TASK_LEN_MAX 16 -void aclk_query_threads_start(struct aclk_query_threads *query_threads) +#define TASK_LEN_MAX 22 +void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads) { info("Starting %d query threads.", query_threads->count); @@ -717,10 +718,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads) for (int i = 0; i < query_threads->count; i++) { query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics - if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0)) + if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0)) error("snprintf encoding error"); netdata_thread_create( - &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, + &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread, &query_threads->thread_list[i]); } } @@ -730,10 +731,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads) * returns actual/updated popcorning state */ -ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host) +ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host) { rrdhost_aclk_state_lock(host); - ACLK_POPCORNING_STATE ret = host->aclk_state.state; + ACLK_AGENT_STATE ret = host->aclk_state.state; if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ rrdhost_aclk_state_unlock(host); return ret; @@ -766,7 +767,7 @@ ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host) * of no new collectors coming in in order to mark the agent * as stable (set agent_state = AGENT_STABLE) */ -void *aclk_query_main_thread(void *ptr) +void *legacy_aclk_query_main_thread(void *ptr) { struct aclk_query_thread *info = ptr; @@ -785,25 +786,24 @@ void *aclk_query_main_thread(void *ptr) sleep(1); continue; } - ACLK_SHARED_STATE_LOCK; - if (unlikely(!aclk_shared_state.version_neg)) { - if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_LOCK; + if (unlikely(!legacy_aclk_shared_state.version_neg)) { + if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { + legacy_aclk_shared_state_UNLOCK; info("Waiting for ACLK Version Negotiation message from Cloud"); sleep(1); continue; } - errno = 0; - error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." + info("ACLK version negotiation failed (This is expected). No reply to \"hello\" with \"version\" from cloud in time of %ds." " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); - aclk_shared_state.version_neg = ACLK_VERSION_MIN; - aclk_set_rx_handlers(aclk_shared_state.version_neg); + legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN; + aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg); } - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; rrdhost_aclk_state_lock(localhost); if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) { - if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { rrdhost_aclk_state_unlock(localhost); errno = 0; error("ACLK failed to queue on_connect command"); @@ -814,25 +814,25 @@ void *aclk_query_main_thread(void *ptr) } rrdhost_aclk_state_unlock(localhost); - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { - aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); - aclk_shared_state.next_popcorn_host = NULL; + legacy_aclk_shared_state_LOCK; + if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { + legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); + legacy_aclk_shared_state.next_popcorn_host = NULL; aclk_update_next_child_to_popcorn(); } - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; while (aclk_process_query(info)) { // Process all commands }; - QUERY_THREAD_LOCK; + LEGACY_QUERY_THREAD_LOCK; // TODO: Need to check if there are queries awaiting already - if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait))) sleep_usec(USEC_PER_SEC * 1); - QUERY_THREAD_UNLOCK; + LEGACY_QUERY_THREAD_UNLOCK; } return NULL; |