summaryrefslogtreecommitdiffstats
path: root/aclk/legacy/aclk_query.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/legacy/aclk_query.c')
-rw-r--r--aclk/legacy/aclk_query.c96
1 files changed, 48 insertions, 48 deletions
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c
index 040068e87..21eae11fd 100644
--- a/aclk/legacy/aclk_query.c
+++ b/aclk/legacy/aclk_query.c
@@ -2,15 +2,16 @@
#include "aclk_query.h"
#include "aclk_stats.h"
#include "aclk_rx_msgs.h"
+#include "agent_cloud_link.h"
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
-pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
-pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
-#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
-#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
-volatile int aclk_connected = 0;
+pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait)
+#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait)
#ifndef __GNUC__
#pragma region ACLK_QUEUE
@@ -188,7 +189,7 @@ aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd
* Add a query to execute, the result will be send to the specified topic
*/
-int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
+int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
{
struct aclk_query *new_query, *tmp_query;
@@ -205,7 +206,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
if (unlikely(tmp_query)) {
if (tmp_query->run_after == run_after) {
ACLK_QUEUE_UNLOCK;
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@@ -220,9 +221,9 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
}
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.queries_queued++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.queries_queued++;
+ LEGACY_ACLK_STATS_UNLOCK;
}
new_query = callocz(1, sizeof(struct aclk_query));
@@ -255,7 +256,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
aclk_queue.aclk_query_tail = new_query;
aclk_queue.count++;
ACLK_QUEUE_UNLOCK;
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@@ -264,7 +265,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
aclk_queue.count++;
ACLK_QUEUE_UNLOCK;
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@@ -332,12 +333,12 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
{
usec_t t = now_boottime_usec();
- aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
+ legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
w->response.code = web_client_api_request_v1(host, w, url);
t = now_boottime_usec() - t;
- aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t);
+ legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t);
return t;
}
@@ -375,7 +376,7 @@ static int aclk_execute_query(struct aclk_query *this_query)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
@@ -510,7 +511,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code);
buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A");
buffer_strcat(local_buffer, w->response.header_output->buffer);
@@ -607,7 +608,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
case ACLK_CMD_ONCONNECT:
ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
- if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
+ if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
break;
}
@@ -638,7 +639,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
debug(D_ACLK, "EXECUTING a chart delete command");
//TODO: This send the info metadata for now
- aclk_send_info_metadata(ACLK_METADATA_SENT, host);
+ legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host);
break;
case ACLK_CMD_ALARM:
@@ -673,10 +674,10 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.queries_dispatched++;
- aclk_queries_per_thread[t_info->idx]++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.queries_dispatched++;
+ legacy_aclk_queries_per_thread[t_info->idx]++;
+ LEGACY_ACLK_STATS_UNLOCK;
if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) {
getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]);
@@ -690,7 +691,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
return 1;
}
-void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
+void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
{
if (query_threads && query_threads->thread_list) {
for (int i = 0; i < query_threads->count; i++) {
@@ -707,8 +708,8 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
} while (this_query);
}
-#define TASK_LEN_MAX 16
-void aclk_query_threads_start(struct aclk_query_threads *query_threads)
+#define TASK_LEN_MAX 22
+void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads)
{
info("Starting %d query threads.", query_threads->count);
@@ -717,10 +718,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
for (int i = 0; i < query_threads->count; i++) {
query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
- if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0))
+ if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
error("snprintf encoding error");
netdata_thread_create(
- &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
+ &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread,
&query_threads->thread_list[i]);
}
}
@@ -730,10 +731,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
* returns actual/updated popcorning state
*/
-ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
+ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host)
{
rrdhost_aclk_state_lock(host);
- ACLK_POPCORNING_STATE ret = host->aclk_state.state;
+ ACLK_AGENT_STATE ret = host->aclk_state.state;
if (host->aclk_state.state != ACLK_HOST_INITIALIZING){
rrdhost_aclk_state_unlock(host);
return ret;
@@ -766,7 +767,7 @@ ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
* of no new collectors coming in in order to mark the agent
* as stable (set agent_state = AGENT_STABLE)
*/
-void *aclk_query_main_thread(void *ptr)
+void *legacy_aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
@@ -785,25 +786,24 @@ void *aclk_query_main_thread(void *ptr)
sleep(1);
continue;
}
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(!aclk_shared_state.version_neg)) {
- if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_LOCK;
+ if (unlikely(!legacy_aclk_shared_state.version_neg)) {
+ if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
+ legacy_aclk_shared_state_UNLOCK;
info("Waiting for ACLK Version Negotiation message from Cloud");
sleep(1);
continue;
}
- errno = 0;
- error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
+ info("ACLK version negotiation failed (This is expected). No reply to \"hello\" with \"version\" from cloud in time of %ds."
" Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
- aclk_shared_state.version_neg = ACLK_VERSION_MIN;
- aclk_set_rx_handlers(aclk_shared_state.version_neg);
+ legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN;
+ aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg);
}
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
rrdhost_aclk_state_lock(localhost);
if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
- if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
rrdhost_aclk_state_unlock(localhost);
errno = 0;
error("ACLK failed to queue on_connect command");
@@ -814,25 +814,25 @@ void *aclk_query_main_thread(void *ptr)
}
rrdhost_aclk_state_unlock(localhost);
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
- aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
- aclk_shared_state.next_popcorn_host = NULL;
+ legacy_aclk_shared_state_LOCK;
+ if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
+ legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ legacy_aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn();
}
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
while (aclk_process_query(info)) {
// Process all commands
};
- QUERY_THREAD_LOCK;
+ LEGACY_QUERY_THREAD_LOCK;
// TODO: Need to check if there are queries awaiting already
- if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+ if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait)))
sleep_usec(USEC_PER_SEC * 1);
- QUERY_THREAD_UNLOCK;
+ LEGACY_QUERY_THREAD_UNLOCK;
}
return NULL;