diff options
Diffstat (limited to 'aclk/legacy')
-rw-r--r-- | aclk/legacy/aclk_common.c | 213 | ||||
-rw-r--r-- | aclk/legacy/aclk_common.h | 37 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_https_client.c | 6 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_https_client.h | 2 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.c | 27 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.h | 2 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.c | 96 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.h | 19 | ||||
-rw-r--r-- | aclk/legacy/aclk_rrdhost_state.h | 42 | ||||
-rw-r--r-- | aclk/legacy/aclk_rx_msgs.c | 75 | ||||
-rw-r--r-- | aclk/legacy/aclk_rx_msgs.h | 4 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.c | 134 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.h | 26 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.c | 432 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.h | 42 | ||||
-rw-r--r-- | aclk/legacy/mqtt.c | 10 | ||||
-rw-r--r-- | aclk/legacy/mqtt.h | 2 |
17 files changed, 351 insertions, 818 deletions
diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c index 96f955451..7f8368e44 100644 --- a/aclk/legacy/aclk_common.c +++ b/aclk/legacy/aclk_common.c @@ -1,201 +1,18 @@ #include "aclk_common.h" -#include "../../daemon/common.h" +#include "daemon/common.h" #ifdef ENABLE_ACLK #include <libwebsockets.h> #endif -netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; +netdata_mutex_t legacy_aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; -int aclk_disable_runtime = 0; -int aclk_kill_link = 0; - -struct aclk_shared_state aclk_shared_state = { +struct legacy_aclk_shared_state legacy_aclk_shared_state = { .version_neg = 0, .version_neg_wait_till = 0 }; -struct { - ACLK_PROXY_TYPE type; - const char *url_str; -} supported_proxy_types[] = { - { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, - { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, - { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, - { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL }, -}; - -const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type) -{ - switch (*type) { - case PROXY_DISABLED: - return "disabled"; - case PROXY_TYPE_HTTP: - return "HTTP"; - case PROXY_TYPE_SOCKS5: - return "SOCKS"; - default: - return "Unknown"; - } -} - -static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string) -{ - int i = 0; - while (supported_proxy_types[i].url_str) { - if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str))) - return supported_proxy_types[i].type; - i++; - } - return PROXY_TYPE_UNKNOWN; -} - -ACLK_PROXY_TYPE aclk_verify_proxy(const char *string) -{ - if (!string) - return PROXY_TYPE_UNKNOWN; - - while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove) - string++; - - if (!*string) - return PROXY_TYPE_UNKNOWN; - - return aclk_find_proxy(string); -} - -// helper function to censor user&password -// for logging purposes -void safe_log_proxy_censor(char *proxy) -{ - size_t length = strlen(proxy); - char *auth = proxy + length - 1; - char *cur; - - while ((auth >= proxy) && (*auth != '@')) - auth--; - - //if not found or @ is first char do nothing - if (auth <= proxy) - return; - - cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR); - if (!cur) - cur = proxy; - else - cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); - - while (cur < auth) { - *cur = 'X'; - cur++; - } -} - -static inline void safe_log_proxy_error(char *str, const char *proxy) -{ - char *log = strdupz(proxy); - safe_log_proxy_censor(log); - error("%s Provided Value:\"%s\"", str, log); - freez(log); -} - -static inline int check_socks_environment(const char **proxy) -{ - char *tmp = getenv("socks_proxy"); - - if (!tmp) - return 1; - - if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) { - *proxy = tmp; - return 0; - } - - safe_log_proxy_error( - "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", - tmp); - return 1; -} - -static inline int check_http_environment(const char **proxy) -{ - char *tmp = getenv("http_proxy"); - - if (!tmp) - return 1; - - if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) { - *proxy = tmp; - return 0; - } - - safe_log_proxy_error( - "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".", - tmp); - return 1; -} - -const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type) -{ - const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV); - *type = PROXY_DISABLED; - - if (strcmp(proxy, "none") == 0) - return proxy; - - if (strcmp(proxy, ACLK_PROXY_ENV) == 0) { - if (check_socks_environment(&proxy) == 0) { -#ifdef LWS_WITH_SOCKS5 - *type = PROXY_TYPE_SOCKS5; - return proxy; -#else - safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy " - "but Libwebsockets used doesn't have SOCKS5 support built in. " - "Ignoring and checking for other options.", - proxy); -#endif - } - if (check_http_environment(&proxy) == 0) - *type = PROXY_TYPE_HTTP; - return proxy; - } - - *type = aclk_verify_proxy(proxy); -#ifndef LWS_WITH_SOCKS5 - if (*type == PROXY_TYPE_SOCKS5) { - safe_log_proxy_error( - "Config var \"" ACLK_PROXY_CONFIG_VAR - "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.", - proxy); - } -#endif - if (*type == PROXY_TYPE_UNKNOWN) { - *type = PROXY_DISABLED; - safe_log_proxy_error( - "Config var \"" ACLK_PROXY_CONFIG_VAR - "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", - proxy); - } - - return proxy; -} - -// helper function to read settings only once (static) -// as claiming, challenge/response and ACLK -// read the same thing, no need to parse again -const char *aclk_get_proxy(ACLK_PROXY_TYPE *type) -{ - static const char *proxy = NULL; - static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET; - - if (proxy_type == PROXY_NOT_SET) - proxy = aclk_lws_wss_get_proxy_setting(&proxy_type); - - *type = proxy_type; - return proxy; -} - int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port) { int pos = 0; @@ -234,27 +51,3 @@ int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port) info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); return 0; } - -struct label *add_aclk_host_labels(struct label *label) { -#ifdef ENABLE_ACLK - ACLK_PROXY_TYPE aclk_proxy; - char *proxy_str; - aclk_get_proxy(&aclk_proxy); - - switch(aclk_proxy) { - case PROXY_TYPE_SOCKS5: - proxy_str = "SOCKS5"; - break; - case PROXY_TYPE_HTTP: - proxy_str = "HTTP"; - break; - default: - proxy_str = "none"; - break; - } - label = add_label_to_list(label, "_aclk_impl", "Legacy", LABEL_SOURCE_AUTO); - return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); -#else - return label; -#endif -} diff --git a/aclk/legacy/aclk_common.h b/aclk/legacy/aclk_common.h index eedb5b51c..080680ff1 100644 --- a/aclk/legacy/aclk_common.h +++ b/aclk/legacy/aclk_common.h @@ -1,12 +1,12 @@ #ifndef ACLK_COMMON_H #define ACLK_COMMON_H -#include "aclk_rrdhost_state.h" -#include "../../daemon/common.h" +#include "../aclk_rrdhost_state.h" +#include "daemon/common.h" -extern netdata_mutex_t aclk_shared_state_mutex; -#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) -#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) +extern netdata_mutex_t legacy_aclk_shared_state_mutex; +#define legacy_aclk_shared_state_LOCK netdata_mutex_lock(&legacy_aclk_shared_state_mutex) +#define legacy_aclk_shared_state_UNLOCK netdata_mutex_unlock(&legacy_aclk_shared_state_mutex) // minimum and maximum supported version of ACLK // in this version of agent @@ -33,8 +33,8 @@ extern netdata_mutex_t aclk_shared_state_mutex; #define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING) #define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update) -extern struct aclk_shared_state { - // optimization to avoid looping trough hosts +extern struct legacy_aclk_shared_state { + // optimization to avoid looping through hosts // every time Query Thread wakes up RRDHOST *next_popcorn_host; @@ -42,31 +42,10 @@ extern struct aclk_shared_state { // protect by lock otherwise int version_neg; usec_t version_neg_wait_till; -} aclk_shared_state; - -typedef enum aclk_proxy_type { - PROXY_TYPE_UNKNOWN = 0, - PROXY_TYPE_SOCKS5, - PROXY_TYPE_HTTP, - PROXY_DISABLED, - PROXY_NOT_SET, -} ACLK_PROXY_TYPE; - -extern int aclk_kill_link; // Tells the agent to tear down the link -extern int aclk_disable_runtime; +} legacy_aclk_shared_state; const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type); -#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://" -#define ACLK_PROXY_ENV "env" -#define ACLK_PROXY_CONFIG_VAR "proxy" - -ACLK_PROXY_TYPE aclk_verify_proxy(const char *string); -const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type); -void safe_log_proxy_censor(char *proxy); int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port); -const char *aclk_get_proxy(ACLK_PROXY_TYPE *type); - -struct label *add_aclk_host_labels(struct label *label); #endif //ACLK_COMMON_H diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c index f41a230db..8a490c6f4 100644 --- a/aclk/legacy/aclk_lws_https_client.c +++ b/aclk/legacy/aclk_lws_https_client.c @@ -2,13 +2,7 @@ #define ACLK_LWS_HTTPS_CLIENT_INTERNAL #include "aclk_lws_https_client.h" - -#ifndef ACLK_NG #include "aclk_common.h" -#else -#include "../aclk.h" -#endif - #include "aclk_lws_wss_client.h" #define SMALL_BUFFER 16 diff --git a/aclk/legacy/aclk_lws_https_client.h b/aclk/legacy/aclk_lws_https_client.h index 811809dd1..5f30a37fd 100644 --- a/aclk/legacy/aclk_lws_https_client.h +++ b/aclk/legacy/aclk_lws_https_client.h @@ -3,7 +3,7 @@ #ifndef NETDATA_LWS_HTTPS_CLIENT_H #define NETDATA_LWS_HTTPS_CLIENT_H -#include "../../daemon/common.h" +#include "daemon/common.h" #include "libnetdata/libnetdata.h" #define DATAMAXLEN 1024*16 diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c index f73902b30..012f2a8cc 100644 --- a/aclk/legacy/aclk_lws_wss_client.c +++ b/aclk/legacy/aclk_lws_wss_client.c @@ -3,9 +3,10 @@ #include "aclk_lws_wss_client.h" #include "libnetdata/libnetdata.h" -#include "../../daemon/common.h" +#include "daemon/common.h" #include "aclk_common.h" #include "aclk_stats.h" +#include "../aclk_proxy.h" extern int aclk_shutting_down; @@ -450,9 +451,9 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas if (n>=0) { data->written += n; if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.write_q_consumed += n; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.write_q_consumed += n; + LEGACY_ACLK_STATS_UNLOCK; } } //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc); @@ -473,9 +474,9 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas retval = 1; aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.read_q_added += len; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.read_q_added += len; + LEGACY_ACLK_STATS_UNLOCK; } // to future myself -> do not call this while read lock is active as it will eventually @@ -553,9 +554,9 @@ int aclk_lws_wss_client_write(void *buf, size_t count) aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.write_q_added += count; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.write_q_added += count; + LEGACY_ACLK_STATS_UNLOCK; } lws_callback_on_writable(engine_instance->lws_wsi); @@ -584,9 +585,9 @@ int aclk_lws_wss_client_read(void *buf, size_t count) engine_instance->data_to_read = 0; if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.read_q_consumed += data_to_be_read; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.read_q_consumed += data_to_be_read; + LEGACY_ACLK_STATS_UNLOCK; } abort: diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h index eb99ee024..c68649cf3 100644 --- a/aclk/legacy/aclk_lws_wss_client.h +++ b/aclk/legacy/aclk_lws_wss_client.h @@ -58,7 +58,7 @@ struct aclk_lws_wss_engine_instance { struct lws_wss_packet_buffer *write_buffer_head; struct lws_ring *read_ringbuffer; - //flags to be readed by engine user + //flags to be read by engine user int websocket_connection_up; // currently this is by default disabled diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c index 040068e87..21eae11fd 100644 --- a/aclk/legacy/aclk_query.c +++ b/aclk/legacy/aclk_query.c @@ -2,15 +2,16 @@ #include "aclk_query.h" #include "aclk_stats.h" #include "aclk_rx_msgs.h" +#include "agent_cloud_link.h" #define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" -pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; -pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; -#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) -#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) +#define ACLK_QUERY_THREAD_NAME "ACLK_Query" -volatile int aclk_connected = 0; +pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER; +#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait) +#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait) #ifndef __GNUC__ #pragma region ACLK_QUEUE @@ -188,7 +189,7 @@ aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd * Add a query to execute, the result will be send to the specified topic */ -int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) +int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) { struct aclk_query *new_query, *tmp_query; @@ -205,7 +206,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run if (unlikely(tmp_query)) { if (tmp_query->run_after == run_after) { ACLK_QUEUE_UNLOCK; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return 0; } @@ -220,9 +221,9 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run } if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_queued++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.queries_queued++; + LEGACY_ACLK_STATS_UNLOCK; } new_query = callocz(1, sizeof(struct aclk_query)); @@ -255,7 +256,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run aclk_queue.aclk_query_tail = new_query; aclk_queue.count++; ACLK_QUEUE_UNLOCK; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return 0; } @@ -264,7 +265,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run aclk_queue.count++; ACLK_QUEUE_UNLOCK; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return 0; } @@ -332,12 +333,12 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) { usec_t t = now_boottime_usec(); - aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); + legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); w->response.code = web_client_api_request_v1(host, w, url); t = now_boottime_usec() - t; - aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t); + legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t); return t; } @@ -375,7 +376,7 @@ static int aclk_execute_query(struct aclk_query *this_query) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); @@ -510,7 +511,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code); buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A"); buffer_strcat(local_buffer, w->response.header_output->buffer); @@ -607,7 +608,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info) case ACLK_CMD_ONCONNECT: ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT"); #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE - if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { + if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE); break; } @@ -638,7 +639,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info) debug(D_ACLK, "EXECUTING a chart delete command"); //TODO: This send the info metadata for now - aclk_send_info_metadata(ACLK_METADATA_SENT, host); + legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host); break; case ACLK_CMD_ALARM: @@ -673,10 +674,10 @@ static int aclk_process_query(struct aclk_query_thread *t_info) debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_dispatched++; - aclk_queries_per_thread[t_info->idx]++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.queries_dispatched++; + legacy_aclk_queries_per_thread[t_info->idx]++; + LEGACY_ACLK_STATS_UNLOCK; if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) { getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]); @@ -690,7 +691,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info) return 1; } -void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) +void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) { if (query_threads && query_threads->thread_list) { for (int i = 0; i < query_threads->count; i++) { @@ -707,8 +708,8 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) } while (this_query); } -#define TASK_LEN_MAX 16 -void aclk_query_threads_start(struct aclk_query_threads *query_threads) +#define TASK_LEN_MAX 22 +void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads) { info("Starting %d query threads.", query_threads->count); @@ -717,10 +718,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads) for (int i = 0; i < query_threads->count; i++) { query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics - if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0)) + if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0)) error("snprintf encoding error"); netdata_thread_create( - &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, + &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread, &query_threads->thread_list[i]); } } @@ -730,10 +731,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads) * returns actual/updated popcorning state */ -ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host) +ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host) { rrdhost_aclk_state_lock(host); - ACLK_POPCORNING_STATE ret = host->aclk_state.state; + ACLK_AGENT_STATE ret = host->aclk_state.state; if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ rrdhost_aclk_state_unlock(host); return ret; @@ -766,7 +767,7 @@ ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host) * of no new collectors coming in in order to mark the agent * as stable (set agent_state = AGENT_STABLE) */ -void *aclk_query_main_thread(void *ptr) +void *legacy_aclk_query_main_thread(void *ptr) { struct aclk_query_thread *info = ptr; @@ -785,25 +786,24 @@ void *aclk_query_main_thread(void *ptr) sleep(1); continue; } - ACLK_SHARED_STATE_LOCK; - if (unlikely(!aclk_shared_state.version_neg)) { - if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_LOCK; + if (unlikely(!legacy_aclk_shared_state.version_neg)) { + if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { + legacy_aclk_shared_state_UNLOCK; info("Waiting for ACLK Version Negotiation message from Cloud"); sleep(1); continue; } - errno = 0; - error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." + info("ACLK version negotiation failed (This is expected). No reply to \"hello\" with \"version\" from cloud in time of %ds." " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); - aclk_shared_state.version_neg = ACLK_VERSION_MIN; - aclk_set_rx_handlers(aclk_shared_state.version_neg); + legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN; + aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg); } - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; rrdhost_aclk_state_lock(localhost); if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) { - if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { rrdhost_aclk_state_unlock(localhost); errno = 0; error("ACLK failed to queue on_connect command"); @@ -814,25 +814,25 @@ void *aclk_query_main_thread(void *ptr) } rrdhost_aclk_state_unlock(localhost); - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { - aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); - aclk_shared_state.next_popcorn_host = NULL; + legacy_aclk_shared_state_LOCK; + if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { + legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); + legacy_aclk_shared_state.next_popcorn_host = NULL; aclk_update_next_child_to_popcorn(); } - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; while (aclk_process_query(info)) { // Process all commands }; - QUERY_THREAD_LOCK; + LEGACY_QUERY_THREAD_LOCK; // TODO: Need to check if there are queries awaiting already - if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait))) sleep_usec(USEC_PER_SEC * 1); - QUERY_THREAD_UNLOCK; + LEGACY_QUERY_THREAD_UNLOCK; } return NULL; diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h index 026985c8d..622b66e2c 100644 --- a/aclk/legacy/aclk_query.h +++ b/aclk/legacy/aclk_query.h @@ -10,14 +10,11 @@ #define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread. -extern pthread_cond_t query_cond_wait; -extern pthread_mutex_t query_lock_wait; +extern pthread_cond_t legacy_query_cond_wait; +extern pthread_mutex_t legacy_query_lock_wait; extern uint8_t *getrusage_called_this_tick; -#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) -#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait) - -extern volatile int aclk_connected; - +#define LEGACY_QUERY_THREAD_WAKEUP pthread_cond_signal(&legacy_query_cond_wait) +#define LEGACY_QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&legacy_query_cond_wait) struct aclk_query_thread { netdata_thread_t thread; int idx; @@ -34,11 +31,11 @@ struct aclk_cloud_req_v2 { char *query_endpoint; }; -void *aclk_query_main_thread(void *ptr); -int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); +void *legacy_aclk_query_main_thread(void *ptr); +int legacy_aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); -void aclk_query_threads_start(struct aclk_query_threads *query_threads); -void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); +void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads); +void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); unsigned int aclk_query_size(); #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/aclk_rrdhost_state.h b/aclk/legacy/aclk_rrdhost_state.h deleted file mode 100644 index 7ab3a502e..000000000 --- a/aclk/legacy/aclk_rrdhost_state.h +++ /dev/null @@ -1,42 +0,0 @@ -#ifndef ACLK_RRDHOST_STATE_H -#define ACLK_RRDHOST_STATE_H - -#include "../../libnetdata/libnetdata.h" - -typedef enum aclk_cmd { - ACLK_CMD_CLOUD, - ACLK_CMD_ONCONNECT, - ACLK_CMD_INFO, - ACLK_CMD_CHART, - ACLK_CMD_CHARTDEL, - ACLK_CMD_ALARM, - ACLK_CMD_CLOUD_QUERY_2, - ACLK_CMD_CHILD_CONNECT, - ACLK_CMD_CHILD_DISCONNECT -} ACLK_CMD; - -typedef enum aclk_metadata_state { - ACLK_METADATA_REQUIRED, - ACLK_METADATA_CMD_QUEUED, - ACLK_METADATA_SENT -} ACLK_METADATA_STATE; - -typedef enum aclk_agent_state { - ACLK_HOST_INITIALIZING, - ACLK_HOST_STABLE -} ACLK_POPCORNING_STATE; - -typedef struct aclk_rrdhost_state { - char *claimed_id; // Claimed ID if host has one otherwise NULL - -#ifdef ENABLE_ACLK - // per child popcorning - ACLK_POPCORNING_STATE state; - ACLK_METADATA_STATE metadata; - - time_t timestamp_created; - time_t t_last_popcorn_update; -#endif /* ENABLE_ACLK */ -} aclk_rrdhost_state; - -#endif /* ACLK_RRDHOST_STATE_H */ diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c index 68dad81e0..d4778bbcf 100644 --- a/aclk/legacy/aclk_rx_msgs.c +++ b/aclk/legacy/aclk_rx_msgs.c @@ -4,6 +4,7 @@ #include "aclk_common.h" #include "aclk_stats.h" #include "aclk_query.h" +#include "agent_cloud_link.h" #ifndef UUID_STR_LEN #define UUID_STR_LEN 37 @@ -107,7 +108,7 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha error( "Received \"http\" message from Cloud with version %d, but ACLK version %d is used", cloud_to_agent->version, - aclk_shared_state.version_neg); + legacy_aclk_shared_state.version_neg); return 1; } @@ -126,14 +127,14 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha return 1; } - if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) + if (unlikely(legacy_aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_v1++; - aclk_metrics_per_sample.cloud_req_ok++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_v1++; + legacy_aclk_metrics_per_sample.cloud_req_ok++; + LEGACY_ACLK_STATS_UNLOCK; } return 0; @@ -181,11 +182,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha } // we do this here due to cloud_req being taken over by query thread - // which if crazy quick can free it after aclk_queue_query + // which if crazy quick can free it after legacy_aclk_queue_query stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint); - // aclk_queue_query takes ownership of data pointer - if (unlikely(aclk_queue_query( + // legacy_aclk_queue_query takes ownership of data pointer + if (unlikely(legacy_aclk_queue_query( cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD_QUERY_2))) { error("ACLK failed to queue incoming \"http\" v2 message"); @@ -193,11 +194,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha } if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_v2++; - aclk_metrics_per_sample.cloud_req_ok++; - aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_v2++; + legacy_aclk_metrics_per_sample.cloud_req_ok++; + legacy_aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++; + LEGACY_ACLK_STATS_UNLOCK; } return 0; @@ -258,19 +259,19 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); - ACLK_SHARED_STATE_LOCK; - if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { + legacy_aclk_shared_state_LOCK; + if (unlikely(now_monotonic_usec() > legacy_aclk_shared_state.version_neg_wait_till)) { errno = 0; error("The \"version\" message came too late ignoring."); goto err_cleanup; } - if (unlikely(aclk_shared_state.version_neg)) { + if (unlikely(legacy_aclk_shared_state.version_neg)) { errno = 0; - error("Version has already been set to %d", aclk_shared_state.version_neg); + error("Version has already been set to %d", legacy_aclk_shared_state.version_neg); goto err_cleanup; } - aclk_shared_state.version_neg = version; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state.version_neg = version; + legacy_aclk_shared_state_UNLOCK; info("Choosing version %d of ACLK", version); @@ -279,7 +280,7 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha return 0; err_cleanup: - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return 1; } @@ -288,31 +289,31 @@ typedef struct aclk_incoming_msg_type{ int(*fnc)(struct aclk_request *, char *); }aclk_incoming_msg_type; -aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = { +aclk_incoming_msg_type legacy_aclk_incoming_msg_types_v1[] = { { .name = "http", .fnc = aclk_handle_cloud_request_v1 }, { .name = "version", .fnc = aclk_handle_version_response }, { .name = NULL, .fnc = NULL } }; -aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { +aclk_incoming_msg_type legacy_aclk_incoming_msg_types_compression[] = { { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, { .name = "version", .fnc = aclk_handle_version_response }, { .name = NULL, .fnc = NULL } }; -struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1; +struct aclk_incoming_msg_type *legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1; void aclk_set_rx_handlers(int version) { if(version >= ACLK_V_COMPRESSION) { - aclk_incoming_msg_types = aclk_incoming_msg_types_compression; + legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_compression; return; } - aclk_incoming_msg_types = aclk_incoming_msg_types_v1; + legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1; } -int aclk_handle_cloud_message(char *payload) +int legacy_aclk_handle_cloud_message(char *payload) { struct aclk_request cloud_to_agent; memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); @@ -325,7 +326,7 @@ int aclk_handle_cloud_message(char *payload) debug(D_ACLK, "ACLK incoming message (%s)", payload); - int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + int rc = json_parse(payload, &cloud_to_agent, legacy_cloud_to_agent_parse); if (unlikely(rc != JSON_OK)) { errno = 0; @@ -339,22 +340,22 @@ int aclk_handle_cloud_message(char *payload) goto err_cleanup; } - if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { + if (!legacy_aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); goto err_cleanup; } - for (int i = 0; aclk_incoming_msg_types[i].name; i++) { - if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { - if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { + for (int i = 0; legacy_aclk_incoming_msg_types[i].name; i++) { + if (strcmp(cloud_to_agent.type_id, legacy_aclk_incoming_msg_types[i].name) == 0) { + if (likely(!legacy_aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { // in case of success handler is supposed to clean up after itself // or as in the case of aclk_handle_cloud_request take // ownership of the pointers (done to avoid copying) - // see what `aclk_queue_query` parameter `internal` does + // see what `legacy_aclk_queue_query` parameter `internal` does // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! // msg handlers (namely aclk_handle_version_response) - // can freely change what aclk_incoming_msg_types points to + // can freely change what legacy_aclk_incoming_msg_types points to // so either exit or restart this for loop freez(cloud_to_agent.type_id); return 0; @@ -378,9 +379,9 @@ err_cleanup: err_cleanup_nojson: if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_err++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_err++; + LEGACY_ACLK_STATS_UNLOCK; } return 1; diff --git a/aclk/legacy/aclk_rx_msgs.h b/aclk/legacy/aclk_rx_msgs.h index 3095e41a7..f1f99114f 100644 --- a/aclk/legacy/aclk_rx_msgs.h +++ b/aclk/legacy/aclk_rx_msgs.h @@ -3,10 +3,10 @@ #ifndef NETDATA_ACLK_RX_MSGS_H #define NETDATA_ACLK_RX_MSGS_H -#include "../../daemon/common.h" +#include "daemon/common.h" #include "libnetdata/libnetdata.h" -int aclk_handle_cloud_message(char *payload); +int legacy_aclk_handle_cloud_message(char *payload); void aclk_set_rx_handlers(int version); diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c index 88679cb3c..fbbb322a1 100644 --- a/aclk/legacy/aclk_stats.c +++ b/aclk/legacy/aclk_stats.c @@ -1,33 +1,31 @@ #include "aclk_stats.h" -netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; +netdata_mutex_t legacy_aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; -int aclk_stats_enabled; - -int query_thread_count; +int legacy_query_thread_count; // data ACLK stats need per query thread -struct aclk_qt_data { +struct legacy_aclk_qt_data { RRDDIM *dim; -} *aclk_qt_data = NULL; +} *legacy_aclk_qt_data = NULL; // ACLK per query thread cpu stats -struct aclk_cpu_data { +struct legacy_aclk_cpu_data { RRDDIM *user; RRDDIM *system; RRDSET *st; -} *aclk_cpu_data = NULL; +} *legacy_aclk_cpu_data = NULL; -uint32_t *aclk_queries_per_thread = NULL; -uint32_t *aclk_queries_per_thread_sample = NULL; +uint32_t *legacy_aclk_queries_per_thread = NULL; +uint32_t *legacy_aclk_queries_per_thread_sample = NULL; struct rusage *rusage_per_thread; uint8_t *getrusage_called_this_tick = NULL; -struct aclk_metrics aclk_metrics = { +static struct legacy_aclk_metrics legacy_aclk_metrics = { .online = 0, }; -struct aclk_metrics_per_sample aclk_metrics_per_sample; +struct legacy_aclk_metrics_per_sample legacy_aclk_metrics_per_sample; struct aclk_mat_metrics aclk_mat_metrics = { #ifdef NETDATA_INTERNAL_CHECKS @@ -61,20 +59,20 @@ struct aclk_mat_metrics aclk_mat_metrics = { "by query thread (just before passing to the database)." } }; -void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement) +void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement) { if (aclk_stats_enabled) { - ACLK_STATS_LOCK; + LEGACY_ACLK_STATS_LOCK; if (metric->max < measurement) metric->max = measurement; metric->total += measurement; metric->count++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_UNLOCK; } } -static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent) +static void aclk_stats_collect(struct legacy_aclk_metrics_per_sample *per_sample, struct legacy_aclk_metrics *permanent) { static RRDSET *st_aclkstats = NULL; static RRDDIM *rd_online_status = NULL; @@ -93,7 +91,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc rrdset_done(st_aclkstats); } -static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_query_queue(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st_query_thread = NULL; static RRDDIM *rd_queued = NULL; @@ -115,7 +113,7 @@ static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) rrdset_done(st_query_thread); } -static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_write_q(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; static RRDDIM *rd_wq_add = NULL; @@ -137,7 +135,7 @@ static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_read_q(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; static RRDDIM *rd_rq_add = NULL; @@ -159,7 +157,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_cloud_req(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; static RRDDIM *rd_rq_ok = NULL; @@ -181,7 +179,7 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -static void aclk_stats_cloud_req_version(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_cloud_req_version(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; static RRDDIM *rd_rq_v1 = NULL; @@ -223,7 +221,7 @@ int aclk_cloud_req_type_to_idx(const char *name) return 0; } -static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_cloud_req_cmd(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st; static int initialized = 0; @@ -246,7 +244,7 @@ static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -#define MAX_DIM_NAME 16 +#define MAX_DIM_NAME 22 static void aclk_stats_query_threads(uint32_t *queries_per_thread) { static RRDSET *st = NULL; @@ -258,16 +256,16 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - for (int i = 0; i < query_thread_count; i++) { - if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) + for (int i = 0; i < legacy_query_thread_count; i++) { + if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) error("snprintf encoding error"); - aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + legacy_aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); } } else rrdset_next(st); - for (int i = 0; i < query_thread_count; i++) { - rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); + for (int i = 0; i < legacy_query_thread_count; i++) { + rrddim_set_by_pointer(st, legacy_aclk_qt_data[i].dim, queries_per_thread[i]); } rrdset_done(st); @@ -301,59 +299,59 @@ static void aclk_stats_cpu_threads(void) char id[100 + 1]; char title[100 + 1]; - for (int i = 0; i < query_thread_count; i++) { - if (unlikely(!aclk_cpu_data[i].st)) { + for (int i = 0; i < legacy_query_thread_count; i++) { + if (unlikely(!legacy_aclk_cpu_data[i].st)) { snprintfz(id, 100, "aclk_thread%d_cpu", i); snprintfz(title, 100, "Cpu Usage For Thread No %d", i); - aclk_cpu_data[i].st = rrdset_create_localhost( + legacy_aclk_cpu_data[i].st = rrdset_create_localhost( "netdata", id, NULL, "aclk", NULL, title, "milliseconds/s", "netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - aclk_cpu_data[i].user = rrddim_add(aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - aclk_cpu_data[i].system = rrddim_add(aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + legacy_aclk_cpu_data[i].user = rrddim_add(legacy_aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + legacy_aclk_cpu_data[i].system = rrddim_add(legacy_aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); } else - rrdset_next(aclk_cpu_data[i].st); + rrdset_next(legacy_aclk_cpu_data[i].st); } - for (int i = 0; i < query_thread_count; i++) { - rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec); - rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec); - rrdset_done(aclk_cpu_data[i].st); + for (int i = 0; i < legacy_query_thread_count; i++) { + rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec); + rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec); + rrdset_done(legacy_aclk_cpu_data[i].st); } } -void aclk_stats_thread_cleanup() +void legacy_aclk_stats_thread_cleanup() { - freez(aclk_qt_data); - freez(aclk_queries_per_thread); - freez(aclk_queries_per_thread_sample); - freez(aclk_cpu_data); + freez(legacy_aclk_qt_data); + freez(legacy_aclk_queries_per_thread); + freez(legacy_aclk_queries_per_thread_sample); + freez(legacy_aclk_cpu_data); freez(rusage_per_thread); } -void *aclk_stats_main_thread(void *ptr) +void *legacy_aclk_stats_main_thread(void *ptr) { struct aclk_stats_thread *args = ptr; - query_thread_count = args->query_thread_count; - aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); - aclk_cpu_data = callocz(query_thread_count, sizeof(struct aclk_cpu_data)); - aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); - aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); - rusage_per_thread = callocz(query_thread_count, sizeof(struct rusage)); - getrusage_called_this_tick = callocz(query_thread_count, sizeof(uint8_t)); + legacy_query_thread_count = args->query_thread_count; + legacy_aclk_qt_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_qt_data)); + legacy_aclk_cpu_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_cpu_data)); + legacy_aclk_queries_per_thread = callocz(legacy_query_thread_count, sizeof(uint32_t)); + legacy_aclk_queries_per_thread_sample = callocz(legacy_query_thread_count, sizeof(uint32_t)); + rusage_per_thread = callocz(legacy_query_thread_count, sizeof(struct rusage)); + getrusage_called_this_tick = callocz(legacy_query_thread_count, sizeof(uint8_t)); heartbeat_t hb; heartbeat_init(&hb); usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; - memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample)); - struct aclk_metrics_per_sample per_sample; - struct aclk_metrics permanent; + struct legacy_aclk_metrics_per_sample per_sample; + struct legacy_aclk_metrics permanent; while (!netdata_exit) { netdata_thread_testcancel(); @@ -363,17 +361,17 @@ void *aclk_stats_main_thread(void *ptr) heartbeat_next(&hb, step_ut); if (netdata_exit) break; - ACLK_STATS_LOCK; + LEGACY_ACLK_STATS_LOCK; // to not hold lock longer than necessary, especially not to hold it // during database rrd* operations - memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); - memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); - memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + memcpy(&per_sample, &legacy_aclk_metrics_per_sample, sizeof(struct legacy_aclk_metrics_per_sample)); + memcpy(&permanent, &legacy_aclk_metrics, sizeof(struct legacy_aclk_metrics)); + memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample)); - memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count); - memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count); - memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * query_thread_count); - ACLK_STATS_UNLOCK; + memcpy(legacy_aclk_queries_per_thread_sample, legacy_aclk_queries_per_thread, sizeof(uint32_t) * legacy_query_thread_count); + memset(legacy_aclk_queries_per_thread, 0, sizeof(uint32_t) * legacy_query_thread_count); + memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * legacy_query_thread_count); + LEGACY_ACLK_STATS_UNLOCK; aclk_stats_collect(&per_sample, &permanent); aclk_stats_query_queue(&per_sample); @@ -386,7 +384,7 @@ void *aclk_stats_main_thread(void *ptr) aclk_stats_cloud_req_cmd(&per_sample); - aclk_stats_query_threads(aclk_queries_per_thread_sample); + aclk_stats_query_threads(legacy_aclk_queries_per_thread_sample); aclk_stats_cpu_threads(); @@ -400,14 +398,14 @@ void *aclk_stats_main_thread(void *ptr) return 0; } -void aclk_stats_upd_online(int online) { +void legacy_aclk_stats_upd_online(int online) { if(!aclk_stats_enabled) return; - ACLK_STATS_LOCK; - aclk_metrics.online = online; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics.online = online; if(!online) - aclk_metrics_per_sample.offline_during_sample = 1; - ACLK_STATS_UNLOCK; + legacy_aclk_metrics_per_sample.offline_during_sample = 1; + LEGACY_ACLK_STATS_UNLOCK; } diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h index 5e50a2272..560de3b5e 100644 --- a/aclk/legacy/aclk_stats.h +++ b/aclk/legacy/aclk_stats.h @@ -3,18 +3,16 @@ #ifndef NETDATA_ACLK_STATS_H #define NETDATA_ACLK_STATS_H -#include "../../daemon/common.h" +#include "daemon/common.h" #include "libnetdata/libnetdata.h" #include "aclk_common.h" #define ACLK_STATS_THREAD_NAME "ACLK_Stats" -extern netdata_mutex_t aclk_stats_mutex; +extern netdata_mutex_t legacy_aclk_stats_mutex; -#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex) -#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex) - -extern int aclk_stats_enabled; +#define LEGACY_ACLK_STATS_LOCK netdata_mutex_lock(&legacy_aclk_stats_mutex) +#define LEGACY_ACLK_STATS_UNLOCK netdata_mutex_unlock(&legacy_aclk_stats_mutex) struct aclk_stats_thread { netdata_thread_t *thread; @@ -22,7 +20,7 @@ struct aclk_stats_thread { }; // preserve between samples -struct aclk_metrics { +struct legacy_aclk_metrics { volatile uint8_t online; }; @@ -53,7 +51,7 @@ extern struct aclk_mat_metrics { struct aclk_metric_mat cloud_q_recvd_to_processed; } aclk_mat_metrics; -void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement); +void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement); #define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7 // if you change update cloud_req_type_names @@ -61,7 +59,7 @@ void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurem int aclk_cloud_req_type_to_idx(const char *name); // reset to 0 on every sample -extern struct aclk_metrics_per_sample { +extern struct legacy_aclk_metrics_per_sample { /* in the unlikely event of ACLK disconnecting and reconnecting under 1 sampling rate we want to make sure we record the disconnection @@ -90,13 +88,13 @@ extern struct aclk_metrics_per_sample { #endif struct aclk_metric_mat_data cloud_q_db_query_time; struct aclk_metric_mat_data cloud_q_recvd_to_processed; -} aclk_metrics_per_sample; +} legacy_aclk_metrics_per_sample; -extern uint32_t *aclk_queries_per_thread; +extern uint32_t *legacy_aclk_queries_per_thread; extern struct rusage *rusage_per_thread; -void *aclk_stats_main_thread(void *ptr); -void aclk_stats_thread_cleanup(); -void aclk_stats_upd_online(int online); +void *legacy_aclk_stats_main_thread(void *ptr); +void legacy_aclk_stats_thread_cleanup(); +void legacy_aclk_stats_upd_online(int online); #endif /* NETDATA_ACLK_STATS_H */ diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c index 5ed7e66af..80ca23971 100644 --- a/aclk/legacy/agent_cloud_link.c +++ b/aclk/legacy/agent_cloud_link.c @@ -6,6 +6,7 @@ #include "aclk_query.h" #include "aclk_common.h" #include "aclk_stats.h" +#include "../aclk_collector_list.h" #ifdef ENABLE_ACLK #include <libwebsockets.h> @@ -15,46 +16,20 @@ int aclk_shutting_down = 0; // Other global state static int aclk_subscribed = 0; -static int aclk_disable_single_updates = 0; static char *aclk_username = NULL; static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_force_reconnect = 0; // Indication from lower layers -usec_t aclk_session_us = 0; // Used by the mqtt layer -time_t aclk_session_sec = 0; // Used by the mqtt layer static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; -static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) -#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) -#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) - void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); void aclk_lws_wss_destroy_context(); -/* - * Maintain a list of collectors and chart count - * If all the charts of a collector are deleted - * then a new metadata dataset must be send to the cloud - * - */ -struct _collector { - time_t created; - uint32_t count; //chart count - uint32_t hostname_hash; - uint32_t plugin_hash; - uint32_t module_hash; - char *hostname; - char *plugin_name; - char *module_name; - struct _collector *next; -}; - -struct _collector *collector_list = NULL; char *create_uuid() { @@ -67,7 +42,7 @@ char *create_uuid() return uuid_str; } -int cloud_to_agent_parse(JSON_ENTRY *e) +int legacy_cloud_to_agent_parse(JSON_ENTRY *e) { struct aclk_request *data = e->callback_data; @@ -247,202 +222,10 @@ char *get_topic(char *sub_topic, char *final_topic, int max_size) return final_topic; } -#ifndef __GNUC__ -#pragma region ACLK Internal Collector Tracking -#endif - -/* - * Free a collector structure - */ - -static void _free_collector(struct _collector *collector) -{ - if (likely(collector->plugin_name)) - freez(collector->plugin_name); - - if (likely(collector->module_name)) - freez(collector->module_name); - - if (likely(collector->hostname)) - freez(collector->hostname); - - freez(collector); -} - -/* - * This will report the collector list - * - */ -#ifdef ACLK_DEBUG -static void _dump_collector_list() -{ - struct _collector *tmp_collector; - - COLLECTOR_LOCK; - - info("DUMPING ALL COLLECTORS"); - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - info("DUMPING ALL COLLECTORS -- nothing found"); - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - - while (tmp_collector) { - info( - "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, - tmp_collector->plugin_name ? tmp_collector->plugin_name : "", - tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); - - tmp_collector = tmp_collector->next; - } - info("DUMPING ALL COLLECTORS DONE"); - COLLECTOR_UNLOCK; -} -#endif - -/* - * This will cleanup the collector list - * - */ -static void _reset_collector_list() -{ - struct _collector *tmp_collector, *next_collector; - - COLLECTOR_LOCK; - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - collector_list->count = 0; - collector_list->next = NULL; - - // We broke the link; we can unlock - COLLECTOR_UNLOCK; - - while (tmp_collector) { - next_collector = tmp_collector->next; - _free_collector(tmp_collector); - tmp_collector = next_collector; - } -} - -/* - * Find a collector (if it exists) - * Must lock before calling this - * If last_collector is not null, it will return the previous collector in the linked - * list (used in collector delete) - */ -static struct _collector *_find_collector( - const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) -{ - struct _collector *tmp_collector, *prev_collector; - uint32_t plugin_hash; - uint32_t module_hash; - uint32_t hostname_hash; - - if (unlikely(!collector_list)) { - collector_list = callocz(1, sizeof(struct _collector)); - return NULL; - } - - if (unlikely(!collector_list->next)) - return NULL; - - plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - module_hash = module_name ? simple_hash(module_name) : 1; - hostname_hash = simple_hash(hostname); - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - prev_collector = collector_list; - while (tmp_collector) { - if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && - hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && - (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && - (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { - if (unlikely(last_collector)) - *last_collector = prev_collector; - - return tmp_collector; - } - - prev_collector = tmp_collector; - tmp_collector = tmp_collector->next; - } - - return tmp_collector; -} - -/* - * Called to delete a collector - * It will reduce the count (chart_count) and will remove it - * from the linked list if the count reaches zero - * The structure will be returned to the caller to free - * the resources - * - */ -static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector, *prev_collector = NULL; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); - - if (likely(tmp_collector)) { - --tmp_collector->count; - if (unlikely(!tmp_collector->count)) - prev_collector->next = tmp_collector->next; - } - return tmp_collector; -} - -/* - * Add a new collector (plugin / module) to the list - * If it already exists just update the chart count - * - * Lock before calling - */ -static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); - - if (unlikely(!tmp_collector)) { - tmp_collector = callocz(1, sizeof(struct _collector)); - tmp_collector->hostname_hash = simple_hash(hostname); - tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; - - tmp_collector->hostname = strdupz(hostname); - tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; - tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; - - tmp_collector->next = collector_list->next; - collector_list->next = tmp_collector; - } - tmp_collector->count++; - debug( - D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", - module_name ? module_name : "*", tmp_collector->count); - return tmp_collector; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - -/* Avoids the need to scan trough all RRDHOSTS +/* Avoids the need to scan through all RRDHOSTS * every time any Query Thread Wakes Up * (every time we need to check child popcorn expiry) - * call with ACLK_SHARED_STATE_LOCK held + * call with legacy_aclk_shared_state_LOCK held */ void aclk_update_next_child_to_popcorn(void) { @@ -462,19 +245,19 @@ void aclk_update_next_child_to_popcorn(void) any = 1; - if (unlikely(!aclk_shared_state.next_popcorn_host)) { - aclk_shared_state.next_popcorn_host = host; + if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) { + legacy_aclk_shared_state.next_popcorn_host = host; rrdhost_aclk_state_unlock(host); continue; } - if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) - aclk_shared_state.next_popcorn_host = host; + if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) + legacy_aclk_shared_state.next_popcorn_host = host; rrdhost_aclk_state_unlock(host); } if(!any) - aclk_shared_state.next_popcorn_host = NULL; + legacy_aclk_shared_state.next_popcorn_host = NULL; rrd_unlock(); } @@ -487,7 +270,7 @@ static int aclk_popcorn_check_bump(RRDHOST *host) { time_t now = now_monotonic_sec(); int updated = 0, ret; - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); ret = ACLK_IS_HOST_INITIALIZING(host); @@ -502,12 +285,12 @@ static int aclk_popcorn_check_bump(RRDHOST *host) if (host != localhost && updated) aclk_update_next_child_to_popcorn(); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return ret; } rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return ret; } @@ -523,13 +306,13 @@ static void aclk_start_host_popcorning(RRDHOST *host) { usec_t now = now_monotonic_sec(); info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) { errno = 0; error("Localhost is allowed to do popcorning only once after startup!"); rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return; } @@ -539,16 +322,16 @@ static void aclk_start_host_popcorning(RRDHOST *host) rrdhost_aclk_state_unlock(host); if (host != localhost) aclk_update_next_child_to_popcorn(); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; } static void aclk_stop_host_popcorning(RRDHOST *host) { - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); if (!ACLK_IS_HOST_POPCORNING(host)) { rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return; } @@ -557,18 +340,18 @@ static void aclk_stop_host_popcorning(RRDHOST *host) host->aclk_state.metadata = ACLK_METADATA_REQUIRED; rrdhost_aclk_state_unlock(host); - if(host == aclk_shared_state.next_popcorn_host) { - aclk_shared_state.next_popcorn_host = NULL; + if(host == legacy_aclk_shared_state.next_popcorn_host) { + legacy_aclk_shared_state.next_popcorn_host = NULL; aclk_update_next_child_to_popcorn(); } - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; } /* * Add a new collector to the list * If it exists, update the chart count */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; if (unlikely(!netdata_ready)) { @@ -589,7 +372,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu if(aclk_popcorn_check_bump(host)) return; - if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); } @@ -601,7 +384,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu * This function will release the memory used and schedule * a cloud update */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; if (unlikely(!netdata_ready)) { @@ -628,7 +411,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu if (aclk_popcorn_check_bump(host)) return; - if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); } @@ -639,7 +422,7 @@ static void aclk_graceful_disconnect() // Send a graceful disconnect message BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}"); aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); buffer_free(b); @@ -963,10 +746,10 @@ static void aclk_try_to_connect(char *hostname, int port) aclk_connecting = 1; create_publish_base_topic(); - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = 0; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_LOCK; + legacy_aclk_shared_state.version_neg = 0; + legacy_aclk_shared_state.version_neg_wait_till = 0; + legacy_aclk_shared_state_UNLOCK; rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password); if (unlikely(rc)) { @@ -981,10 +764,10 @@ static inline void aclk_hello_msg() char *msg_id = create_uuid(); - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_LOCK; + legacy_aclk_shared_state.version_neg = 0; + legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; + legacy_aclk_shared_state_UNLOCK; //Hello message is versioned separately from the rest of the protocol aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); @@ -1004,7 +787,7 @@ static inline void aclk_hello_msg() * * @return It always returns NULL */ -void *aclk_main(void *ptr) +void *legacy_aclk_main(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; struct aclk_query_threads query_threads; @@ -1065,7 +848,7 @@ void *aclk_main(void *ptr) stats_thread->thread = mallocz(sizeof(netdata_thread_t)); stats_thread->query_thread_count = query_threads.count; netdata_thread_create( - stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread, stats_thread); } @@ -1165,20 +948,20 @@ void *aclk_main(void *ptr) } if (unlikely(!query_threads.thread_list)) { - aclk_query_threads_start(&query_threads); + legacy_aclk_query_threads_start(&query_threads); } time_t now = now_monotonic_sec(); if(aclk_connected && last_periodic_query_wakeup < now) { - // to make `aclk_queue_query()` param `run_after` work + // to make `legacy_aclk_queue_query()` param `run_after` work // also makes per child popcorning work last_periodic_query_wakeup = now; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; } } // forever exited: // Wakeup query thread to cleanup - QUERY_THREAD_WAKEUP_ALL; + LEGACY_QUERY_THREAD_WAKEUP_ALL; freez(aclk_username); freez(aclk_password); @@ -1192,18 +975,18 @@ exited: if (agent_id && aclk_connected) { freez(agent_id); // Wakeup thread to cleanup - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; aclk_graceful_disconnect(); } - aclk_query_threads_cleanup(&query_threads); + legacy_aclk_query_threads_cleanup(&query_threads); _reset_collector_list(); freez(collector_list); if(aclk_stats_enabled) { netdata_thread_join(*stats_thread->thread, NULL); - aclk_stats_thread_cleanup(); + legacy_aclk_stats_thread_cleanup(); freez(stats_thread->thread); freez(stats_thread); } @@ -1306,12 +1089,12 @@ void aclk_connect() { info("Connection detected (%u queued queries)", aclk_query_size()); - aclk_stats_upd_online(1); + legacy_aclk_stats_upd_online(1); aclk_connected = 1; aclk_reconnect_delay(0); - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return; } @@ -1321,7 +1104,7 @@ void aclk_disconnect() if (likely(aclk_connected)) info("Disconnect detected (%u queued queries)", aclk_query_size()); - aclk_stats_upd_online(0); + legacy_aclk_stats_upd_online(0); aclk_subscribed = 0; rrdhost_aclk_state_lock(localhost); @@ -1372,7 +1155,7 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts */ void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); -void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) +void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1388,9 +1171,9 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) // session. if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); else - aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); @@ -1418,7 +1201,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) * /api/v1/info * charts */ -int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) +int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1433,9 +1216,9 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *hos // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); else - aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"info\" : "); @@ -1459,14 +1242,14 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); local_buffer->contenttype = CT_APPLICATION_JSON; - if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) - fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg); + if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg); debug(D_ACLK, "Sending Child Disconnect"); char *msg_id = create_uuid(); - aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\"payload\":"); @@ -1486,10 +1269,10 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) return 0; } -void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) +void legacy_aclk_host_state_update(RRDHOST *host, int connect) { #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE - if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) return; #else #warning "This check became unnecessary. Remove" @@ -1498,19 +1281,14 @@ void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) if (unlikely(aclk_host_initializing(localhost))) return; - switch (cmd) { - case ACLK_CMD_CHILD_CONNECT: - debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); - aclk_start_host_popcorning(host); - aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); - break; - case ACLK_CMD_CHILD_DISCONNECT: - debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); - aclk_stop_host_popcorning(host); - aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); - break; - default: - error("Unknown command for aclk_host_state_update %d.", (int)cmd); + if (connect) { + debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); + aclk_start_host_popcorning(host); + legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); + } else { + debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); + aclk_stop_host_popcorning(host); + legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); } } @@ -1537,31 +1315,21 @@ void aclk_send_stress_test(size_t size) // or on request int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host) { - aclk_send_info_metadata(state, host); + legacy_aclk_send_info_metadata(state, host); if(host == localhost) - aclk_send_alarm_metadata(state); + legacy_aclk_send_alarm_metadata(state); return 0; } -void aclk_single_update_disable() -{ - aclk_disable_single_updates = 1; -} - -void aclk_single_update_enable() -{ - aclk_disable_single_updates = 0; -} - // Triggered by a health reload, sends the alarm metadata -void aclk_alarm_reload() +void legacy_aclk_alarm_reload() { if (unlikely(aclk_host_initializing(localhost))) return; - if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue on_connect command on alarm reload"); @@ -1585,7 +1353,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); rrdset2json(st, local_buffer, NULL, NULL, 1); @@ -1598,7 +1366,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart) return 0; } -int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) +int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create) { #ifndef ENABLE_ACLK UNUSED(host); @@ -1611,7 +1379,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (!netdata_cloud_setting) return 0; - if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) + if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) return 0; if (aclk_host_initializing(localhost)) @@ -1623,7 +1391,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (aclk_popcorn_check_bump(host)) return 0; - if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) { + if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue chart_update command"); @@ -1634,7 +1402,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) #endif } -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) { BUFFER *local_buffer = NULL; @@ -1661,7 +1429,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) char *msg_id = create_uuid(); buffer_flush(local_buffer); - aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); @@ -1670,7 +1438,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) buffer_sprintf(local_buffer, "\n}"); - if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { + if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue alarm_command on alarm_update"); @@ -1682,3 +1450,53 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) return 0; } + +char *legacy_aclk_state(void) +{ + BUFFER *wb = buffer_create(1024); + char *ret; + + buffer_strcat(wb, + "ACLK Available: Yes\n" + "ACLK Implementation: Legacy\n" + "Claimed: " + ); + + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + buffer_strcat(wb, "No\n"); + else { + buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); + freez(agent_id); + } + + buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No"); + + ret = strdupz(buffer_tostring(wb)); + buffer_free(wb); + return ret; +} + +char *legacy_aclk_state_json(void) +{ + BUFFER *wb = buffer_create(1024); + char *agent_id = is_agent_claimed(); + + buffer_sprintf(wb, + "{\"aclk-available\":true," + "\"aclk-implementation\":\"Legacy\"," + "\"agent-claimed\":%s," + "\"claimed-id\":", + agent_id ? "true" : "false" + ); + + if (agent_id) { + buffer_sprintf(wb, "\"%s\"", agent_id); + freez(agent_id); + } else + buffer_strcat(wb, "null"); + + buffer_sprintf(wb, ",\"online\":%s}", aclk_connected ? "true" : "false"); + + return strdupz(buffer_tostring(wb)); +} diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h index bfcfef8e9..8954a337a 100644 --- a/aclk/legacy/agent_cloud_link.h +++ b/aclk/legacy/agent_cloud_link.h @@ -3,11 +3,10 @@ #ifndef NETDATA_AGENT_CLOUD_LINK_H #define NETDATA_AGENT_CLOUD_LINK_H -#include "../../daemon/common.h" +#include "daemon/common.h" #include "mqtt.h" #include "aclk_common.h" -#define ACLK_THREAD_NAME "ACLK_Query" #define ACLK_CHART_TOPIC "outbound/meta" #define ACLK_ALARMS_TOPIC "outbound/alarms" #define ACLK_METADATA_TOPIC "outbound/meta" @@ -18,7 +17,6 @@ #define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg) #define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds -#define ACLK_QOS 1 #define ACLK_PING_INTERVAL 60 #define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop @@ -42,16 +40,7 @@ struct aclk_request { typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION; -void *aclk_main(void *ptr); - -#define NETDATA_ACLK_HOOK \ - { .name = "ACLK_Main", \ - .config_section = NULL, \ - .config_name = NULL, \ - .enabled = 1, \ - .thread = NULL, \ - .init_routine = NULL, \ - .start_routine = aclk_main }, +void *legacy_aclk_main(void *ptr); extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id); @@ -62,32 +51,35 @@ char *create_uuid(); // callbacks for agent cloud link int aclk_subscribe(char *topic, int qos); -int cloud_to_agent_parse(JSON_ENTRY *e); +int legacy_cloud_to_agent_parse(JSON_ENTRY *e); void aclk_disconnect(); void aclk_connect(); +#ifdef ENABLE_ACLK int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host); -int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host); -void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted); +int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host); +void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted); int aclk_wait_for_initialization(); char *create_publish_base_topic(); int aclk_send_single_chart(RRDHOST *host, char *chart); -int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); +int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create); +int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version); -int aclk_handle_cloud_message(char *payload); -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_alarm_reload(); +int legacy_aclk_handle_cloud_message(char *payload); +void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void legacy_aclk_alarm_reload(void); unsigned long int aclk_reconnect_delay(int mode); extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); -void aclk_single_update_enable(); -void aclk_single_update_disable(); -void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd); +void legacy_aclk_host_state_update(RRDHOST *host, int connect); int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd); void aclk_update_next_child_to_popcorn(void); +char *legacy_aclk_state(void); +char *legacy_aclk_state_json(void); +#endif + #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c index 74f774555..0e4bb2ec9 100644 --- a/aclk/legacy/mqtt.c +++ b/aclk/legacy/mqtt.c @@ -1,12 +1,16 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include <libnetdata/json/json.h> -#include "../../daemon/common.h" +#include "daemon/common.h" #include "mqtt.h" #include "aclk_lws_wss_client.h" #include "aclk_stats.h" #include "aclk_rx_msgs.h" +#include "agent_cloud_link.h" + +#define ACLK_QOS 1 + extern usec_t aclk_session_us; extern time_t aclk_session_sec; @@ -27,7 +31,7 @@ void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosqu UNUSED(mosq); UNUSED(obj); - aclk_handle_cloud_message(msg->payload); + legacy_aclk_handle_cloud_message(msg->payload); } void publish_callback(struct mosquitto *mosq, void *obj, int rc) @@ -44,7 +48,7 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc) info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff); - aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff); + legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.latency, diff); #endif return; } diff --git a/aclk/legacy/mqtt.h b/aclk/legacy/mqtt.h index cc4765d62..98d599f51 100644 --- a/aclk/legacy/mqtt.h +++ b/aclk/legacy/mqtt.h @@ -19,7 +19,7 @@ const char *_link_strerror(int rc); int _link_set_lwt(char *topic, int qos); -int aclk_handle_cloud_message(char *); +int legacy_aclk_handle_cloud_message(char *); extern char *get_topic(char *sub_topic, char *final_topic, int max_size); #endif //NETDATA_MQTT_H |