summaryrefslogtreecommitdiffstats
path: root/aclk/legacy
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-12-01 06:15:11 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-12-01 06:15:11 +0000
commit483926a283e118590da3f9ecfa75a8a4d62143ce (patch)
treecb77052778df9a128a8cd3ff5bf7645322a13bc5 /aclk/legacy
parentReleasing debian version 1.31.0-4. (diff)
downloadnetdata-483926a283e118590da3f9ecfa75a8a4d62143ce.tar.xz
netdata-483926a283e118590da3f9ecfa75a8a4d62143ce.zip
Merging upstream version 1.32.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/legacy')
-rw-r--r--aclk/legacy/aclk_common.c213
-rw-r--r--aclk/legacy/aclk_common.h37
-rw-r--r--aclk/legacy/aclk_lws_https_client.c6
-rw-r--r--aclk/legacy/aclk_lws_https_client.h2
-rw-r--r--aclk/legacy/aclk_lws_wss_client.c27
-rw-r--r--aclk/legacy/aclk_lws_wss_client.h2
-rw-r--r--aclk/legacy/aclk_query.c96
-rw-r--r--aclk/legacy/aclk_query.h19
-rw-r--r--aclk/legacy/aclk_rrdhost_state.h42
-rw-r--r--aclk/legacy/aclk_rx_msgs.c75
-rw-r--r--aclk/legacy/aclk_rx_msgs.h4
-rw-r--r--aclk/legacy/aclk_stats.c134
-rw-r--r--aclk/legacy/aclk_stats.h26
-rw-r--r--aclk/legacy/agent_cloud_link.c432
-rw-r--r--aclk/legacy/agent_cloud_link.h42
-rw-r--r--aclk/legacy/mqtt.c10
-rw-r--r--aclk/legacy/mqtt.h2
17 files changed, 351 insertions, 818 deletions
diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c
index 96f95545..7f8368e4 100644
--- a/aclk/legacy/aclk_common.c
+++ b/aclk/legacy/aclk_common.c
@@ -1,201 +1,18 @@
#include "aclk_common.h"
-#include "../../daemon/common.h"
+#include "daemon/common.h"
#ifdef ENABLE_ACLK
#include <libwebsockets.h>
#endif
-netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+netdata_mutex_t legacy_aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
-int aclk_disable_runtime = 0;
-int aclk_kill_link = 0;
-
-struct aclk_shared_state aclk_shared_state = {
+struct legacy_aclk_shared_state legacy_aclk_shared_state = {
.version_neg = 0,
.version_neg_wait_till = 0
};
-struct {
- ACLK_PROXY_TYPE type;
- const char *url_str;
-} supported_proxy_types[] = {
- { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
- { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
- { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
- { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
-};
-
-const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
-{
- switch (*type) {
- case PROXY_DISABLED:
- return "disabled";
- case PROXY_TYPE_HTTP:
- return "HTTP";
- case PROXY_TYPE_SOCKS5:
- return "SOCKS";
- default:
- return "Unknown";
- }
-}
-
-static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
-{
- int i = 0;
- while (supported_proxy_types[i].url_str) {
- if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
- return supported_proxy_types[i].type;
- i++;
- }
- return PROXY_TYPE_UNKNOWN;
-}
-
-ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
-{
- if (!string)
- return PROXY_TYPE_UNKNOWN;
-
- while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove)
- string++;
-
- if (!*string)
- return PROXY_TYPE_UNKNOWN;
-
- return aclk_find_proxy(string);
-}
-
-// helper function to censor user&password
-// for logging purposes
-void safe_log_proxy_censor(char *proxy)
-{
- size_t length = strlen(proxy);
- char *auth = proxy + length - 1;
- char *cur;
-
- while ((auth >= proxy) && (*auth != '@'))
- auth--;
-
- //if not found or @ is first char do nothing
- if (auth <= proxy)
- return;
-
- cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
- if (!cur)
- cur = proxy;
- else
- cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
-
- while (cur < auth) {
- *cur = 'X';
- cur++;
- }
-}
-
-static inline void safe_log_proxy_error(char *str, const char *proxy)
-{
- char *log = strdupz(proxy);
- safe_log_proxy_censor(log);
- error("%s Provided Value:\"%s\"", str, log);
- freez(log);
-}
-
-static inline int check_socks_environment(const char **proxy)
-{
- char *tmp = getenv("socks_proxy");
-
- if (!tmp)
- return 1;
-
- if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
- *proxy = tmp;
- return 0;
- }
-
- safe_log_proxy_error(
- "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
- tmp);
- return 1;
-}
-
-static inline int check_http_environment(const char **proxy)
-{
- char *tmp = getenv("http_proxy");
-
- if (!tmp)
- return 1;
-
- if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
- *proxy = tmp;
- return 0;
- }
-
- safe_log_proxy_error(
- "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
- tmp);
- return 1;
-}
-
-const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
-{
- const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
- *type = PROXY_DISABLED;
-
- if (strcmp(proxy, "none") == 0)
- return proxy;
-
- if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
- if (check_socks_environment(&proxy) == 0) {
-#ifdef LWS_WITH_SOCKS5
- *type = PROXY_TYPE_SOCKS5;
- return proxy;
-#else
- safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
- "but Libwebsockets used doesn't have SOCKS5 support built in. "
- "Ignoring and checking for other options.",
- proxy);
-#endif
- }
- if (check_http_environment(&proxy) == 0)
- *type = PROXY_TYPE_HTTP;
- return proxy;
- }
-
- *type = aclk_verify_proxy(proxy);
-#ifndef LWS_WITH_SOCKS5
- if (*type == PROXY_TYPE_SOCKS5) {
- safe_log_proxy_error(
- "Config var \"" ACLK_PROXY_CONFIG_VAR
- "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
- proxy);
- }
-#endif
- if (*type == PROXY_TYPE_UNKNOWN) {
- *type = PROXY_DISABLED;
- safe_log_proxy_error(
- "Config var \"" ACLK_PROXY_CONFIG_VAR
- "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
- proxy);
- }
-
- return proxy;
-}
-
-// helper function to read settings only once (static)
-// as claiming, challenge/response and ACLK
-// read the same thing, no need to parse again
-const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
-{
- static const char *proxy = NULL;
- static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
-
- if (proxy_type == PROXY_NOT_SET)
- proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
-
- *type = proxy_type;
- return proxy;
-}
-
int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
{
int pos = 0;
@@ -234,27 +51,3 @@ int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
return 0;
}
-
-struct label *add_aclk_host_labels(struct label *label) {
-#ifdef ENABLE_ACLK
- ACLK_PROXY_TYPE aclk_proxy;
- char *proxy_str;
- aclk_get_proxy(&aclk_proxy);
-
- switch(aclk_proxy) {
- case PROXY_TYPE_SOCKS5:
- proxy_str = "SOCKS5";
- break;
- case PROXY_TYPE_HTTP:
- proxy_str = "HTTP";
- break;
- default:
- proxy_str = "none";
- break;
- }
- label = add_label_to_list(label, "_aclk_impl", "Legacy", LABEL_SOURCE_AUTO);
- return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
-#else
- return label;
-#endif
-}
diff --git a/aclk/legacy/aclk_common.h b/aclk/legacy/aclk_common.h
index eedb5b51..080680ff 100644
--- a/aclk/legacy/aclk_common.h
+++ b/aclk/legacy/aclk_common.h
@@ -1,12 +1,12 @@
#ifndef ACLK_COMMON_H
#define ACLK_COMMON_H
-#include "aclk_rrdhost_state.h"
-#include "../../daemon/common.h"
+#include "../aclk_rrdhost_state.h"
+#include "daemon/common.h"
-extern netdata_mutex_t aclk_shared_state_mutex;
-#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
-#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+extern netdata_mutex_t legacy_aclk_shared_state_mutex;
+#define legacy_aclk_shared_state_LOCK netdata_mutex_lock(&legacy_aclk_shared_state_mutex)
+#define legacy_aclk_shared_state_UNLOCK netdata_mutex_unlock(&legacy_aclk_shared_state_mutex)
// minimum and maximum supported version of ACLK
// in this version of agent
@@ -33,8 +33,8 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING)
#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update)
-extern struct aclk_shared_state {
- // optimization to avoid looping trough hosts
+extern struct legacy_aclk_shared_state {
+ // optimization to avoid looping through hosts
// every time Query Thread wakes up
RRDHOST *next_popcorn_host;
@@ -42,31 +42,10 @@ extern struct aclk_shared_state {
// protect by lock otherwise
int version_neg;
usec_t version_neg_wait_till;
-} aclk_shared_state;
-
-typedef enum aclk_proxy_type {
- PROXY_TYPE_UNKNOWN = 0,
- PROXY_TYPE_SOCKS5,
- PROXY_TYPE_HTTP,
- PROXY_DISABLED,
- PROXY_NOT_SET,
-} ACLK_PROXY_TYPE;
-
-extern int aclk_kill_link; // Tells the agent to tear down the link
-extern int aclk_disable_runtime;
+} legacy_aclk_shared_state;
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
-#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
-#define ACLK_PROXY_ENV "env"
-#define ACLK_PROXY_CONFIG_VAR "proxy"
-
-ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
-const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
-void safe_log_proxy_censor(char *proxy);
int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
-const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
-
-struct label *add_aclk_host_labels(struct label *label);
#endif //ACLK_COMMON_H
diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c
index f41a230d..8a490c6f 100644
--- a/aclk/legacy/aclk_lws_https_client.c
+++ b/aclk/legacy/aclk_lws_https_client.c
@@ -2,13 +2,7 @@
#define ACLK_LWS_HTTPS_CLIENT_INTERNAL
#include "aclk_lws_https_client.h"
-
-#ifndef ACLK_NG
#include "aclk_common.h"
-#else
-#include "../aclk.h"
-#endif
-
#include "aclk_lws_wss_client.h"
#define SMALL_BUFFER 16
diff --git a/aclk/legacy/aclk_lws_https_client.h b/aclk/legacy/aclk_lws_https_client.h
index 811809dd..5f30a37f 100644
--- a/aclk/legacy/aclk_lws_https_client.h
+++ b/aclk/legacy/aclk_lws_https_client.h
@@ -3,7 +3,7 @@
#ifndef NETDATA_LWS_HTTPS_CLIENT_H
#define NETDATA_LWS_HTTPS_CLIENT_H
-#include "../../daemon/common.h"
+#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
#define DATAMAXLEN 1024*16
diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c
index f73902b3..012f2a8c 100644
--- a/aclk/legacy/aclk_lws_wss_client.c
+++ b/aclk/legacy/aclk_lws_wss_client.c
@@ -3,9 +3,10 @@
#include "aclk_lws_wss_client.h"
#include "libnetdata/libnetdata.h"
-#include "../../daemon/common.h"
+#include "daemon/common.h"
#include "aclk_common.h"
#include "aclk_stats.h"
+#include "../aclk_proxy.h"
extern int aclk_shutting_down;
@@ -450,9 +451,9 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
if (n>=0) {
data->written += n;
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.write_q_consumed += n;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.write_q_consumed += n;
+ LEGACY_ACLK_STATS_UNLOCK;
}
}
//error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc);
@@ -473,9 +474,9 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
retval = 1;
aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.read_q_added += len;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.read_q_added += len;
+ LEGACY_ACLK_STATS_UNLOCK;
}
// to future myself -> do not call this while read lock is active as it will eventually
@@ -553,9 +554,9 @@ int aclk_lws_wss_client_write(void *buf, size_t count)
aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.write_q_added += count;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.write_q_added += count;
+ LEGACY_ACLK_STATS_UNLOCK;
}
lws_callback_on_writable(engine_instance->lws_wsi);
@@ -584,9 +585,9 @@ int aclk_lws_wss_client_read(void *buf, size_t count)
engine_instance->data_to_read = 0;
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
+ LEGACY_ACLK_STATS_UNLOCK;
}
abort:
diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h
index eb99ee02..c68649cf 100644
--- a/aclk/legacy/aclk_lws_wss_client.h
+++ b/aclk/legacy/aclk_lws_wss_client.h
@@ -58,7 +58,7 @@ struct aclk_lws_wss_engine_instance {
struct lws_wss_packet_buffer *write_buffer_head;
struct lws_ring *read_ringbuffer;
- //flags to be readed by engine user
+ //flags to be read by engine user
int websocket_connection_up;
// currently this is by default disabled
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c
index 040068e8..21eae11f 100644
--- a/aclk/legacy/aclk_query.c
+++ b/aclk/legacy/aclk_query.c
@@ -2,15 +2,16 @@
#include "aclk_query.h"
#include "aclk_stats.h"
#include "aclk_rx_msgs.h"
+#include "agent_cloud_link.h"
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
-pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
-pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
-#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
-#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
-volatile int aclk_connected = 0;
+pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait)
+#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait)
#ifndef __GNUC__
#pragma region ACLK_QUEUE
@@ -188,7 +189,7 @@ aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd
* Add a query to execute, the result will be send to the specified topic
*/
-int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
+int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
{
struct aclk_query *new_query, *tmp_query;
@@ -205,7 +206,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
if (unlikely(tmp_query)) {
if (tmp_query->run_after == run_after) {
ACLK_QUEUE_UNLOCK;
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@@ -220,9 +221,9 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
}
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.queries_queued++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.queries_queued++;
+ LEGACY_ACLK_STATS_UNLOCK;
}
new_query = callocz(1, sizeof(struct aclk_query));
@@ -255,7 +256,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
aclk_queue.aclk_query_tail = new_query;
aclk_queue.count++;
ACLK_QUEUE_UNLOCK;
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@@ -264,7 +265,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
aclk_queue.count++;
ACLK_QUEUE_UNLOCK;
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@@ -332,12 +333,12 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
{
usec_t t = now_boottime_usec();
- aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
+ legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
w->response.code = web_client_api_request_v1(host, w, url);
t = now_boottime_usec() - t;
- aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t);
+ legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t);
return t;
}
@@ -375,7 +376,7 @@ static int aclk_execute_query(struct aclk_query *this_query)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
@@ -510,7 +511,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code);
buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A");
buffer_strcat(local_buffer, w->response.header_output->buffer);
@@ -607,7 +608,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
case ACLK_CMD_ONCONNECT:
ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
- if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
+ if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
break;
}
@@ -638,7 +639,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
debug(D_ACLK, "EXECUTING a chart delete command");
//TODO: This send the info metadata for now
- aclk_send_info_metadata(ACLK_METADATA_SENT, host);
+ legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host);
break;
case ACLK_CMD_ALARM:
@@ -673,10 +674,10 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.queries_dispatched++;
- aclk_queries_per_thread[t_info->idx]++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.queries_dispatched++;
+ legacy_aclk_queries_per_thread[t_info->idx]++;
+ LEGACY_ACLK_STATS_UNLOCK;
if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) {
getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]);
@@ -690,7 +691,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
return 1;
}
-void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
+void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
{
if (query_threads && query_threads->thread_list) {
for (int i = 0; i < query_threads->count; i++) {
@@ -707,8 +708,8 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
} while (this_query);
}
-#define TASK_LEN_MAX 16
-void aclk_query_threads_start(struct aclk_query_threads *query_threads)
+#define TASK_LEN_MAX 22
+void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads)
{
info("Starting %d query threads.", query_threads->count);
@@ -717,10 +718,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
for (int i = 0; i < query_threads->count; i++) {
query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
- if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0))
+ if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
error("snprintf encoding error");
netdata_thread_create(
- &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
+ &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread,
&query_threads->thread_list[i]);
}
}
@@ -730,10 +731,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
* returns actual/updated popcorning state
*/
-ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
+ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host)
{
rrdhost_aclk_state_lock(host);
- ACLK_POPCORNING_STATE ret = host->aclk_state.state;
+ ACLK_AGENT_STATE ret = host->aclk_state.state;
if (host->aclk_state.state != ACLK_HOST_INITIALIZING){
rrdhost_aclk_state_unlock(host);
return ret;
@@ -766,7 +767,7 @@ ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
* of no new collectors coming in in order to mark the agent
* as stable (set agent_state = AGENT_STABLE)
*/
-void *aclk_query_main_thread(void *ptr)
+void *legacy_aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
@@ -785,25 +786,24 @@ void *aclk_query_main_thread(void *ptr)
sleep(1);
continue;
}
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(!aclk_shared_state.version_neg)) {
- if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_LOCK;
+ if (unlikely(!legacy_aclk_shared_state.version_neg)) {
+ if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
+ legacy_aclk_shared_state_UNLOCK;
info("Waiting for ACLK Version Negotiation message from Cloud");
sleep(1);
continue;
}
- errno = 0;
- error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
+ info("ACLK version negotiation failed (This is expected). No reply to \"hello\" with \"version\" from cloud in time of %ds."
" Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
- aclk_shared_state.version_neg = ACLK_VERSION_MIN;
- aclk_set_rx_handlers(aclk_shared_state.version_neg);
+ legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN;
+ aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg);
}
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
rrdhost_aclk_state_lock(localhost);
if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
- if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
rrdhost_aclk_state_unlock(localhost);
errno = 0;
error("ACLK failed to queue on_connect command");
@@ -814,25 +814,25 @@ void *aclk_query_main_thread(void *ptr)
}
rrdhost_aclk_state_unlock(localhost);
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
- aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
- aclk_shared_state.next_popcorn_host = NULL;
+ legacy_aclk_shared_state_LOCK;
+ if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
+ legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ legacy_aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn();
}
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
while (aclk_process_query(info)) {
// Process all commands
};
- QUERY_THREAD_LOCK;
+ LEGACY_QUERY_THREAD_LOCK;
// TODO: Need to check if there are queries awaiting already
- if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+ if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait)))
sleep_usec(USEC_PER_SEC * 1);
- QUERY_THREAD_UNLOCK;
+ LEGACY_QUERY_THREAD_UNLOCK;
}
return NULL;
diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h
index 026985c8..622b66e2 100644
--- a/aclk/legacy/aclk_query.h
+++ b/aclk/legacy/aclk_query.h
@@ -10,14 +10,11 @@
#define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread.
-extern pthread_cond_t query_cond_wait;
-extern pthread_mutex_t query_lock_wait;
+extern pthread_cond_t legacy_query_cond_wait;
+extern pthread_mutex_t legacy_query_lock_wait;
extern uint8_t *getrusage_called_this_tick;
-#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
-#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
-
-extern volatile int aclk_connected;
-
+#define LEGACY_QUERY_THREAD_WAKEUP pthread_cond_signal(&legacy_query_cond_wait)
+#define LEGACY_QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&legacy_query_cond_wait)
struct aclk_query_thread {
netdata_thread_t thread;
int idx;
@@ -34,11 +31,11 @@ struct aclk_cloud_req_v2 {
char *query_endpoint;
};
-void *aclk_query_main_thread(void *ptr);
-int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
+void *legacy_aclk_query_main_thread(void *ptr);
+int legacy_aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
-void aclk_query_threads_start(struct aclk_query_threads *query_threads);
-void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
+void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads);
+void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
unsigned int aclk_query_size();
#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/legacy/aclk_rrdhost_state.h b/aclk/legacy/aclk_rrdhost_state.h
deleted file mode 100644
index 7ab3a502..00000000
--- a/aclk/legacy/aclk_rrdhost_state.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#ifndef ACLK_RRDHOST_STATE_H
-#define ACLK_RRDHOST_STATE_H
-
-#include "../../libnetdata/libnetdata.h"
-
-typedef enum aclk_cmd {
- ACLK_CMD_CLOUD,
- ACLK_CMD_ONCONNECT,
- ACLK_CMD_INFO,
- ACLK_CMD_CHART,
- ACLK_CMD_CHARTDEL,
- ACLK_CMD_ALARM,
- ACLK_CMD_CLOUD_QUERY_2,
- ACLK_CMD_CHILD_CONNECT,
- ACLK_CMD_CHILD_DISCONNECT
-} ACLK_CMD;
-
-typedef enum aclk_metadata_state {
- ACLK_METADATA_REQUIRED,
- ACLK_METADATA_CMD_QUEUED,
- ACLK_METADATA_SENT
-} ACLK_METADATA_STATE;
-
-typedef enum aclk_agent_state {
- ACLK_HOST_INITIALIZING,
- ACLK_HOST_STABLE
-} ACLK_POPCORNING_STATE;
-
-typedef struct aclk_rrdhost_state {
- char *claimed_id; // Claimed ID if host has one otherwise NULL
-
-#ifdef ENABLE_ACLK
- // per child popcorning
- ACLK_POPCORNING_STATE state;
- ACLK_METADATA_STATE metadata;
-
- time_t timestamp_created;
- time_t t_last_popcorn_update;
-#endif /* ENABLE_ACLK */
-} aclk_rrdhost_state;
-
-#endif /* ACLK_RRDHOST_STATE_H */
diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c
index 68dad81e..d4778bbc 100644
--- a/aclk/legacy/aclk_rx_msgs.c
+++ b/aclk/legacy/aclk_rx_msgs.c
@@ -4,6 +4,7 @@
#include "aclk_common.h"
#include "aclk_stats.h"
#include "aclk_query.h"
+#include "agent_cloud_link.h"
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@@ -107,7 +108,7 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
error(
"Received \"http\" message from Cloud with version %d, but ACLK version %d is used",
cloud_to_agent->version,
- aclk_shared_state.version_neg);
+ legacy_aclk_shared_state.version_neg);
return 1;
}
@@ -126,14 +127,14 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
return 1;
}
- if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
+ if (unlikely(legacy_aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_v1++;
- aclk_metrics_per_sample.cloud_req_ok++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.cloud_req_v1++;
+ legacy_aclk_metrics_per_sample.cloud_req_ok++;
+ LEGACY_ACLK_STATS_UNLOCK;
}
return 0;
@@ -181,11 +182,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
}
// we do this here due to cloud_req being taken over by query thread
- // which if crazy quick can free it after aclk_queue_query
+ // which if crazy quick can free it after legacy_aclk_queue_query
stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint);
- // aclk_queue_query takes ownership of data pointer
- if (unlikely(aclk_queue_query(
+ // legacy_aclk_queue_query takes ownership of data pointer
+ if (unlikely(legacy_aclk_queue_query(
cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
ACLK_CMD_CLOUD_QUERY_2))) {
error("ACLK failed to queue incoming \"http\" v2 message");
@@ -193,11 +194,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
}
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_v2++;
- aclk_metrics_per_sample.cloud_req_ok++;
- aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.cloud_req_v2++;
+ legacy_aclk_metrics_per_sample.cloud_req_ok++;
+ legacy_aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
+ LEGACY_ACLK_STATS_UNLOCK;
}
return 0;
@@ -258,19 +259,19 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha
version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
+ legacy_aclk_shared_state_LOCK;
+ if (unlikely(now_monotonic_usec() > legacy_aclk_shared_state.version_neg_wait_till)) {
errno = 0;
error("The \"version\" message came too late ignoring.");
goto err_cleanup;
}
- if (unlikely(aclk_shared_state.version_neg)) {
+ if (unlikely(legacy_aclk_shared_state.version_neg)) {
errno = 0;
- error("Version has already been set to %d", aclk_shared_state.version_neg);
+ error("Version has already been set to %d", legacy_aclk_shared_state.version_neg);
goto err_cleanup;
}
- aclk_shared_state.version_neg = version;
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state.version_neg = version;
+ legacy_aclk_shared_state_UNLOCK;
info("Choosing version %d of ACLK", version);
@@ -279,7 +280,7 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha
return 0;
err_cleanup:
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return 1;
}
@@ -288,31 +289,31 @@ typedef struct aclk_incoming_msg_type{
int(*fnc)(struct aclk_request *, char *);
}aclk_incoming_msg_type;
-aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = {
+aclk_incoming_msg_type legacy_aclk_incoming_msg_types_v1[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v1 },
{ .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
-aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
+aclk_incoming_msg_type legacy_aclk_incoming_msg_types_compression[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v2 },
{ .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
-struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
+struct aclk_incoming_msg_type *legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1;
void aclk_set_rx_handlers(int version)
{
if(version >= ACLK_V_COMPRESSION) {
- aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
+ legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_compression;
return;
}
- aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
+ legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1;
}
-int aclk_handle_cloud_message(char *payload)
+int legacy_aclk_handle_cloud_message(char *payload)
{
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
@@ -325,7 +326,7 @@ int aclk_handle_cloud_message(char *payload)
debug(D_ACLK, "ACLK incoming message (%s)", payload);
- int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+ int rc = json_parse(payload, &cloud_to_agent, legacy_cloud_to_agent_parse);
if (unlikely(rc != JSON_OK)) {
errno = 0;
@@ -339,22 +340,22 @@ int aclk_handle_cloud_message(char *payload)
goto err_cleanup;
}
- if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
+ if (!legacy_aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
goto err_cleanup;
}
- for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
- if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
- if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
+ for (int i = 0; legacy_aclk_incoming_msg_types[i].name; i++) {
+ if (strcmp(cloud_to_agent.type_id, legacy_aclk_incoming_msg_types[i].name) == 0) {
+ if (likely(!legacy_aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
// in case of success handler is supposed to clean up after itself
// or as in the case of aclk_handle_cloud_request take
// ownership of the pointers (done to avoid copying)
- // see what `aclk_queue_query` parameter `internal` does
+ // see what `legacy_aclk_queue_query` parameter `internal` does
// NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
// msg handlers (namely aclk_handle_version_response)
- // can freely change what aclk_incoming_msg_types points to
+ // can freely change what legacy_aclk_incoming_msg_types points to
// so either exit or restart this for loop
freez(cloud_to_agent.type_id);
return 0;
@@ -378,9 +379,9 @@ err_cleanup:
err_cleanup_nojson:
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_err++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics_per_sample.cloud_req_err++;
+ LEGACY_ACLK_STATS_UNLOCK;
}
return 1;
diff --git a/aclk/legacy/aclk_rx_msgs.h b/aclk/legacy/aclk_rx_msgs.h
index 3095e41a..f1f99114 100644
--- a/aclk/legacy/aclk_rx_msgs.h
+++ b/aclk/legacy/aclk_rx_msgs.h
@@ -3,10 +3,10 @@
#ifndef NETDATA_ACLK_RX_MSGS_H
#define NETDATA_ACLK_RX_MSGS_H
-#include "../../daemon/common.h"
+#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
-int aclk_handle_cloud_message(char *payload);
+int legacy_aclk_handle_cloud_message(char *payload);
void aclk_set_rx_handlers(int version);
diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c
index 88679cb3..fbbb322a 100644
--- a/aclk/legacy/aclk_stats.c
+++ b/aclk/legacy/aclk_stats.c
@@ -1,33 +1,31 @@
#include "aclk_stats.h"
-netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
+netdata_mutex_t legacy_aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
-int aclk_stats_enabled;
-
-int query_thread_count;
+int legacy_query_thread_count;
// data ACLK stats need per query thread
-struct aclk_qt_data {
+struct legacy_aclk_qt_data {
RRDDIM *dim;
-} *aclk_qt_data = NULL;
+} *legacy_aclk_qt_data = NULL;
// ACLK per query thread cpu stats
-struct aclk_cpu_data {
+struct legacy_aclk_cpu_data {
RRDDIM *user;
RRDDIM *system;
RRDSET *st;
-} *aclk_cpu_data = NULL;
+} *legacy_aclk_cpu_data = NULL;
-uint32_t *aclk_queries_per_thread = NULL;
-uint32_t *aclk_queries_per_thread_sample = NULL;
+uint32_t *legacy_aclk_queries_per_thread = NULL;
+uint32_t *legacy_aclk_queries_per_thread_sample = NULL;
struct rusage *rusage_per_thread;
uint8_t *getrusage_called_this_tick = NULL;
-struct aclk_metrics aclk_metrics = {
+static struct legacy_aclk_metrics legacy_aclk_metrics = {
.online = 0,
};
-struct aclk_metrics_per_sample aclk_metrics_per_sample;
+struct legacy_aclk_metrics_per_sample legacy_aclk_metrics_per_sample;
struct aclk_mat_metrics aclk_mat_metrics = {
#ifdef NETDATA_INTERNAL_CHECKS
@@ -61,20 +59,20 @@ struct aclk_mat_metrics aclk_mat_metrics = {
"by query thread (just before passing to the database)." }
};
-void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement)
+void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement)
{
if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
+ LEGACY_ACLK_STATS_LOCK;
if (metric->max < measurement)
metric->max = measurement;
metric->total += measurement;
metric->count++;
- ACLK_STATS_UNLOCK;
+ LEGACY_ACLK_STATS_UNLOCK;
}
}
-static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent)
+static void aclk_stats_collect(struct legacy_aclk_metrics_per_sample *per_sample, struct legacy_aclk_metrics *permanent)
{
static RRDSET *st_aclkstats = NULL;
static RRDDIM *rd_online_status = NULL;
@@ -93,7 +91,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc
rrdset_done(st_aclkstats);
}
-static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
+static void aclk_stats_query_queue(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st_query_thread = NULL;
static RRDDIM *rd_queued = NULL;
@@ -115,7 +113,7 @@ static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st_query_thread);
}
-static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample)
+static void aclk_stats_write_q(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_wq_add = NULL;
@@ -137,7 +135,7 @@ static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
-static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
+static void aclk_stats_read_q(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_rq_add = NULL;
@@ -159,7 +157,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
-static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
+static void aclk_stats_cloud_req(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_rq_ok = NULL;
@@ -181,7 +179,7 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
-static void aclk_stats_cloud_req_version(struct aclk_metrics_per_sample *per_sample)
+static void aclk_stats_cloud_req_version(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_rq_v1 = NULL;
@@ -223,7 +221,7 @@ int aclk_cloud_req_type_to_idx(const char *name)
return 0;
}
-static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample)
+static void aclk_stats_cloud_req_cmd(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st;
static int initialized = 0;
@@ -246,7 +244,7 @@ static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
-#define MAX_DIM_NAME 16
+#define MAX_DIM_NAME 22
static void aclk_stats_query_threads(uint32_t *queries_per_thread)
{
static RRDSET *st = NULL;
@@ -258,16 +256,16 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
"netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
"netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
- for (int i = 0; i < query_thread_count; i++) {
- if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
+ for (int i = 0; i < legacy_query_thread_count; i++) {
+ if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
error("snprintf encoding error");
- aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ legacy_aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
}
} else
rrdset_next(st);
- for (int i = 0; i < query_thread_count; i++) {
- rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
+ for (int i = 0; i < legacy_query_thread_count; i++) {
+ rrddim_set_by_pointer(st, legacy_aclk_qt_data[i].dim, queries_per_thread[i]);
}
rrdset_done(st);
@@ -301,59 +299,59 @@ static void aclk_stats_cpu_threads(void)
char id[100 + 1];
char title[100 + 1];
- for (int i = 0; i < query_thread_count; i++) {
- if (unlikely(!aclk_cpu_data[i].st)) {
+ for (int i = 0; i < legacy_query_thread_count; i++) {
+ if (unlikely(!legacy_aclk_cpu_data[i].st)) {
snprintfz(id, 100, "aclk_thread%d_cpu", i);
snprintfz(title, 100, "Cpu Usage For Thread No %d", i);
- aclk_cpu_data[i].st = rrdset_create_localhost(
+ legacy_aclk_cpu_data[i].st = rrdset_create_localhost(
"netdata", id, NULL, "aclk", NULL, title, "milliseconds/s",
"netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
- aclk_cpu_data[i].user = rrddim_add(aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- aclk_cpu_data[i].system = rrddim_add(aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ legacy_aclk_cpu_data[i].user = rrddim_add(legacy_aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ legacy_aclk_cpu_data[i].system = rrddim_add(legacy_aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
} else
- rrdset_next(aclk_cpu_data[i].st);
+ rrdset_next(legacy_aclk_cpu_data[i].st);
}
- for (int i = 0; i < query_thread_count; i++) {
- rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec);
- rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec);
- rrdset_done(aclk_cpu_data[i].st);
+ for (int i = 0; i < legacy_query_thread_count; i++) {
+ rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec);
+ rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec);
+ rrdset_done(legacy_aclk_cpu_data[i].st);
}
}
-void aclk_stats_thread_cleanup()
+void legacy_aclk_stats_thread_cleanup()
{
- freez(aclk_qt_data);
- freez(aclk_queries_per_thread);
- freez(aclk_queries_per_thread_sample);
- freez(aclk_cpu_data);
+ freez(legacy_aclk_qt_data);
+ freez(legacy_aclk_queries_per_thread);
+ freez(legacy_aclk_queries_per_thread_sample);
+ freez(legacy_aclk_cpu_data);
freez(rusage_per_thread);
}
-void *aclk_stats_main_thread(void *ptr)
+void *legacy_aclk_stats_main_thread(void *ptr)
{
struct aclk_stats_thread *args = ptr;
- query_thread_count = args->query_thread_count;
- aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
- aclk_cpu_data = callocz(query_thread_count, sizeof(struct aclk_cpu_data));
- aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
- aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
- rusage_per_thread = callocz(query_thread_count, sizeof(struct rusage));
- getrusage_called_this_tick = callocz(query_thread_count, sizeof(uint8_t));
+ legacy_query_thread_count = args->query_thread_count;
+ legacy_aclk_qt_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_qt_data));
+ legacy_aclk_cpu_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_cpu_data));
+ legacy_aclk_queries_per_thread = callocz(legacy_query_thread_count, sizeof(uint32_t));
+ legacy_aclk_queries_per_thread_sample = callocz(legacy_query_thread_count, sizeof(uint32_t));
+ rusage_per_thread = callocz(legacy_query_thread_count, sizeof(struct rusage));
+ getrusage_called_this_tick = callocz(legacy_query_thread_count, sizeof(uint8_t));
heartbeat_t hb;
heartbeat_init(&hb);
usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
- memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+ memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample));
- struct aclk_metrics_per_sample per_sample;
- struct aclk_metrics permanent;
+ struct legacy_aclk_metrics_per_sample per_sample;
+ struct legacy_aclk_metrics permanent;
while (!netdata_exit) {
netdata_thread_testcancel();
@@ -363,17 +361,17 @@ void *aclk_stats_main_thread(void *ptr)
heartbeat_next(&hb, step_ut);
if (netdata_exit) break;
- ACLK_STATS_LOCK;
+ LEGACY_ACLK_STATS_LOCK;
// to not hold lock longer than necessary, especially not to hold it
// during database rrd* operations
- memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
- memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
- memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+ memcpy(&per_sample, &legacy_aclk_metrics_per_sample, sizeof(struct legacy_aclk_metrics_per_sample));
+ memcpy(&permanent, &legacy_aclk_metrics, sizeof(struct legacy_aclk_metrics));
+ memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample));
- memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
- memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
- memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * query_thread_count);
- ACLK_STATS_UNLOCK;
+ memcpy(legacy_aclk_queries_per_thread_sample, legacy_aclk_queries_per_thread, sizeof(uint32_t) * legacy_query_thread_count);
+ memset(legacy_aclk_queries_per_thread, 0, sizeof(uint32_t) * legacy_query_thread_count);
+ memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * legacy_query_thread_count);
+ LEGACY_ACLK_STATS_UNLOCK;
aclk_stats_collect(&per_sample, &permanent);
aclk_stats_query_queue(&per_sample);
@@ -386,7 +384,7 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_cloud_req_cmd(&per_sample);
- aclk_stats_query_threads(aclk_queries_per_thread_sample);
+ aclk_stats_query_threads(legacy_aclk_queries_per_thread_sample);
aclk_stats_cpu_threads();
@@ -400,14 +398,14 @@ void *aclk_stats_main_thread(void *ptr)
return 0;
}
-void aclk_stats_upd_online(int online) {
+void legacy_aclk_stats_upd_online(int online) {
if(!aclk_stats_enabled)
return;
- ACLK_STATS_LOCK;
- aclk_metrics.online = online;
+ LEGACY_ACLK_STATS_LOCK;
+ legacy_aclk_metrics.online = online;
if(!online)
- aclk_metrics_per_sample.offline_during_sample = 1;
- ACLK_STATS_UNLOCK;
+ legacy_aclk_metrics_per_sample.offline_during_sample = 1;
+ LEGACY_ACLK_STATS_UNLOCK;
}
diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h
index 5e50a227..560de3b5 100644
--- a/aclk/legacy/aclk_stats.h
+++ b/aclk/legacy/aclk_stats.h
@@ -3,18 +3,16 @@
#ifndef NETDATA_ACLK_STATS_H
#define NETDATA_ACLK_STATS_H
-#include "../../daemon/common.h"
+#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
#include "aclk_common.h"
#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
-extern netdata_mutex_t aclk_stats_mutex;
+extern netdata_mutex_t legacy_aclk_stats_mutex;
-#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
-#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
-
-extern int aclk_stats_enabled;
+#define LEGACY_ACLK_STATS_LOCK netdata_mutex_lock(&legacy_aclk_stats_mutex)
+#define LEGACY_ACLK_STATS_UNLOCK netdata_mutex_unlock(&legacy_aclk_stats_mutex)
struct aclk_stats_thread {
netdata_thread_t *thread;
@@ -22,7 +20,7 @@ struct aclk_stats_thread {
};
// preserve between samples
-struct aclk_metrics {
+struct legacy_aclk_metrics {
volatile uint8_t online;
};
@@ -53,7 +51,7 @@ extern struct aclk_mat_metrics {
struct aclk_metric_mat cloud_q_recvd_to_processed;
} aclk_mat_metrics;
-void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
+void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
#define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
// if you change update cloud_req_type_names
@@ -61,7 +59,7 @@ void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurem
int aclk_cloud_req_type_to_idx(const char *name);
// reset to 0 on every sample
-extern struct aclk_metrics_per_sample {
+extern struct legacy_aclk_metrics_per_sample {
/* in the unlikely event of ACLK disconnecting
and reconnecting under 1 sampling rate
we want to make sure we record the disconnection
@@ -90,13 +88,13 @@ extern struct aclk_metrics_per_sample {
#endif
struct aclk_metric_mat_data cloud_q_db_query_time;
struct aclk_metric_mat_data cloud_q_recvd_to_processed;
-} aclk_metrics_per_sample;
+} legacy_aclk_metrics_per_sample;
-extern uint32_t *aclk_queries_per_thread;
+extern uint32_t *legacy_aclk_queries_per_thread;
extern struct rusage *rusage_per_thread;
-void *aclk_stats_main_thread(void *ptr);
-void aclk_stats_thread_cleanup();
-void aclk_stats_upd_online(int online);
+void *legacy_aclk_stats_main_thread(void *ptr);
+void legacy_aclk_stats_thread_cleanup();
+void legacy_aclk_stats_upd_online(int online);
#endif /* NETDATA_ACLK_STATS_H */
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c
index 5ed7e66a..80ca2397 100644
--- a/aclk/legacy/agent_cloud_link.c
+++ b/aclk/legacy/agent_cloud_link.c
@@ -6,6 +6,7 @@
#include "aclk_query.h"
#include "aclk_common.h"
#include "aclk_stats.h"
+#include "../aclk_collector_list.h"
#ifdef ENABLE_ACLK
#include <libwebsockets.h>
@@ -15,46 +16,20 @@ int aclk_shutting_down = 0;
// Other global state
static int aclk_subscribed = 0;
-static int aclk_disable_single_updates = 0;
static char *aclk_username = NULL;
static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_force_reconnect = 0; // Indication from lower layers
-usec_t aclk_session_us = 0; // Used by the mqtt layer
-time_t aclk_session_sec = 0; // Used by the mqtt layer
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
-static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
-#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
-#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
-
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
void aclk_lws_wss_destroy_context();
-/*
- * Maintain a list of collectors and chart count
- * If all the charts of a collector are deleted
- * then a new metadata dataset must be send to the cloud
- *
- */
-struct _collector {
- time_t created;
- uint32_t count; //chart count
- uint32_t hostname_hash;
- uint32_t plugin_hash;
- uint32_t module_hash;
- char *hostname;
- char *plugin_name;
- char *module_name;
- struct _collector *next;
-};
-
-struct _collector *collector_list = NULL;
char *create_uuid()
{
@@ -67,7 +42,7 @@ char *create_uuid()
return uuid_str;
}
-int cloud_to_agent_parse(JSON_ENTRY *e)
+int legacy_cloud_to_agent_parse(JSON_ENTRY *e)
{
struct aclk_request *data = e->callback_data;
@@ -247,202 +222,10 @@ char *get_topic(char *sub_topic, char *final_topic, int max_size)
return final_topic;
}
-#ifndef __GNUC__
-#pragma region ACLK Internal Collector Tracking
-#endif
-
-/*
- * Free a collector structure
- */
-
-static void _free_collector(struct _collector *collector)
-{
- if (likely(collector->plugin_name))
- freez(collector->plugin_name);
-
- if (likely(collector->module_name))
- freez(collector->module_name);
-
- if (likely(collector->hostname))
- freez(collector->hostname);
-
- freez(collector);
-}
-
-/*
- * This will report the collector list
- *
- */
-#ifdef ACLK_DEBUG
-static void _dump_collector_list()
-{
- struct _collector *tmp_collector;
-
- COLLECTOR_LOCK;
-
- info("DUMPING ALL COLLECTORS");
-
- if (unlikely(!collector_list || !collector_list->next)) {
- COLLECTOR_UNLOCK;
- info("DUMPING ALL COLLECTORS -- nothing found");
- return;
- }
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
-
- while (tmp_collector) {
- info(
- "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
- tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
- tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
-
- tmp_collector = tmp_collector->next;
- }
- info("DUMPING ALL COLLECTORS DONE");
- COLLECTOR_UNLOCK;
-}
-#endif
-
-/*
- * This will cleanup the collector list
- *
- */
-static void _reset_collector_list()
-{
- struct _collector *tmp_collector, *next_collector;
-
- COLLECTOR_LOCK;
-
- if (unlikely(!collector_list || !collector_list->next)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
- collector_list->count = 0;
- collector_list->next = NULL;
-
- // We broke the link; we can unlock
- COLLECTOR_UNLOCK;
-
- while (tmp_collector) {
- next_collector = tmp_collector->next;
- _free_collector(tmp_collector);
- tmp_collector = next_collector;
- }
-}
-
-/*
- * Find a collector (if it exists)
- * Must lock before calling this
- * If last_collector is not null, it will return the previous collector in the linked
- * list (used in collector delete)
- */
-static struct _collector *_find_collector(
- const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
-{
- struct _collector *tmp_collector, *prev_collector;
- uint32_t plugin_hash;
- uint32_t module_hash;
- uint32_t hostname_hash;
-
- if (unlikely(!collector_list)) {
- collector_list = callocz(1, sizeof(struct _collector));
- return NULL;
- }
-
- if (unlikely(!collector_list->next))
- return NULL;
-
- plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
- module_hash = module_name ? simple_hash(module_name) : 1;
- hostname_hash = simple_hash(hostname);
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
- prev_collector = collector_list;
- while (tmp_collector) {
- if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
- hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
- (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
- (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
- if (unlikely(last_collector))
- *last_collector = prev_collector;
-
- return tmp_collector;
- }
-
- prev_collector = tmp_collector;
- tmp_collector = tmp_collector->next;
- }
-
- return tmp_collector;
-}
-
-/*
- * Called to delete a collector
- * It will reduce the count (chart_count) and will remove it
- * from the linked list if the count reaches zero
- * The structure will be returned to the caller to free
- * the resources
- *
- */
-static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
-{
- struct _collector *tmp_collector, *prev_collector = NULL;
-
- tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
-
- if (likely(tmp_collector)) {
- --tmp_collector->count;
- if (unlikely(!tmp_collector->count))
- prev_collector->next = tmp_collector->next;
- }
- return tmp_collector;
-}
-
-/*
- * Add a new collector (plugin / module) to the list
- * If it already exists just update the chart count
- *
- * Lock before calling
- */
-static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
-{
- struct _collector *tmp_collector;
-
- tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
-
- if (unlikely(!tmp_collector)) {
- tmp_collector = callocz(1, sizeof(struct _collector));
- tmp_collector->hostname_hash = simple_hash(hostname);
- tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
- tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
-
- tmp_collector->hostname = strdupz(hostname);
- tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
- tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
-
- tmp_collector->next = collector_list->next;
- collector_list->next = tmp_collector;
- }
- tmp_collector->count++;
- debug(
- D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
- module_name ? module_name : "*", tmp_collector->count);
- return tmp_collector;
-}
-
-#ifndef __GNUC__
-#pragma endregion
-#endif
-
-/* Avoids the need to scan trough all RRDHOSTS
+/* Avoids the need to scan through all RRDHOSTS
* every time any Query Thread Wakes Up
* (every time we need to check child popcorn expiry)
- * call with ACLK_SHARED_STATE_LOCK held
+ * call with legacy_aclk_shared_state_LOCK held
*/
void aclk_update_next_child_to_popcorn(void)
{
@@ -462,19 +245,19 @@ void aclk_update_next_child_to_popcorn(void)
any = 1;
- if (unlikely(!aclk_shared_state.next_popcorn_host)) {
- aclk_shared_state.next_popcorn_host = host;
+ if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) {
+ legacy_aclk_shared_state.next_popcorn_host = host;
rrdhost_aclk_state_unlock(host);
continue;
}
- if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
- aclk_shared_state.next_popcorn_host = host;
+ if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
+ legacy_aclk_shared_state.next_popcorn_host = host;
rrdhost_aclk_state_unlock(host);
}
if(!any)
- aclk_shared_state.next_popcorn_host = NULL;
+ legacy_aclk_shared_state.next_popcorn_host = NULL;
rrd_unlock();
}
@@ -487,7 +270,7 @@ static int aclk_popcorn_check_bump(RRDHOST *host)
{
time_t now = now_monotonic_sec();
int updated = 0, ret;
- ACLK_SHARED_STATE_LOCK;
+ legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
ret = ACLK_IS_HOST_INITIALIZING(host);
@@ -502,12 +285,12 @@ static int aclk_popcorn_check_bump(RRDHOST *host)
if (host != localhost && updated)
aclk_update_next_child_to_popcorn();
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return ret;
}
rrdhost_aclk_state_unlock(host);
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return ret;
}
@@ -523,13 +306,13 @@ static void aclk_start_host_popcorning(RRDHOST *host)
{
usec_t now = now_monotonic_sec();
info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
- ACLK_SHARED_STATE_LOCK;
+ legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
errno = 0;
error("Localhost is allowed to do popcorning only once after startup!");
rrdhost_aclk_state_unlock(host);
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return;
}
@@ -539,16 +322,16 @@ static void aclk_start_host_popcorning(RRDHOST *host)
rrdhost_aclk_state_unlock(host);
if (host != localhost)
aclk_update_next_child_to_popcorn();
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
}
static void aclk_stop_host_popcorning(RRDHOST *host)
{
- ACLK_SHARED_STATE_LOCK;
+ legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
if (!ACLK_IS_HOST_POPCORNING(host)) {
rrdhost_aclk_state_unlock(host);
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return;
}
@@ -557,18 +340,18 @@ static void aclk_stop_host_popcorning(RRDHOST *host)
host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
rrdhost_aclk_state_unlock(host);
- if(host == aclk_shared_state.next_popcorn_host) {
- aclk_shared_state.next_popcorn_host = NULL;
+ if(host == legacy_aclk_shared_state.next_popcorn_host) {
+ legacy_aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn();
}
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
}
/*
* Add a new collector to the list
* If it exists, update the chart count
*/
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@@ -589,7 +372,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu
if(aclk_popcorn_check_bump(host))
return;
- if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
}
@@ -601,7 +384,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu
* This function will release the memory used and schedule
* a cloud update
*/
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@@ -628,7 +411,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
if (aclk_popcorn_check_bump(host))
return;
- if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
}
@@ -639,7 +422,7 @@ static void aclk_graceful_disconnect()
// Send a graceful disconnect message
BUFFER *b = buffer_create(512);
- aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}");
aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
buffer_free(b);
@@ -963,10 +746,10 @@ static void aclk_try_to_connect(char *hostname, int port)
aclk_connecting = 1;
create_publish_base_topic();
- ACLK_SHARED_STATE_LOCK;
- aclk_shared_state.version_neg = 0;
- aclk_shared_state.version_neg_wait_till = 0;
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_LOCK;
+ legacy_aclk_shared_state.version_neg = 0;
+ legacy_aclk_shared_state.version_neg_wait_till = 0;
+ legacy_aclk_shared_state_UNLOCK;
rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password);
if (unlikely(rc)) {
@@ -981,10 +764,10 @@ static inline void aclk_hello_msg()
char *msg_id = create_uuid();
- ACLK_SHARED_STATE_LOCK;
- aclk_shared_state.version_neg = 0;
- aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_LOCK;
+ legacy_aclk_shared_state.version_neg = 0;
+ legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
+ legacy_aclk_shared_state_UNLOCK;
//Hello message is versioned separately from the rest of the protocol
aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
@@ -1004,7 +787,7 @@ static inline void aclk_hello_msg()
*
* @return It always returns NULL
*/
-void *aclk_main(void *ptr)
+void *legacy_aclk_main(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_query_threads query_threads;
@@ -1065,7 +848,7 @@ void *aclk_main(void *ptr)
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
stats_thread->query_thread_count = query_threads.count;
netdata_thread_create(
- stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
+ stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread,
stats_thread);
}
@@ -1165,20 +948,20 @@ void *aclk_main(void *ptr)
}
if (unlikely(!query_threads.thread_list)) {
- aclk_query_threads_start(&query_threads);
+ legacy_aclk_query_threads_start(&query_threads);
}
time_t now = now_monotonic_sec();
if(aclk_connected && last_periodic_query_wakeup < now) {
- // to make `aclk_queue_query()` param `run_after` work
+ // to make `legacy_aclk_queue_query()` param `run_after` work
// also makes per child popcorning work
last_periodic_query_wakeup = now;
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
}
} // forever
exited:
// Wakeup query thread to cleanup
- QUERY_THREAD_WAKEUP_ALL;
+ LEGACY_QUERY_THREAD_WAKEUP_ALL;
freez(aclk_username);
freez(aclk_password);
@@ -1192,18 +975,18 @@ exited:
if (agent_id && aclk_connected) {
freez(agent_id);
// Wakeup thread to cleanup
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
aclk_graceful_disconnect();
}
- aclk_query_threads_cleanup(&query_threads);
+ legacy_aclk_query_threads_cleanup(&query_threads);
_reset_collector_list();
freez(collector_list);
if(aclk_stats_enabled) {
netdata_thread_join(*stats_thread->thread, NULL);
- aclk_stats_thread_cleanup();
+ legacy_aclk_stats_thread_cleanup();
freez(stats_thread->thread);
freez(stats_thread);
}
@@ -1306,12 +1089,12 @@ void aclk_connect()
{
info("Connection detected (%u queued queries)", aclk_query_size());
- aclk_stats_upd_online(1);
+ legacy_aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_reconnect_delay(0);
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
return;
}
@@ -1321,7 +1104,7 @@ void aclk_disconnect()
if (likely(aclk_connected))
info("Disconnect detected (%u queued queries)", aclk_query_size());
- aclk_stats_upd_online(0);
+ legacy_aclk_stats_upd_online(0);
aclk_subscribed = 0;
rrdhost_aclk_state_lock(localhost);
@@ -1372,7 +1155,7 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts
*/
void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
-void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
+void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1388,9 +1171,9 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
// session.
if (metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
else
- aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
@@ -1418,7 +1201,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
* /api/v1/info
* charts
*/
-int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
+int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1433,9 +1216,9 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *hos
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
else
- aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
@@ -1459,14 +1242,14 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
local_buffer->contenttype = CT_APPLICATION_JSON;
- if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
- fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg);
+ if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
+ fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg);
debug(D_ACLK, "Sending Child Disconnect");
char *msg_id = create_uuid();
- aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\"payload\":");
@@ -1486,10 +1269,10 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
return 0;
}
-void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
+void legacy_aclk_host_state_update(RRDHOST *host, int connect)
{
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
- if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
+ if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
return;
#else
#warning "This check became unnecessary. Remove"
@@ -1498,19 +1281,14 @@ void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
if (unlikely(aclk_host_initializing(localhost)))
return;
- switch (cmd) {
- case ACLK_CMD_CHILD_CONNECT:
- debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
- aclk_start_host_popcorning(host);
- aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
- break;
- case ACLK_CMD_CHILD_DISCONNECT:
- debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
- aclk_stop_host_popcorning(host);
- aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
- break;
- default:
- error("Unknown command for aclk_host_state_update %d.", (int)cmd);
+ if (connect) {
+ debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
+ aclk_start_host_popcorning(host);
+ legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
+ } else {
+ debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
+ aclk_stop_host_popcorning(host);
+ legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
}
}
@@ -1537,31 +1315,21 @@ void aclk_send_stress_test(size_t size)
// or on request
int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
{
- aclk_send_info_metadata(state, host);
+ legacy_aclk_send_info_metadata(state, host);
if(host == localhost)
- aclk_send_alarm_metadata(state);
+ legacy_aclk_send_alarm_metadata(state);
return 0;
}
-void aclk_single_update_disable()
-{
- aclk_disable_single_updates = 1;
-}
-
-void aclk_single_update_enable()
-{
- aclk_disable_single_updates = 0;
-}
-
// Triggered by a health reload, sends the alarm metadata
-void aclk_alarm_reload()
+void legacy_aclk_alarm_reload()
{
if (unlikely(aclk_host_initializing(localhost)))
return;
- if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue on_connect command on alarm reload");
@@ -1585,7 +1353,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
rrdset2json(st, local_buffer, NULL, NULL, 1);
@@ -1598,7 +1366,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart)
return 0;
}
-int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
+int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
#ifndef ENABLE_ACLK
UNUSED(host);
@@ -1611,7 +1379,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (!netdata_cloud_setting)
return 0;
- if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
+ if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
return 0;
if (aclk_host_initializing(localhost))
@@ -1623,7 +1391,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (aclk_popcorn_check_bump(host))
return 0;
- if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
+ if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue chart_update command");
@@ -1634,7 +1402,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
#endif
}
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
+int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
BUFFER *local_buffer = NULL;
@@ -1661,7 +1429,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
char *msg_id = create_uuid();
buffer_flush(local_buffer);
- aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
@@ -1670,7 +1438,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
buffer_sprintf(local_buffer, "\n}");
- if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
+ if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue alarm_command on alarm_update");
@@ -1682,3 +1450,53 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
return 0;
}
+
+char *legacy_aclk_state(void)
+{
+ BUFFER *wb = buffer_create(1024);
+ char *ret;
+
+ buffer_strcat(wb,
+ "ACLK Available: Yes\n"
+ "ACLK Implementation: Legacy\n"
+ "Claimed: "
+ );
+
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL)
+ buffer_strcat(wb, "No\n");
+ else {
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id);
+ freez(agent_id);
+ }
+
+ buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No");
+
+ ret = strdupz(buffer_tostring(wb));
+ buffer_free(wb);
+ return ret;
+}
+
+char *legacy_aclk_state_json(void)
+{
+ BUFFER *wb = buffer_create(1024);
+ char *agent_id = is_agent_claimed();
+
+ buffer_sprintf(wb,
+ "{\"aclk-available\":true,"
+ "\"aclk-implementation\":\"Legacy\","
+ "\"agent-claimed\":%s,"
+ "\"claimed-id\":",
+ agent_id ? "true" : "false"
+ );
+
+ if (agent_id) {
+ buffer_sprintf(wb, "\"%s\"", agent_id);
+ freez(agent_id);
+ } else
+ buffer_strcat(wb, "null");
+
+ buffer_sprintf(wb, ",\"online\":%s}", aclk_connected ? "true" : "false");
+
+ return strdupz(buffer_tostring(wb));
+}
diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h
index bfcfef8e..8954a337 100644
--- a/aclk/legacy/agent_cloud_link.h
+++ b/aclk/legacy/agent_cloud_link.h
@@ -3,11 +3,10 @@
#ifndef NETDATA_AGENT_CLOUD_LINK_H
#define NETDATA_AGENT_CLOUD_LINK_H
-#include "../../daemon/common.h"
+#include "daemon/common.h"
#include "mqtt.h"
#include "aclk_common.h"
-#define ACLK_THREAD_NAME "ACLK_Query"
#define ACLK_CHART_TOPIC "outbound/meta"
#define ACLK_ALARMS_TOPIC "outbound/alarms"
#define ACLK_METADATA_TOPIC "outbound/meta"
@@ -18,7 +17,6 @@
#define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg)
#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds
-#define ACLK_QOS 1
#define ACLK_PING_INTERVAL 60
#define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop
@@ -42,16 +40,7 @@ struct aclk_request {
typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION;
-void *aclk_main(void *ptr);
-
-#define NETDATA_ACLK_HOOK \
- { .name = "ACLK_Main", \
- .config_section = NULL, \
- .config_name = NULL, \
- .enabled = 1, \
- .thread = NULL, \
- .init_routine = NULL, \
- .start_routine = aclk_main },
+void *legacy_aclk_main(void *ptr);
extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id);
@@ -62,32 +51,35 @@ char *create_uuid();
// callbacks for agent cloud link
int aclk_subscribe(char *topic, int qos);
-int cloud_to_agent_parse(JSON_ENTRY *e);
+int legacy_cloud_to_agent_parse(JSON_ENTRY *e);
void aclk_disconnect();
void aclk_connect();
+#ifdef ENABLE_ACLK
int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host);
-int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
-void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
+int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
+void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
int aclk_wait_for_initialization();
char *create_publish_base_topic();
int aclk_send_single_chart(RRDHOST *host, char *chart);
-int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
+int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
+int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version);
-int aclk_handle_cloud_message(char *payload);
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void aclk_alarm_reload();
+int legacy_aclk_handle_cloud_message(char *payload);
+void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void legacy_aclk_alarm_reload(void);
unsigned long int aclk_reconnect_delay(int mode);
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
-void aclk_single_update_enable();
-void aclk_single_update_disable();
-void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd);
+void legacy_aclk_host_state_update(RRDHOST *host, int connect);
int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd);
void aclk_update_next_child_to_popcorn(void);
+char *legacy_aclk_state(void);
+char *legacy_aclk_state_json(void);
+#endif
+
#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c
index 74f77455..0e4bb2ec 100644
--- a/aclk/legacy/mqtt.c
+++ b/aclk/legacy/mqtt.c
@@ -1,12 +1,16 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include <libnetdata/json/json.h>
-#include "../../daemon/common.h"
+#include "daemon/common.h"
#include "mqtt.h"
#include "aclk_lws_wss_client.h"
#include "aclk_stats.h"
#include "aclk_rx_msgs.h"
+#include "agent_cloud_link.h"
+
+#define ACLK_QOS 1
+
extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
@@ -27,7 +31,7 @@ void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosqu
UNUSED(mosq);
UNUSED(obj);
- aclk_handle_cloud_message(msg->payload);
+ legacy_aclk_handle_cloud_message(msg->payload);
}
void publish_callback(struct mosquitto *mosq, void *obj, int rc)
@@ -44,7 +48,7 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc)
info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff);
- aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff);
+ legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.latency, diff);
#endif
return;
}
diff --git a/aclk/legacy/mqtt.h b/aclk/legacy/mqtt.h
index cc4765d6..98d599f5 100644
--- a/aclk/legacy/mqtt.h
+++ b/aclk/legacy/mqtt.h
@@ -19,7 +19,7 @@ const char *_link_strerror(int rc);
int _link_set_lwt(char *topic, int qos);
-int aclk_handle_cloud_message(char *);
+int legacy_aclk_handle_cloud_message(char *);
extern char *get_topic(char *sub_topic, char *final_topic, int max_size);
#endif //NETDATA_MQTT_H