summaryrefslogtreecommitdiffstats
path: root/aclk/legacy
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--aclk/legacy/Makefile.am19
-rw-r--r--aclk/legacy/aclk_common.c236
-rw-r--r--aclk/legacy/aclk_common.h70
-rw-r--r--aclk/legacy/aclk_lws_https_client.c246
-rw-r--r--aclk/legacy/aclk_lws_https_client.h18
-rw-r--r--aclk/legacy/aclk_lws_wss_client.c613
-rw-r--r--aclk/legacy/aclk_lws_wss_client.h92
-rw-r--r--aclk/legacy/aclk_query.c789
-rw-r--r--aclk/legacy/aclk_query.h40
-rw-r--r--aclk/legacy/aclk_rrdhost_state.h42
-rw-r--r--aclk/legacy/aclk_rx_msgs.c365
-rw-r--r--aclk/legacy/aclk_rx_msgs.h13
-rw-r--r--aclk/legacy/aclk_stats.c298
-rw-r--r--aclk/legacy/aclk_stats.h91
-rw-r--r--aclk/legacy/agent_cloud_link.c1683
-rw-r--r--aclk/legacy/agent_cloud_link.h93
-rw-r--r--aclk/legacy/mqtt.c366
-rw-r--r--aclk/legacy/mqtt.h25
-rw-r--r--aclk/legacy/tests/fake-charts.d.plugin24
-rw-r--r--aclk/legacy/tests/install-fake-charts.d.sh.in6
-rwxr-xr-xaclk/legacy/tests/launch-paho.sh4
-rw-r--r--aclk/legacy/tests/paho-inspection.py59
-rw-r--r--aclk/legacy/tests/paho.Dockerfile14
23 files changed, 5206 insertions, 0 deletions
diff --git a/aclk/legacy/Makefile.am b/aclk/legacy/Makefile.am
new file mode 100644
index 000000000..1cd876b40
--- /dev/null
+++ b/aclk/legacy/Makefile.am
@@ -0,0 +1,19 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+CLEANFILES = \
+ tests/install-fake-charts.d.sh \
+ $(NULL)
+
+include $(top_srcdir)/build/subst.inc
+SUFFIXES = .in
+
+#sbin_SCRIPTS = \
+# tests/install-fake-charts.d.sh \
+# $(NULL)
+
+dist_noinst_SCRIPTS = tests/install-fake-charts.d.sh
+dist_noinst_DATA = tests/install-fake-charts.d.sh.in
+
diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c
new file mode 100644
index 000000000..7c8421a93
--- /dev/null
+++ b/aclk/legacy/aclk_common.c
@@ -0,0 +1,236 @@
+#include "aclk_common.h"
+
+#include "../../daemon/common.h"
+
+#ifdef ENABLE_ACLK
+#include <libwebsockets.h>
+#endif
+
+netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+
+int aclk_disable_runtime = 0;
+int aclk_kill_link = 0;
+
+struct aclk_shared_state aclk_shared_state = {
+ .version_neg = 0,
+ .version_neg_wait_till = 0
+};
+
+struct {
+ ACLK_PROXY_TYPE type;
+ const char *url_str;
+} supported_proxy_types[] = {
+ { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
+};
+
+const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
+{
+ switch (*type) {
+ case PROXY_DISABLED:
+ return "disabled";
+ case PROXY_TYPE_HTTP:
+ return "HTTP";
+ case PROXY_TYPE_SOCKS5:
+ return "SOCKS";
+ default:
+ return "Unknown";
+ }
+}
+
+static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
+{
+ int i = 0;
+ while (supported_proxy_types[i].url_str) {
+ if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
+ return supported_proxy_types[i].type;
+ i++;
+ }
+ return PROXY_TYPE_UNKNOWN;
+}
+
+ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
+{
+ if (!string)
+ return PROXY_TYPE_UNKNOWN;
+
+ while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove)
+ string++;
+
+ if (!*string)
+ return PROXY_TYPE_UNKNOWN;
+
+ return aclk_find_proxy(string);
+}
+
+// helper function to censor user&password
+// for logging purposes
+void safe_log_proxy_censor(char *proxy)
+{
+ size_t length = strlen(proxy);
+ char *auth = proxy + length - 1;
+ char *cur;
+
+ while ((auth >= proxy) && (*auth != '@'))
+ auth--;
+
+ //if not found or @ is first char do nothing
+ if (auth <= proxy)
+ return;
+
+ cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+ if (!cur)
+ cur = proxy;
+ else
+ cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+
+ while (cur < auth) {
+ *cur = 'X';
+ cur++;
+ }
+}
+
+static inline void safe_log_proxy_error(char *str, const char *proxy)
+{
+ char *log = strdupz(proxy);
+ safe_log_proxy_censor(log);
+ error("%s Provided Value:\"%s\"", str, log);
+ freez(log);
+}
+
+static inline int check_socks_enviroment(const char **proxy)
+{
+ char *tmp = getenv("socks_proxy");
+
+ if (!tmp)
+ return 1;
+
+ if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
+ *proxy = tmp;
+ return 0;
+ }
+
+ safe_log_proxy_error(
+ "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
+ tmp);
+ return 1;
+}
+
+static inline int check_http_enviroment(const char **proxy)
+{
+ char *tmp = getenv("http_proxy");
+
+ if (!tmp)
+ return 1;
+
+ if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
+ *proxy = tmp;
+ return 0;
+ }
+
+ safe_log_proxy_error(
+ "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
+ tmp);
+ return 1;
+}
+
+const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
+{
+ const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
+ *type = PROXY_DISABLED;
+
+ if (strcmp(proxy, "none") == 0)
+ return proxy;
+
+ if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
+ if (check_socks_enviroment(&proxy) == 0) {
+#ifdef LWS_WITH_SOCKS5
+ *type = PROXY_TYPE_SOCKS5;
+ return proxy;
+#else
+ safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
+ "but Libwebsockets used doesn't have SOCKS5 support built in. "
+ "Ignoring and checking for other options.",
+ proxy);
+#endif
+ }
+ if (check_http_enviroment(&proxy) == 0)
+ *type = PROXY_TYPE_HTTP;
+ return proxy;
+ }
+
+ *type = aclk_verify_proxy(proxy);
+#ifndef LWS_WITH_SOCKS5
+ if (*type == PROXY_TYPE_SOCKS5) {
+ safe_log_proxy_error(
+ "Config var \"" ACLK_PROXY_CONFIG_VAR
+ "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
+ proxy);
+ }
+#endif
+ if (*type == PROXY_TYPE_UNKNOWN) {
+ *type = PROXY_DISABLED;
+ safe_log_proxy_error(
+ "Config var \"" ACLK_PROXY_CONFIG_VAR
+ "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
+ proxy);
+ }
+
+ return proxy;
+}
+
+// helper function to read settings only once (static)
+// as claiming, challenge/response and ACLK
+// read the same thing, no need to parse again
+const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
+{
+ static const char *proxy = NULL;
+ static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
+
+ if (proxy_type == PROXY_NOT_SET)
+ proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
+
+ *type = proxy_type;
+ return proxy;
+}
+
+int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
+{
+ int pos = 0;
+ if (!strncmp("https://", url, 8)) {
+ pos = 8;
+ } else if (!strncmp("http://", url, 7)) {
+ error("Cannot connect ACLK over %s -> unencrypted link is not supported", url);
+ return 1;
+ }
+ int host_end = pos;
+ while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':')
+ host_end++;
+ if (url[host_end] == 0) {
+ *aclk_hostname = strdupz(url + pos);
+ *aclk_port = 443;
+ info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
+ return 0;
+ }
+ if (url[host_end] == ':') {
+ *aclk_hostname = callocz(host_end - pos + 1, 1);
+ strncpy(*aclk_hostname, url + pos, host_end - pos);
+ int port_end = host_end + 1;
+ while (url[port_end] >= '0' && url[port_end] <= '9')
+ port_end++;
+ if (port_end - host_end > 6) {
+ error("Port specified in %s is invalid", url);
+ return 0;
+ }
+ *aclk_port = atoi(&url[host_end+1]);
+ }
+ if (url[host_end] == '/') {
+ *aclk_port = 443;
+ *aclk_hostname = callocz(1, host_end - pos + 1);
+ strncpy(*aclk_hostname, url+pos, host_end - pos);
+ }
+ info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
+ return 0;
+}
diff --git a/aclk/legacy/aclk_common.h b/aclk/legacy/aclk_common.h
new file mode 100644
index 000000000..2dc0aa553
--- /dev/null
+++ b/aclk/legacy/aclk_common.h
@@ -0,0 +1,70 @@
+#ifndef ACLK_COMMON_H
+#define ACLK_COMMON_H
+
+#include "aclk_rrdhost_state.h"
+#include "../../daemon/common.h"
+
+extern netdata_mutex_t aclk_shared_state_mutex;
+#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+
+// minimum and maximum supported version of ACLK
+// in this version of agent
+#define ACLK_VERSION_MIN 2
+#define ACLK_VERSION_MAX 3
+
+// Version negotiation messages have they own versioning
+// this is also used for LWT message as we set that up
+// before version negotiation
+#define ACLK_VERSION_NEG_VERSION 1
+
+// Maximum time to wait for version negotiation before aborting
+// and defaulting to oldest supported version
+#define VERSION_NEG_TIMEOUT 3
+
+#if ACLK_VERSION_MIN > ACLK_VERSION_MAX
+#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN"
+#endif
+
+// Define ACLK Feature Version Boundaries Here
+#define ACLK_V_COMPRESSION 2
+#define ACLK_V_CHILDRENSTATE 3
+
+#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING)
+#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update)
+
+extern struct aclk_shared_state {
+ // optimization to avoid looping trough hosts
+ // every time Query Thread wakes up
+ RRDHOST *next_popcorn_host;
+
+ // read only while ACLK connected
+ // protect by lock otherwise
+ int version_neg;
+ usec_t version_neg_wait_till;
+} aclk_shared_state;
+
+typedef enum aclk_proxy_type {
+ PROXY_TYPE_UNKNOWN = 0,
+ PROXY_TYPE_SOCKS5,
+ PROXY_TYPE_HTTP,
+ PROXY_DISABLED,
+ PROXY_NOT_SET,
+} ACLK_PROXY_TYPE;
+
+extern int aclk_kill_link; // Tells the agent to tear down the link
+extern int aclk_disable_runtime;
+
+const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
+
+#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
+#define ACLK_PROXY_ENV "env"
+#define ACLK_PROXY_CONFIG_VAR "proxy"
+
+ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
+const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
+void safe_log_proxy_censor(char *proxy);
+int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
+const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
+
+#endif //ACLK_COMMON_H
diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c
new file mode 100644
index 000000000..c1856ed2c
--- /dev/null
+++ b/aclk/legacy/aclk_lws_https_client.c
@@ -0,0 +1,246 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define ACLK_LWS_HTTPS_CLIENT_INTERNAL
+#include "aclk_lws_https_client.h"
+
+#include "aclk_common.h"
+
+#include "aclk_lws_wss_client.h"
+
+#define SMALL_BUFFER 16
+
+struct simple_hcc_data {
+ char *data;
+ size_t data_size;
+ size_t written;
+ char lws_work_buffer[1024 + LWS_PRE];
+ char *payload;
+ int response_code;
+ int done;
+};
+
+static int simple_https_client_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
+{
+ UNUSED(user);
+ int n;
+ char *ptr;
+ char buffer[SMALL_BUFFER];
+ struct simple_hcc_data *perconn_data = lws_get_opaque_user_data(wsi);
+
+ switch (reason) {
+ case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ:
+ debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ");
+ if (perconn_data->data_size - 1 - perconn_data->written < len)
+ return 1;
+ memcpy(&perconn_data->data[perconn_data->written], in, len);
+ perconn_data->written += len;
+ return 0;
+ case LWS_CALLBACK_RECEIVE_CLIENT_HTTP:
+ debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP");
+ if(!perconn_data) {
+ error("Missing Per Connect Data");
+ return -1;
+ }
+ n = sizeof(perconn_data->lws_work_buffer) - LWS_PRE;
+ ptr = perconn_data->lws_work_buffer + LWS_PRE;
+ if (lws_http_client_read(wsi, &ptr, &n) < 0)
+ return -1;
+ perconn_data->data[perconn_data->written] = '\0';
+ return 0;
+ case LWS_CALLBACK_WSI_DESTROY:
+ debug(D_ACLK, "LWS_CALLBACK_WSI_DESTROY");
+ if(perconn_data)
+ perconn_data->done = 1;
+ return 0;
+ case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
+ debug(D_ACLK, "LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP");
+ if(perconn_data)
+ perconn_data->response_code = lws_http_client_http_response(wsi);
+ return 0;
+ case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
+ debug(D_ACLK, "LWS_CALLBACK_CLOSED_CLIENT_HTTP");
+ return 0;
+ case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
+ debug(D_ACLK, "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS");
+ return 0;
+ case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
+ debug(D_ACLK, "LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER");
+ if(perconn_data && perconn_data->payload) {
+ unsigned char **p = (unsigned char **)in, *end = (*p) + len;
+ snprintfz(buffer, SMALL_BUFFER, "%zu", strlen(perconn_data->payload));
+ if (lws_add_http_header_by_token(wsi,
+ WSI_TOKEN_HTTP_CONTENT_LENGTH,
+ (unsigned char *)buffer, strlen(buffer), p, end))
+ return -1;
+ if (lws_add_http_header_by_token(wsi,
+ WSI_TOKEN_HTTP_CONTENT_TYPE,
+ (unsigned char *)ACLK_CONTENT_TYPE_JSON,
+ strlen(ACLK_CONTENT_TYPE_JSON), p, end))
+ return -1;
+ lws_client_http_body_pending(wsi, 1);
+ lws_callback_on_writable(wsi);
+ }
+ return 0;
+ case LWS_CALLBACK_CLIENT_HTTP_WRITEABLE:
+ debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_WRITEABLE");
+ if(perconn_data && perconn_data->payload) {
+ n = strlen(perconn_data->payload);
+ if(perconn_data->data_size < (size_t)LWS_PRE + n + 1) {
+ error("Buffer given is not big enough");
+ return 1;
+ }
+
+ memcpy(&perconn_data->data[LWS_PRE], perconn_data->payload, n);
+ if(n != lws_write(wsi, (unsigned char*)&perconn_data->data[LWS_PRE], n, LWS_WRITE_HTTP)) {
+ error("lws_write error");
+ perconn_data->data[0] = 0;
+ return 1;
+ }
+ lws_client_http_body_pending(wsi, 0);
+ // clean for subsequent reply read
+ perconn_data->data[0] = 0;
+ }
+ return 0;
+ case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL:
+ debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL");
+ return 0;
+ case LWS_CALLBACK_WSI_CREATE:
+ debug(D_ACLK, "LWS_CALLBACK_WSI_CREATE");
+ return 0;
+ case LWS_CALLBACK_PROTOCOL_INIT:
+ debug(D_ACLK, "LWS_CALLBACK_PROTOCOL_INIT");
+ return 0;
+ case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL:
+ debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL");
+ return 0;
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ debug(D_ACLK, "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED");
+ return 0;
+ case LWS_CALLBACK_GET_THREAD_ID:
+ debug(D_ACLK, "LWS_CALLBACK_GET_THREAD_ID");
+ return 0;
+ case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
+ debug(D_ACLK, "LWS_CALLBACK_EVENT_WAIT_CANCELLED");
+ return 0;
+ case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
+ debug(D_ACLK, "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION");
+ return 0;
+ case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
+ debug(D_ACLK, "LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH");
+ return 0;
+ default:
+ debug(D_ACLK, "Unknown callback %d", (int)reason);
+ return 0;
+ }
+}
+
+static const struct lws_protocols protocols[] = {
+ {
+ "http",
+ simple_https_client_callback,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0
+ },
+ { NULL, NULL, 0, 0, 0, 0, 0 }
+};
+
+static void simple_hcc_log_divert(int level, const char *line)
+{
+ UNUSED(level);
+ error("Libwebsockets: %s", line);
+}
+
+int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload)
+{
+ info("%s %s", __func__, method);
+
+ struct lws_context_creation_info info;
+ struct lws_client_connect_info i;
+ struct lws_context *context;
+
+ struct simple_hcc_data *data = callocz(1, sizeof(struct simple_hcc_data));
+ data->data = b;
+ data->data[0] = 0;
+ data->data_size = b_size;
+ data->payload = payload;
+
+ int n = 0;
+ time_t timestamp;
+
+ struct lws_vhost *vhost;
+
+ memset(&info, 0, sizeof info);
+
+ info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+ info.port = CONTEXT_PORT_NO_LISTEN;
+ info.protocols = protocols;
+
+
+ context = lws_create_context(&info);
+ if (!context) {
+ error("Error creating LWS context");
+ freez(data);
+ return 1;
+ }
+
+ lws_set_log_level(LLL_ERR | LLL_WARN, simple_hcc_log_divert);
+
+ lws_service(context, 0);
+
+ memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
+ i.context = context;
+
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+ i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE;
+ info("Disabling SSL certificate checks");
+#else
+ i.ssl_connection = LCCSCF_USE_SSL;
+#endif
+#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0
+#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM.
+ i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
+#endif
+
+ i.port = port;
+ i.address = host;
+ i.path = url;
+
+ i.host = i.address;
+ i.origin = i.address;
+ i.method = method;
+ i.opaque_user_data = data;
+ i.alpn = "http/1.1";
+
+ i.protocol = protocols[0].name;
+
+ vhost = lws_get_vhost_by_name(context, "default");
+ if(!vhost)
+ fatal("Could not find the default LWS vhost.");
+
+ //set up proxy
+ aclk_wss_set_proxy(vhost);
+
+ lws_client_connect_via_info(&i);
+
+ // libwebsockets handle connection timeouts already
+ // this adds additional safety in case of bug in LWS
+ timestamp = now_monotonic_sec();
+ while( n >= 0 && !data->done && !netdata_exit) {
+ n = lws_service(context, 0);
+ if( now_monotonic_sec() - timestamp > SEND_HTTPS_REQUEST_TIMEOUT ) {
+ data->data[0] = 0;
+ data->done = 1;
+ error("Servicing LWS took too long.");
+ }
+ }
+
+ lws_context_destroy(context);
+
+ n = data->response_code;
+
+ freez(data);
+ return (n < 200 || n >= 300);
+}
diff --git a/aclk/legacy/aclk_lws_https_client.h b/aclk/legacy/aclk_lws_https_client.h
new file mode 100644
index 000000000..811809dd1
--- /dev/null
+++ b/aclk/legacy/aclk_lws_https_client.h
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_LWS_HTTPS_CLIENT_H
+#define NETDATA_LWS_HTTPS_CLIENT_H
+
+#include "../../daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+#define DATAMAXLEN 1024*16
+
+#ifdef ACLK_LWS_HTTPS_CLIENT_INTERNAL
+#define ACLK_CONTENT_TYPE_JSON "application/json"
+#define SEND_HTTPS_REQUEST_TIMEOUT 30
+#endif
+
+int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload);
+
+#endif /* NETDATA_LWS_HTTPS_CLIENT_H */
diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c
new file mode 100644
index 000000000..2e6fd4ec8
--- /dev/null
+++ b/aclk/legacy/aclk_lws_wss_client.c
@@ -0,0 +1,613 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_lws_wss_client.h"
+
+#include "libnetdata/libnetdata.h"
+#include "../../daemon/common.h"
+#include "aclk_common.h"
+#include "aclk_stats.h"
+
+extern int aclk_shutting_down;
+
+static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
+
+struct aclk_lws_wss_perconnect_data {
+ int todo;
+};
+
+static struct aclk_lws_wss_engine_instance *engine_instance = NULL;
+
+void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len)
+{
+ if (write_len != NULL && write_len_bytes != NULL)
+ {
+ *write_len = 0;
+ *write_len_bytes = 0;
+ if (engine_instance != NULL)
+ {
+ aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
+
+ struct lws_wss_packet_buffer *write_b;
+ size_t w,wb;
+ for(w=0, wb=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next)
+ {
+ w++;
+ wb += write_b->data_size - write_b->written;
+ }
+ *write_len = w;
+ *write_len_bytes = wb;
+ aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
+ }
+ }
+ else if (write_len != NULL)
+ {
+ *write_len = 0;
+ if (engine_instance != NULL)
+ {
+ aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
+
+ struct lws_wss_packet_buffer *write_b;
+ size_t w;
+ for(w=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next)
+ w++;
+ *write_len = w;
+ aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
+ }
+ }
+ if (read_len != NULL)
+ {
+ *read_len = 0;
+ if (engine_instance != NULL)
+ {
+ aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
+ *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
+ aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
+ }
+ }
+}
+
+static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size)
+{
+ struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer));
+ if (data) {
+ new->data = mallocz(LWS_PRE + size);
+ memcpy(new->data + LWS_PRE, data, size);
+ new->data_size = size;
+ new->written = 0;
+ }
+ return new;
+}
+
+static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item)
+{
+ struct lws_wss_packet_buffer *tail = *list;
+ if (!*list) {
+ *list = item;
+ return;
+ }
+ while (tail->next) {
+ tail = tail->next;
+ }
+ tail->next = item;
+}
+
+static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list)
+{
+ struct lws_wss_packet_buffer *ret = *list;
+ if (ret != NULL)
+ *list = ret->next;
+
+ return ret;
+}
+
+static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item)
+{
+ freez(item->data);
+ freez(item);
+}
+
+static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer)
+{
+ size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL);
+ if (elems > 0)
+ lws_ring_consume(ringbuffer, NULL, NULL, elems);
+}
+
+static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list)
+{
+ struct lws_wss_packet_buffer *i;
+ while ((i = lws_wss_packet_buffer_pop(list)) != NULL) {
+ lws_wss_packet_buffer_free(i);
+ }
+ *list = NULL;
+}
+
+static inline void aclk_lws_wss_clear_io_buffers()
+{
+ aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
+ _aclk_lws_wss_read_buffer_clear(engine_instance->read_ringbuffer);
+ aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
+ aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
+ _aclk_lws_wss_write_buffer_clear(&engine_instance->write_buffer_head);
+ aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
+}
+
+static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback,
+ sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 },
+ { NULL, NULL, 0, 0, 0, 0, 0 } };
+
+static void aclk_lws_wss_log_divert(int level, const char *line)
+{
+ switch (level) {
+ case LLL_ERR:
+ error("Libwebsockets Error: %s", line);
+ break;
+ case LLL_WARN:
+ debug(D_ACLK, "Libwebsockets Warn: %s", line);
+ break;
+ default:
+ error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line);
+ }
+}
+
+static int aclk_lws_wss_client_init( char *target_hostname, int target_port)
+{
+ static int lws_logging_initialized = 0;
+
+ if (unlikely(!lws_logging_initialized)) {
+ lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert);
+ lws_logging_initialized = 1;
+ }
+
+ if (!target_hostname)
+ return 1;
+
+ engine_instance = callocz(1, sizeof(struct aclk_lws_wss_engine_instance));
+
+ engine_instance->host = target_hostname;
+ engine_instance->port = target_port;
+
+
+ aclk_lws_mutex_init(&engine_instance->write_buf_mutex);
+ aclk_lws_mutex_init(&engine_instance->read_buf_mutex);
+
+ engine_instance->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL);
+ if (!engine_instance->read_ringbuffer)
+ goto failure_cleanup;
+
+ return 0;
+
+failure_cleanup:
+ freez(engine_instance);
+ return 1;
+}
+
+void aclk_lws_wss_destroy_context()
+{
+ if (!engine_instance)
+ return;
+ if (!engine_instance->lws_context)
+ return;
+ lws_context_destroy(engine_instance->lws_context);
+ engine_instance->lws_context = NULL;
+}
+
+
+void aclk_lws_wss_client_destroy()
+{
+ if (engine_instance == NULL)
+ return;
+
+ aclk_lws_wss_destroy_context();
+ engine_instance->lws_wsi = NULL;
+
+ aclk_lws_wss_clear_io_buffers();
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+ pthread_mutex_destroy(&engine_instance->write_buf_mutex);
+ pthread_mutex_destroy(&engine_instance->read_buf_mutex);
+#endif
+}
+
+#ifdef LWS_WITH_SOCKS5
+static int aclk_wss_set_socks(struct lws_vhost *vhost, const char *socks)
+{
+ char *proxy = strstr(socks, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+
+ if (!proxy)
+ return -1;
+
+ proxy += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+
+ if (!*proxy)
+ return -1;
+
+ return lws_set_socks(vhost, proxy);
+}
+#endif
+
+void aclk_wss_set_proxy(struct lws_vhost *vhost)
+{
+ const char *proxy;
+ ACLK_PROXY_TYPE proxy_type;
+ char *log;
+
+ proxy = aclk_get_proxy(&proxy_type);
+
+#ifdef LWS_WITH_SOCKS5
+ lws_set_socks(vhost, ":");
+#endif
+ lws_set_proxy(vhost, ":");
+
+ if (proxy_type == PROXY_TYPE_UNKNOWN) {
+ error("Unknown proxy type");
+ return;
+ }
+
+ if (proxy_type == PROXY_TYPE_SOCKS5 || proxy_type == PROXY_TYPE_HTTP) {
+ log = strdupz(proxy);
+ safe_log_proxy_censor(log);
+ info("Connecting using %s proxy:\"%s\"", aclk_proxy_type_to_s(&proxy_type), log);
+ freez(log);
+ }
+ if (proxy_type == PROXY_TYPE_SOCKS5) {
+#ifdef LWS_WITH_SOCKS5
+ if (aclk_wss_set_socks(vhost, proxy))
+ error("LWS failed to accept socks proxy.");
+ return;
+#else
+ fatal("We have no SOCKS5 support but we made it here. Programming error!");
+#endif
+ }
+ if (proxy_type == PROXY_TYPE_HTTP) {
+ if (lws_set_proxy(vhost, proxy))
+ error("LWS failed to accept http proxy.");
+ return;
+ }
+ if (proxy_type != PROXY_DISABLED)
+ error("Unknown proxy type");
+}
+
+// Return code indicates if connection attempt has started async.
+int aclk_lws_wss_connect(char *host, int port)
+{
+ struct lws_client_connect_info i;
+ struct lws_vhost *vhost;
+ int n;
+
+ if (!engine_instance) {
+ if (aclk_lws_wss_client_init(host, port))
+ return 1; // Propagate failure
+ }
+
+ if (!engine_instance->lws_context)
+ {
+ // First time through (on this connection), create the context
+ struct lws_context_creation_info info;
+ memset(&info, 0, sizeof(struct lws_context_creation_info));
+ info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+ info.port = CONTEXT_PORT_NO_LISTEN;
+ info.protocols = protocols;
+ engine_instance->lws_context = lws_create_context(&info);
+ if (!engine_instance->lws_context)
+ {
+ error("Failed to create lws_context, ACLK will not function");
+ return 1;
+ }
+ return 0;
+ // PROTOCOL_INIT callback will call again.
+ }
+
+ for (n = 0; n < ACLK_LWS_CALLBACK_HISTORY; n++)
+ engine_instance->lws_callback_history[n] = 0;
+
+ if (engine_instance->lws_wsi) {
+ error("Already Connected. Only one connection supported at a time.");
+ return 0;
+ }
+
+ memset(&i, 0, sizeof(i));
+ i.context = engine_instance->lws_context;
+ i.port = engine_instance->port;
+ i.address = engine_instance->host;
+ i.path = "/mqtt";
+ i.host = engine_instance->host;
+ i.protocol = "mqtt";
+
+ // from LWS docu:
+ // If option LWS_SERVER_OPTION_EXPLICIT_VHOSTS is given, no vhost is
+ // created; you're expected to create your own vhosts afterwards using
+ // lws_create_vhost(). Otherwise a vhost named "default" is also created
+ // using the information in the vhost-related members, for compatibility.
+ vhost = lws_get_vhost_by_name(engine_instance->lws_context, "default");
+ if(!vhost)
+ fatal("Could not find the default LWS vhost.");
+
+ aclk_wss_set_proxy(vhost);
+
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+ i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE;
+ info("Disabling SSL certificate checks");
+#else
+ i.ssl_connection = LCCSCF_USE_SSL;
+#endif
+#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0
+#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM.
+ i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
+#endif
+ lws_client_connect_via_info(&i);
+ return 0;
+}
+
+static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, size_t len)
+{
+ if (lws_ring_insert(buffer, data, len) != len) {
+ error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding.");
+ return 0;
+ }
+ return 1;
+}
+
+static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
+{
+ switch (reason) {
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ return "LWS_CALLBACK_CLIENT_WRITEABLE";
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ return "LWS_CALLBACK_CLIENT_RECEIVE";
+ case LWS_CALLBACK_PROTOCOL_INIT:
+ return "LWS_CALLBACK_PROTOCOL_INIT";
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED";
+ case LWS_CALLBACK_USER:
+ return "LWS_CALLBACK_USER";
+ case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
+ return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR";
+ case LWS_CALLBACK_CLIENT_CLOSED:
+ return "LWS_CALLBACK_CLIENT_CLOSED";
+ case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
+ return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE";
+ case LWS_CALLBACK_WSI_DESTROY:
+ return "LWS_CALLBACK_WSI_DESTROY";
+ case LWS_CALLBACK_CLIENT_ESTABLISHED:
+ return "LWS_CALLBACK_CLIENT_ESTABLISHED";
+ case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
+ return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION";
+ case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
+ return "LWS_CALLBACK_EVENT_WAIT_CANCELLED";
+ default:
+ // Not using an internal buffer here for thread-safety with unknown calling context.
+ error("Unknown LWS callback %u", reason);
+ return "unknown";
+ }
+}
+
+void aclk_lws_wss_fail_report()
+{
+ int i;
+ int anything_to_send = 0;
+ BUFFER *buf;
+
+ if (netdata_anonymous_statistics_enabled <= 0)
+ return;
+
+ // guess - most of the callback will be 1-99 + ',' + \0
+ buf = buffer_create((ACLK_LWS_CALLBACK_HISTORY * 2) + 10);
+
+ for (i = 0; i < ACLK_LWS_CALLBACK_HISTORY; i++)
+ if (engine_instance->lws_callback_history[i]) {
+ buffer_sprintf(buf, "%s%d", (i ? "," : ""), engine_instance->lws_callback_history[i]);
+ anything_to_send = 1;
+ }
+
+ if (anything_to_send)
+ send_statistics("ACLK_CONN_FAIL", "FAIL", buffer_tostring(buf));
+
+ buffer_free(buf);
+}
+
+static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
+{
+ UNUSED(user);
+ struct lws_wss_packet_buffer *data;
+ int retval = 0;
+ static int lws_shutting_down = 0;
+ int i;
+
+ for (i = ACLK_LWS_CALLBACK_HISTORY - 1; i > 0; i--)
+ engine_instance->lws_callback_history[i] = engine_instance->lws_callback_history[i - 1];
+ engine_instance->lws_callback_history[0] = (int)reason;
+
+ if (unlikely(aclk_shutting_down && !lws_shutting_down)) {
+ lws_shutting_down = 1;
+ retval = -1;
+ engine_instance->upstream_reconnect_request = 0;
+ }
+
+ // Callback servicing is forced when we are closed from above.
+ if (engine_instance->upstream_reconnect_request) {
+ error("Closing lws connectino due to libmosquitto error.");
+ char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection.";
+ lws_close_reason(
+ wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error,
+ strlen(upstream_connection_error));
+ retval = -1;
+ engine_instance->upstream_reconnect_request = 0;
+ }
+
+ // Don't log to info - volume is proportional to message flow on ACLK.
+ switch (reason) {
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
+ data = engine_instance->write_buffer_head;
+ if (likely(data)) {
+ size_t bytes_left = data->data_size - data->written;
+ if ( bytes_left > FRAGMENT_SIZE)
+ bytes_left = FRAGMENT_SIZE;
+ int n = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY);
+ if (n>=0) {
+ data->written += n;
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.write_q_consumed += n;
+ ACLK_STATS_UNLOCK;
+ }
+ }
+ //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc);
+ if (data->written == data->data_size)
+ {
+ lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head);
+ lws_wss_packet_buffer_free(data);
+ }
+ if (engine_instance->write_buffer_head)
+ lws_callback_on_writable(engine_instance->lws_wsi);
+ }
+ aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
+ return retval;
+
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
+ if (!received_data_to_ringbuff(engine_instance->read_ringbuffer, in, len))
+ retval = 1;
+ aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.read_q_added += len;
+ ACLK_STATS_UNLOCK;
+ }
+
+ // to future myself -> do not call this while read lock is active as it will eventually
+ // want to acquire same lock later in aclk_lws_wss_client_read() function
+ aclk_lws_connection_data_received();
+ return retval;
+
+ case LWS_CALLBACK_WSI_CREATE:
+ case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
+ case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
+ case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
+ case LWS_CALLBACK_GET_THREAD_ID: // ?
+ case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
+ case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
+ // Expected and safe to ignore.
+ debug(D_ACLK, "Ignoring expected callback from LWS: %s", aclk_lws_callback_name(reason));
+ return retval;
+
+ default:
+ // Pass to next switch, this case removes compiler warnings.
+ break;
+ }
+ // Log to info - volume is proportional to connection attempts.
+ info("Processing callback %s", aclk_lws_callback_name(reason));
+ switch (reason) {
+ case LWS_CALLBACK_PROTOCOL_INIT:
+ aclk_lws_wss_connect(engine_instance->host, engine_instance->port); // Makes the outgoing connection
+ break;
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ if (engine_instance->lws_wsi != NULL && engine_instance->lws_wsi != wsi)
+ error("Multiple connections on same WSI? %p vs %p", engine_instance->lws_wsi, wsi);
+ engine_instance->lws_wsi = wsi;
+ break;
+ case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
+ error(
+ "Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", engine_instance->host,
+ engine_instance->port, (in ? (char *)in : "not given"));
+ // Fall-through
+ case LWS_CALLBACK_CLIENT_CLOSED:
+ case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
+ engine_instance->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback
+ aclk_lws_connection_closed();
+ return -1; // the callback response is ignored, hope the above remains true
+ case LWS_CALLBACK_WSI_DESTROY:
+ aclk_lws_wss_clear_io_buffers();
+ if (!engine_instance->websocket_connection_up)
+ aclk_lws_wss_fail_report();
+ engine_instance->lws_wsi = NULL;
+ engine_instance->websocket_connection_up = 0;
+ aclk_lws_connection_closed();
+ break;
+ case LWS_CALLBACK_CLIENT_ESTABLISHED:
+ engine_instance->websocket_connection_up = 1;
+ aclk_lws_connection_established(engine_instance->host, engine_instance->port);
+ break;
+
+ default:
+ error("Unexpected callback from libwebsockets %s", aclk_lws_callback_name(reason));
+ break;
+ }
+ return retval; //0-OK, other connection should be closed!
+}
+
+int aclk_lws_wss_client_write(void *buf, size_t count)
+{
+ if (engine_instance && engine_instance->lws_wsi && engine_instance->websocket_connection_up) {
+ aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
+ lws_wss_packet_buffer_append(&engine_instance->write_buffer_head, lws_wss_packet_buffer_new(buf, count));
+ aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.write_q_added += count;
+ ACLK_STATS_UNLOCK;
+ }
+
+ lws_callback_on_writable(engine_instance->lws_wsi);
+ return count;
+ }
+ return 0;
+}
+
+int aclk_lws_wss_client_read(void *buf, size_t count)
+{
+ size_t data_to_be_read = count;
+
+ aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
+ size_t readable_byte_count = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
+ if (unlikely(readable_byte_count == 0)) {
+ errno = EAGAIN;
+ data_to_be_read = -1;
+ goto abort;
+ }
+
+ if (readable_byte_count < data_to_be_read)
+ data_to_be_read = readable_byte_count;
+
+ data_to_be_read = lws_ring_consume(engine_instance->read_ringbuffer, NULL, buf, data_to_be_read);
+ if (data_to_be_read == readable_byte_count)
+ engine_instance->data_to_read = 0;
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
+ ACLK_STATS_UNLOCK;
+ }
+
+abort:
+ aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
+ return data_to_be_read;
+}
+
+void aclk_lws_wss_service_loop()
+{
+ if (engine_instance)
+ {
+ /*if (engine_instance->lws_wsi) {
+ lws_cancel_service(engine_instance->lws_context);
+ lws_callback_on_writable(engine_instance->lws_wsi);
+ }*/
+ lws_service(engine_instance->lws_context, 0);
+ }
+}
+
+// in case the MQTT connection disconnect while lws transport is still operational
+// we should drop connection and reconnect
+// this function should be called when that happens to notify lws of that situation
+void aclk_lws_wss_mqtt_layer_disconect_notif()
+{
+ if (!engine_instance)
+ return;
+ if (engine_instance->lws_wsi && engine_instance->websocket_connection_up) {
+ engine_instance->upstream_reconnect_request = 1;
+ lws_callback_on_writable(
+ engine_instance->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written.
+ }
+}
diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h
new file mode 100644
index 000000000..584a3cf4f
--- /dev/null
+++ b/aclk/legacy/aclk_lws_wss_client.h
@@ -0,0 +1,92 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_LWS_WSS_CLIENT_H
+#define ACLK_LWS_WSS_CLIENT_H
+
+#include <libwebsockets.h>
+
+#include "libnetdata/libnetdata.h"
+
+// This is as define because ideally the ACLK at high level
+// can do mosqitto writes and reads only from one thread
+// which is cleaner implementation IMHO
+// in such case this mutexes are not necessarry and life
+// is simpler
+#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1
+
+#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES (128 * 1024)
+
+#define ACLK_LWS_CALLBACK_HISTORY 10
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+#define aclk_lws_mutex_init(x) netdata_mutex_init(x)
+#define aclk_lws_mutex_lock(x) netdata_mutex_lock(x)
+#define aclk_lws_mutex_unlock(x) netdata_mutex_unlock(x)
+#else
+#define aclk_lws_mutex_init(x)
+#define aclk_lws_mutex_lock(x)
+#define aclk_lws_mutex_unlock(x)
+#endif
+
+struct aclk_lws_wss_engine_callbacks {
+ void (*connection_established_callback)();
+ void (*data_rcvd_callback)();
+ void (*data_writable_callback)();
+ void (*connection_closed)();
+};
+
+struct lws_wss_packet_buffer {
+ unsigned char *data;
+ size_t data_size, written;
+ struct lws_wss_packet_buffer *next;
+};
+
+struct aclk_lws_wss_engine_instance {
+ //target host/port for connection
+ char *host;
+ int port;
+
+ //internal data
+ struct lws_context *lws_context;
+ struct lws *lws_wsi;
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+ netdata_mutex_t write_buf_mutex;
+ netdata_mutex_t read_buf_mutex;
+#endif
+
+ struct lws_wss_packet_buffer *write_buffer_head;
+ struct lws_ring *read_ringbuffer;
+
+ //flags to be readed by engine user
+ int websocket_connection_up;
+
+ // currently this is by default disabled
+
+ int data_to_read;
+ int upstream_reconnect_request;
+
+ int lws_callback_history[ACLK_LWS_CALLBACK_HISTORY];
+};
+
+void aclk_lws_wss_client_destroy();
+void aclk_lws_wss_destroy_context();
+
+int aclk_lws_wss_connect(char *host, int port);
+
+int aclk_lws_wss_client_write(void *buf, size_t count);
+int aclk_lws_wss_client_read(void *buf, size_t count);
+void aclk_lws_wss_service_loop();
+
+void aclk_lws_wss_mqtt_layer_disconect_notif();
+
+// Notifications inside the layer above
+void aclk_lws_connection_established();
+void aclk_lws_connection_data_received();
+void aclk_lws_connection_closed();
+void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
+
+void aclk_wss_set_proxy(struct lws_vhost *vhost);
+
+#define FRAGMENT_SIZE 4096
+#endif
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c
new file mode 100644
index 000000000..7ab534f16
--- /dev/null
+++ b/aclk/legacy/aclk_query.c
@@ -0,0 +1,789 @@
+#include "aclk_common.h"
+#include "aclk_query.h"
+#include "aclk_stats.h"
+#include "aclk_rx_msgs.h"
+
+#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
+
+pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
+#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+
+volatile int aclk_connected = 0;
+
+#ifndef __GNUC__
+#pragma region ACLK_QUEUE
+#endif
+
+static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
+#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex)
+#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex)
+
+struct aclk_query {
+ usec_t created;
+ usec_t created_boot_time;
+ time_t run_after; // Delay run until after this time
+ ACLK_CMD cmd; // What command is this
+ char *topic; // Topic to respond to
+ char *data; // Internal data (NULL if request from the cloud)
+ char *msg_id; // msg_id generated by the cloud (NULL if internal)
+ char *query; // The actual query
+ u_char deleted; // Mark deleted for garbage collect
+ struct aclk_query *next;
+};
+
+struct aclk_query_queue {
+ struct aclk_query *aclk_query_head;
+ struct aclk_query *aclk_query_tail;
+ unsigned int count;
+} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
+
+
+unsigned int aclk_query_size()
+{
+ int r;
+ ACLK_QUEUE_LOCK;
+ r = aclk_queue.count;
+ ACLK_QUEUE_UNLOCK;
+ return r;
+}
+
+/*
+ * Free a query structure when done
+ */
+static void aclk_query_free(struct aclk_query *this_query)
+{
+ if (unlikely(!this_query))
+ return;
+
+ freez(this_query->topic);
+ if (likely(this_query->query))
+ freez(this_query->query);
+ if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) {
+ struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data;
+ freez(del->data);
+ freez(del);
+ }
+ if (likely(this_query->msg_id))
+ freez(this_query->msg_id);
+ freez(this_query);
+}
+
+/*
+ * Get the next query to process - NULL if nothing there
+ * The caller needs to free memory by calling aclk_query_free()
+ *
+ * topic
+ * query
+ * The structure itself
+ *
+ */
+static struct aclk_query *aclk_queue_pop()
+{
+ struct aclk_query *this_query;
+
+ ACLK_QUEUE_LOCK;
+
+ if (likely(!aclk_queue.aclk_query_head)) {
+ ACLK_QUEUE_UNLOCK;
+ return NULL;
+ }
+
+ this_query = aclk_queue.aclk_query_head;
+
+ // Get rid of the deleted entries
+ while (this_query && this_query->deleted) {
+ aclk_queue.count--;
+
+ aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
+
+ if (likely(!aclk_queue.aclk_query_head)) {
+ aclk_queue.aclk_query_tail = NULL;
+ }
+
+ aclk_query_free(this_query);
+
+ this_query = aclk_queue.aclk_query_head;
+ }
+
+ if (likely(!this_query)) {
+ ACLK_QUEUE_UNLOCK;
+ return NULL;
+ }
+
+ if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
+ info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
+ ACLK_QUEUE_UNLOCK;
+ return NULL;
+ }
+
+ aclk_queue.count--;
+ aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
+
+ if (likely(!aclk_queue.aclk_query_head)) {
+ aclk_queue.aclk_query_tail = NULL;
+ }
+
+ ACLK_QUEUE_UNLOCK;
+ return this_query;
+}
+
+// Returns the entry after which we need to create a new entry to run at the specified time
+// If NULL is returned we need to add to HEAD
+// Need to have a QUERY lock before calling this
+
+static struct aclk_query *aclk_query_find_position(time_t time_to_run)
+{
+ struct aclk_query *tmp_query, *last_query;
+
+ // Quick check if we will add to the end
+ if (likely(aclk_queue.aclk_query_tail)) {
+ if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
+ return aclk_queue.aclk_query_tail;
+ }
+
+ last_query = NULL;
+ tmp_query = aclk_queue.aclk_query_head;
+
+ while (tmp_query) {
+ if (tmp_query->run_after > time_to_run)
+ return last_query;
+ last_query = tmp_query;
+ tmp_query = tmp_query->next;
+ }
+ return last_query;
+}
+
+// Need to have a QUERY lock before calling this
+static struct aclk_query *
+aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
+{
+ struct aclk_query *tmp_query, *prev_query;
+ UNUSED(cmd);
+
+ tmp_query = aclk_queue.aclk_query_head;
+ prev_query = NULL;
+ while (tmp_query) {
+ if (likely(!tmp_query->deleted)) {
+ if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
+ if ((!data || data == tmp_query->data) &&
+ (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
+ if (likely(last_query))
+ *last_query = prev_query;
+ return tmp_query;
+ }
+ }
+ }
+ prev_query = tmp_query;
+ tmp_query = tmp_query->next;
+ }
+ return NULL;
+}
+
+/*
+ * Add a query to execute, the result will be send to the specified topic
+ */
+
+int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
+{
+ struct aclk_query *new_query, *tmp_query;
+
+ // Ignore all commands while we wait for the agent to initialize
+ if (unlikely(!aclk_connected))
+ return 1;
+
+ run_after = now_realtime_sec() + run_after;
+
+ ACLK_QUEUE_LOCK;
+ struct aclk_query *last_query = NULL;
+
+ tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
+ if (unlikely(tmp_query)) {
+ if (tmp_query->run_after == run_after) {
+ ACLK_QUEUE_UNLOCK;
+ QUERY_THREAD_WAKEUP;
+ return 0;
+ }
+
+ if (last_query)
+ last_query->next = tmp_query->next;
+ else
+ aclk_queue.aclk_query_head = tmp_query->next;
+
+ debug(D_ACLK, "Removing double entry");
+ aclk_query_free(tmp_query);
+ aclk_queue.count--;
+ }
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_queued++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ new_query = callocz(1, sizeof(struct aclk_query));
+ new_query->cmd = aclk_cmd;
+ if (internal) {
+ new_query->topic = strdupz(topic);
+ if (likely(query))
+ new_query->query = strdupz(query);
+ } else {
+ new_query->topic = topic;
+ new_query->query = query;
+ new_query->msg_id = msg_id;
+ }
+
+ new_query->data = data;
+ new_query->next = NULL;
+ new_query->created = now_realtime_usec();
+ new_query->created_boot_time = now_boottime_usec();
+ new_query->run_after = run_after;
+
+ debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
+
+ tmp_query = aclk_query_find_position(run_after);
+
+ if (tmp_query) {
+ new_query->next = tmp_query->next;
+ tmp_query->next = new_query;
+ if (tmp_query == aclk_queue.aclk_query_tail)
+ aclk_queue.aclk_query_tail = new_query;
+ aclk_queue.count++;
+ ACLK_QUEUE_UNLOCK;
+ QUERY_THREAD_WAKEUP;
+ return 0;
+ }
+
+ new_query->next = aclk_queue.aclk_query_head;
+ aclk_queue.aclk_query_head = new_query;
+ aclk_queue.count++;
+
+ ACLK_QUEUE_UNLOCK;
+ QUERY_THREAD_WAKEUP;
+ return 0;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+#ifndef __GNUC__
+#pragma region Helper Functions
+#endif
+
+/*
+ * Take a buffer, encode it and rewrite it
+ *
+ */
+
+static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines)
+{
+ char *tmp_buffer = mallocz(content_size * 2);
+ char *dst = tmp_buffer;
+ while (content_size > 0) {
+ switch (*src) {
+ case '\n':
+ if (keep_newlines)
+ {
+ *dst++ = '\\';
+ *dst++ = 'n';
+ }
+ break;
+ case '\t':
+ break;
+ case 0x01 ... 0x08:
+ case 0x0b ... 0x1F:
+ *dst++ = '\\';
+ *dst++ = 'u';
+ *dst++ = '0';
+ *dst++ = '0';
+ *dst++ = (*src < 0x0F) ? '0' : '1';
+ *dst++ = to_hex(*src);
+ break;
+ case '\"':
+ *dst++ = '\\';
+ *dst++ = *src;
+ break;
+ default:
+ *dst++ = *src;
+ }
+ src++;
+ content_size--;
+ }
+ *dst = '\0';
+
+ return tmp_buffer;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+#ifndef __GNUC__
+#pragma region ACLK_QUERY
+#endif
+
+static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
+{
+ usec_t t = now_boottime_usec();
+ aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
+
+ w->response.code = web_client_api_request_v1(host, w, url);
+ t = now_boottime_usec() - t;
+
+ aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t);
+
+ return t;
+}
+
+static int aclk_execute_query(struct aclk_query *this_query)
+{
+ if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
+ struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+ w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
+ w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
+ w->acl = 0x1f;
+
+ char *mysep = strchr(this_query->query, '?');
+ if (mysep) {
+ strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
+ *mysep = '\0';
+ } else
+ strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
+
+ mysep = strrchr(this_query->query, '/');
+
+ // TODO: handle bad response perhaps in a different way. For now it does to the payload
+ aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
+ now_realtime_timeval(&w->tv_ready);
+ w->response.data->date = w->tv_ready.tv_sec;
+ web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
+ BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ buffer_flush(local_buffer);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+ char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
+ char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
+
+ buffer_sprintf(
+ local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}",
+ w->response.code, encoded_response, encoded_header);
+
+ buffer_sprintf(local_buffer, "\n}");
+
+ debug(D_ACLK, "Response:%s", encoded_header);
+
+ aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
+
+ buffer_free(w->response.data);
+ buffer_free(w->response.header);
+ buffer_free(w->response.header_output);
+ freez(w);
+ buffer_free(local_buffer);
+ freez(encoded_response);
+ freez(encoded_header);
+ return 0;
+ }
+ return 1;
+}
+
+static int aclk_execute_query_v2(struct aclk_query *this_query)
+{
+ int retval = 0;
+ usec_t t;
+ BUFFER *local_buffer = NULL;
+ struct aclk_cloud_req_v2 *cloud_req = (struct aclk_cloud_req_v2 *)this_query->data;
+
+#ifdef NETDATA_WITH_ZLIB
+ int z_ret;
+ BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ char *start, *end;
+#endif
+
+ struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+ w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
+ w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
+ w->acl = 0x1f;
+
+ char *mysep = strchr(this_query->query, '?');
+ if (mysep) {
+ url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+ *mysep = '\0';
+ } else
+ url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+
+ mysep = strrchr(this_query->query, '/');
+
+ // execute the query
+ t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
+
+#ifdef NETDATA_WITH_ZLIB
+ // check if gzip encoding can and should be used
+ if ((start = strstr(cloud_req->data, WEB_HDR_ACCEPT_ENC))) {
+ start += strlen(WEB_HDR_ACCEPT_ENC);
+ end = strstr(start, "\x0D\x0A");
+ start = strstr(start, "gzip");
+
+ if (start && start < end) {
+ w->response.zstream.zalloc = Z_NULL;
+ w->response.zstream.zfree = Z_NULL;
+ w->response.zstream.opaque = Z_NULL;
+ if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
+ w->response.zinitialized = 1;
+ w->response.zoutput = 1;
+ } else
+ error("Failed to initialize zlib. Proceeding without compression.");
+ }
+ }
+
+ if (w->response.data->len && w->response.zinitialized) {
+ w->response.zstream.next_in = (Bytef *)w->response.data->buffer;
+ w->response.zstream.avail_in = w->response.data->len;
+ do {
+ w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE;
+ w->response.zstream.next_out = w->response.zbuffer;
+ z_ret = deflate(&w->response.zstream, Z_FINISH);
+ if(z_ret < 0) {
+ if(w->response.zstream.msg)
+ error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg);
+ else
+ error("Unknown error during zlib compression.");
+ retval = 1;
+ goto cleanup;
+ }
+ int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
+ buffer_need_bytes(z_buffer, bytes_to_cpy);
+ memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy);
+ z_buffer->len += bytes_to_cpy;
+ } while(z_ret != Z_STREAM_END);
+ // so that web_client_build_http_header
+ // puts correct content lenght into header
+ buffer_free(w->response.data);
+ w->response.data = z_buffer;
+ z_buffer = NULL;
+ }
+#endif
+
+ now_realtime_timeval(&w->tv_ready);
+ w->response.data->date = w->tv_ready.tv_sec;
+ web_client_build_http_header(w);
+ local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+ buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code);
+ buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A");
+ buffer_strcat(local_buffer, w->response.header_output->buffer);
+
+ if (w->response.data->len) {
+#ifdef NETDATA_WITH_ZLIB
+ if (w->response.zinitialized) {
+ buffer_need_bytes(local_buffer, w->response.data->len);
+ memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
+ local_buffer->len += w->response.data->len;
+ } else {
+#endif
+ buffer_strcat(local_buffer, w->response.data->buffer);
+#ifdef NETDATA_WITH_ZLIB
+ }
+#endif
+ }
+
+ aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id);
+
+cleanup:
+#ifdef NETDATA_WITH_ZLIB
+ if(w->response.zinitialized)
+ deflateEnd(&w->response.zstream);
+ buffer_free(z_buffer);
+#endif
+ buffer_free(w->response.data);
+ buffer_free(w->response.header);
+ buffer_free(w->response.header_output);
+ freez(w);
+ buffer_free(local_buffer);
+ return retval;
+}
+
+#define ACLK_HOST_PTR_COMPULSORY(x) \
+ if (unlikely(!host)) { \
+ errno = 0; \
+ error(x " needs host pointer"); \
+ break; \
+ }
+
+/*
+ * This function will fetch the next pending command and process it
+ *
+ */
+static int aclk_process_query(struct aclk_query_thread *t_info)
+{
+ struct aclk_query *this_query;
+ static long int query_count = 0;
+ ACLK_METADATA_STATE meta_state;
+ RRDHOST *host;
+
+ if (!aclk_connected)
+ return 0;
+
+ this_query = aclk_queue_pop();
+ if (likely(!this_query)) {
+ return 0;
+ }
+
+ if (unlikely(this_query->deleted)) {
+ debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
+ aclk_query_free(this_query);
+ return 1;
+ }
+ query_count++;
+
+ host = (RRDHOST*)this_query->data;
+
+ debug(
+ D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic,
+ this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS);
+
+ switch (this_query->cmd) {
+ case ACLK_CMD_ONCONNECT:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
+#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
+ if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
+ error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
+ break;
+ }
+#else
+#warning "This check became unnecessary. Remove"
+#endif
+
+ debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"",
+ host->hostname,
+ host->machine_guid);
+
+ rrdhost_aclk_state_lock(host);
+ meta_state = host->aclk_state.metadata;
+ host->aclk_state.metadata = ACLK_METADATA_SENT;
+ rrdhost_aclk_state_unlock(host);
+ aclk_send_metadata(meta_state, host);
+ break;
+
+ case ACLK_CMD_CHART:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART");
+
+ debug(D_ACLK, "EXECUTING a chart update command");
+ aclk_send_single_chart(host, this_query->query);
+ break;
+
+ case ACLK_CMD_CHARTDEL:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL");
+
+ debug(D_ACLK, "EXECUTING a chart delete command");
+ //TODO: This send the info metadata for now
+ aclk_send_info_metadata(ACLK_METADATA_SENT, host);
+ break;
+
+ case ACLK_CMD_ALARM:
+ debug(D_ACLK, "EXECUTING an alarm update command");
+ aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
+ break;
+
+ case ACLK_CMD_CLOUD:
+ debug(D_ACLK, "EXECUTING a cloud command");
+ aclk_execute_query(this_query);
+ break;
+ case ACLK_CMD_CLOUD_QUERY_2:
+ debug(D_ACLK, "EXECUTING Cloud Query v2");
+ aclk_execute_query_v2(this_query);
+ break;
+
+ case ACLK_CMD_CHILD_CONNECT:
+ case ACLK_CMD_CHILD_DISCONNECT:
+ ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT");
+
+ debug(
+ D_ACLK, "Execution Child %s command",
+ this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect");
+ aclk_send_info_child_connection(host, this_query->cmd);
+ break;
+
+ default:
+ errno = 0;
+ error("Unknown ACLK Query Command");
+ break;
+ }
+ debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_dispatched++;
+ aclk_queries_per_thread[t_info->idx]++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ aclk_query_free(this_query);
+
+ return 1;
+}
+
+void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
+{
+ if (query_threads && query_threads->thread_list) {
+ for (int i = 0; i < query_threads->count; i++) {
+ netdata_thread_join(query_threads->thread_list[i].thread, NULL);
+ }
+ freez(query_threads->thread_list);
+ }
+
+ struct aclk_query *this_query;
+
+ do {
+ this_query = aclk_queue_pop();
+ aclk_query_free(this_query);
+ } while (this_query);
+}
+
+#define TASK_LEN_MAX 16
+void aclk_query_threads_start(struct aclk_query_threads *query_threads)
+{
+ info("Starting %d query threads.", query_threads->count);
+
+ char thread_name[TASK_LEN_MAX];
+ query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
+ for (int i = 0; i < query_threads->count; i++) {
+ query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
+
+ if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0))
+ error("snprintf encoding error");
+ netdata_thread_create(
+ &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
+ &query_threads->thread_list[i]);
+ }
+}
+
+/**
+ * Checks and updates popcorning state of rrdhost
+ * returns actual/updated popcorning state
+ */
+
+ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
+{
+ rrdhost_aclk_state_lock(host);
+ ACLK_POPCORNING_STATE ret = host->aclk_state.state;
+ if (host->aclk_state.state != ACLK_HOST_INITIALIZING){
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+ }
+
+ if (!host->aclk_state.t_last_popcorn_update){
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+ }
+
+ time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update;
+
+ if (t_diff >= ACLK_STABLE_TIMEOUT) {
+ host->aclk_state.state = ACLK_HOST_STABLE;
+ host->aclk_state.t_last_popcorn_update = 0;
+ rrdhost_aclk_state_unlock(host);
+ info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff);
+ return ACLK_HOST_STABLE;
+ }
+
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+}
+
+/**
+ * Main query processing thread
+ *
+ * On startup wait for the agent collectors to initialize
+ * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
+ * of no new collectors coming in in order to mark the agent
+ * as stable (set agent_state = AGENT_STABLE)
+ */
+void *aclk_query_main_thread(void *ptr)
+{
+ struct aclk_query_thread *info = ptr;
+
+ while (!netdata_exit) {
+ if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) {
+#ifdef ACLK_DEBUG
+ _dump_collector_list();
+#endif
+ break;
+ }
+ sleep_usec(USEC_PER_SEC * 1);
+ }
+
+ while (!netdata_exit) {
+ if(aclk_disable_runtime) {
+ sleep(1);
+ continue;
+ }
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(!aclk_shared_state.version_neg)) {
+ if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
+ ACLK_SHARED_STATE_UNLOCK;
+ info("Waiting for ACLK Version Negotiation message from Cloud");
+ sleep(1);
+ continue;
+ }
+ errno = 0;
+ error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
+ " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
+ aclk_shared_state.version_neg = ACLK_VERSION_MIN;
+ aclk_set_rx_handlers(aclk_shared_state.version_neg);
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
+ if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ rrdhost_aclk_state_unlock(localhost);
+ errno = 0;
+ error("ACLK failed to queue on_connect command");
+ sleep(1);
+ continue;
+ }
+ localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED;
+ }
+ rrdhost_aclk_state_unlock(localhost);
+
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
+ aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+ aclk_shared_state.next_popcorn_host = NULL;
+ aclk_update_next_child_to_popcorn();
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+
+ while (aclk_process_query(info)) {
+ // Process all commands
+ };
+
+ QUERY_THREAD_LOCK;
+
+ // TODO: Need to check if there are queries awaiting already
+ if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+ sleep_usec(USEC_PER_SEC * 1);
+
+ QUERY_THREAD_UNLOCK;
+ }
+
+ return NULL;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h
new file mode 100644
index 000000000..53eef1392
--- /dev/null
+++ b/aclk/legacy/aclk_query.h
@@ -0,0 +1,40 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_QUERY_H
+#define NETDATA_ACLK_QUERY_H
+
+#include "libnetdata/libnetdata.h"
+#include "web/server/web_client.h"
+
+#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+
+extern pthread_cond_t query_cond_wait;
+extern pthread_mutex_t query_lock_wait;
+#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
+#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
+
+extern volatile int aclk_connected;
+
+struct aclk_query_thread {
+ netdata_thread_t thread;
+ int idx;
+};
+
+struct aclk_query_threads {
+ struct aclk_query_thread *thread_list;
+ int count;
+};
+
+struct aclk_cloud_req_v2 {
+ char *data;
+ RRDHOST *host;
+};
+
+void *aclk_query_main_thread(void *ptr);
+int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
+
+void aclk_query_threads_start(struct aclk_query_threads *query_threads);
+void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
+unsigned int aclk_query_size();
+
+#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/legacy/aclk_rrdhost_state.h b/aclk/legacy/aclk_rrdhost_state.h
new file mode 100644
index 000000000..7ab3a502e
--- /dev/null
+++ b/aclk/legacy/aclk_rrdhost_state.h
@@ -0,0 +1,42 @@
+#ifndef ACLK_RRDHOST_STATE_H
+#define ACLK_RRDHOST_STATE_H
+
+#include "../../libnetdata/libnetdata.h"
+
+typedef enum aclk_cmd {
+ ACLK_CMD_CLOUD,
+ ACLK_CMD_ONCONNECT,
+ ACLK_CMD_INFO,
+ ACLK_CMD_CHART,
+ ACLK_CMD_CHARTDEL,
+ ACLK_CMD_ALARM,
+ ACLK_CMD_CLOUD_QUERY_2,
+ ACLK_CMD_CHILD_CONNECT,
+ ACLK_CMD_CHILD_DISCONNECT
+} ACLK_CMD;
+
+typedef enum aclk_metadata_state {
+ ACLK_METADATA_REQUIRED,
+ ACLK_METADATA_CMD_QUEUED,
+ ACLK_METADATA_SENT
+} ACLK_METADATA_STATE;
+
+typedef enum aclk_agent_state {
+ ACLK_HOST_INITIALIZING,
+ ACLK_HOST_STABLE
+} ACLK_POPCORNING_STATE;
+
+typedef struct aclk_rrdhost_state {
+ char *claimed_id; // Claimed ID if host has one otherwise NULL
+
+#ifdef ENABLE_ACLK
+ // per child popcorning
+ ACLK_POPCORNING_STATE state;
+ ACLK_METADATA_STATE metadata;
+
+ time_t timestamp_created;
+ time_t t_last_popcorn_update;
+#endif /* ENABLE_ACLK */
+} aclk_rrdhost_state;
+
+#endif /* ACLK_RRDHOST_STATE_H */
diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c
new file mode 100644
index 000000000..99fa9d987
--- /dev/null
+++ b/aclk/legacy/aclk_rx_msgs.c
@@ -0,0 +1,365 @@
+
+#include "aclk_rx_msgs.h"
+
+#include "aclk_common.h"
+#include "aclk_stats.h"
+#include "aclk_query.h"
+
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN 37
+#endif
+
+static inline int aclk_extract_v2_data(char *payload, char **data)
+{
+ char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR);
+ if(!ptr)
+ return 1;
+ ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR);
+ *data = strdupz(ptr);
+ return 0;
+}
+
+#define ACLK_GET_REQ "GET "
+#define ACLK_CHILD_REQ "/host/"
+#define ACLK_CLOUD_REQ_V2_PREFIX "/api/v1/"
+#define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref))
+static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req)
+{
+ const char *start, *end, *ptr;
+ char uuid_str[UUID_STR_LEN];
+ uuid_t uuid;
+
+ errno = 0;
+
+ if(STRNCMP_CONSTANT_PREFIX(cloud_req->data, ACLK_GET_REQ)) {
+ error("Only accepting GET HTTP requests from CLOUD");
+ return 1;
+ }
+ start = ptr = cloud_req->data + strlen(ACLK_GET_REQ);
+
+ if(!STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CHILD_REQ)) {
+ ptr += strlen(ACLK_CHILD_REQ);
+ if(strlen(ptr) < UUID_STR_LEN) {
+ error("the child id in URL too short \"%s\"", start);
+ return 1;
+ }
+
+ strncpyz(uuid_str, ptr, UUID_STR_LEN - 1);
+
+ for(int i = 0; i < UUID_STR_LEN && uuid_str[i]; i++)
+ uuid_str[i] = tolower(uuid_str[i]);
+
+ if(ptr[0] && uuid_parse(uuid_str, uuid)) {
+ error("Got Child query (/host/XXX/...) host id \"%s\" doesn't look like valid GUID", uuid_str);
+ return 1;
+ }
+ ptr += UUID_STR_LEN - 1;
+
+ cloud_req->host = rrdhost_find_by_guid(uuid_str, 0);
+ if(!cloud_req->host) {
+ error("Cannot find host with GUID \"%s\"", uuid_str);
+ return 1;
+ }
+ }
+
+ if(STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CLOUD_REQ_V2_PREFIX)) {
+ error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
+ return 1;
+ }
+
+ if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) {
+ errno = 0;
+ error("Doesn't look like HTTP GET request.");
+ return 1;
+ }
+
+ req->payload = mallocz((end - start) + 1);
+ strncpyz(req->payload, start, end - start);
+
+ return 0;
+}
+
+#define HTTP_CHECK_AGENT_INITIALIZED() rrdhost_aclk_state_lock(localhost);\
+ if (unlikely(localhost->aclk_state.state == ACLK_HOST_INITIALIZING)) {\
+ debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
+ rrdhost_aclk_state_unlock(localhost);\
+ return 1;\
+ }\
+ rrdhost_aclk_state_unlock(localhost);
+
+/*
+ * Parse the incoming payload and queue a command if valid
+ */
+static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+ UNUSED(raw_payload);
+ HTTP_CHECK_AGENT_INITIALIZED();
+
+ errno = 0;
+ if (unlikely(cloud_to_agent->version != 1)) {
+ error(
+ "Received \"http\" message from Cloud with version %d, but ACLK version %d is used",
+ cloud_to_agent->version,
+ aclk_shared_state.version_neg);
+ return 1;
+ }
+
+ if (unlikely(!cloud_to_agent->payload)) {
+ error("payload missing");
+ return 1;
+ }
+
+ if (unlikely(!cloud_to_agent->callback_topic)) {
+ error("callback_topic missing");
+ return 1;
+ }
+
+ if (unlikely(!cloud_to_agent->msg_id)) {
+ error("msg_id missing");
+ return 1;
+ }
+
+ if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
+ debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
+
+ return 0;
+}
+
+static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+ HTTP_CHECK_AGENT_INITIALIZED();
+
+ struct aclk_cloud_req_v2 *cloud_req;
+ char *data;
+
+ errno = 0;
+ if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
+ error(
+ "This handler cannot reply to request with version older than %d, received %d.",
+ ACLK_V_COMPRESSION,
+ cloud_to_agent->version);
+ return 1;
+ }
+
+ if (unlikely(aclk_extract_v2_data(raw_payload, &data))) {
+ error("Error extracting payload expected after the JSON dictionary.");
+ return 1;
+ }
+
+ cloud_req = mallocz(sizeof(struct aclk_cloud_req_v2));
+ cloud_req->data = data;
+ cloud_req->host = localhost;
+
+ if (unlikely(aclk_v2_payload_get_query(cloud_req, cloud_to_agent))) {
+ error("Could not extract payload from query");
+ goto cleanup;
+ }
+
+ if (unlikely(!cloud_to_agent->callback_topic)) {
+ error("Missing callback_topic");
+ goto cleanup;
+ }
+
+ if (unlikely(!cloud_to_agent->msg_id)) {
+ error("Missing msg_id");
+ goto cleanup;
+ }
+
+ // aclk_queue_query takes ownership of data pointer
+ if (unlikely(aclk_queue_query(
+ cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
+ ACLK_CMD_CLOUD_QUERY_2))) {
+ error("ACLK failed to queue incoming \"http\" v2 message");
+ goto cleanup;
+ }
+
+ return 0;
+cleanup:
+ freez(cloud_req->data);
+ freez(cloud_req);
+ return 1;
+}
+
+// This handles `version` message from cloud used to negotiate
+// protocol version we will use
+static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+ UNUSED(raw_payload);
+ int version = -1;
+ errno = 0;
+
+ if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
+ error(
+ "Unsuported version of \"version\" message from cloud. Expected %d, Got %d",
+ ACLK_VERSION_NEG_VERSION,
+ cloud_to_agent->version);
+ return 1;
+ }
+ if (unlikely(!cloud_to_agent->min_version)) {
+ error("Min version missing or 0");
+ return 1;
+ }
+ if (unlikely(!cloud_to_agent->max_version)) {
+ error("Max version missing or 0");
+ return 1;
+ }
+ if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
+ error(
+ "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version,
+ cloud_to_agent->min_version);
+ return 1;
+ }
+
+ if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
+ error(
+ "Agent too old for this cloud. Minimum version required by cloud %d."
+ " Maximum version supported by this agent %d.",
+ cloud_to_agent->min_version, ACLK_VERSION_MAX);
+ aclk_kill_link = 1;
+ aclk_disable_runtime = 1;
+ return 1;
+ }
+ if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
+ error(
+ "Cloud version is too old for this agent. Maximum version supported by cloud %d."
+ " Minimum (oldest) version supported by this agent %d.",
+ cloud_to_agent->max_version, ACLK_VERSION_MIN);
+ aclk_kill_link = 1;
+ return 1;
+ }
+
+ version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
+
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
+ errno = 0;
+ error("The \"version\" message came too late ignoring.");
+ goto err_cleanup;
+ }
+ if (unlikely(aclk_shared_state.version_neg)) {
+ errno = 0;
+ error("Version has already been set to %d", aclk_shared_state.version_neg);
+ goto err_cleanup;
+ }
+ aclk_shared_state.version_neg = version;
+ ACLK_SHARED_STATE_UNLOCK;
+
+ info("Choosing version %d of ACLK", version);
+
+ aclk_set_rx_handlers(version);
+
+ return 0;
+
+err_cleanup:
+ ACLK_SHARED_STATE_UNLOCK;
+ return 1;
+}
+
+typedef struct aclk_incoming_msg_type{
+ char *name;
+ int(*fnc)(struct aclk_request *, char *);
+}aclk_incoming_msg_type;
+
+aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = {
+ { .name = "http", .fnc = aclk_handle_cloud_request_v1 },
+ { .name = "version", .fnc = aclk_handle_version_response },
+ { .name = NULL, .fnc = NULL }
+};
+
+aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
+ { .name = "http", .fnc = aclk_handle_cloud_request_v2 },
+ { .name = "version", .fnc = aclk_handle_version_response },
+ { .name = NULL, .fnc = NULL }
+};
+
+struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
+
+void aclk_set_rx_handlers(int version)
+{
+ if(version >= ACLK_V_COMPRESSION) {
+ aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
+ return;
+ }
+
+ aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
+}
+
+int aclk_handle_cloud_message(char *payload)
+{
+ struct aclk_request cloud_to_agent;
+ memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_recvd++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ if (unlikely(!payload)) {
+ errno = 0;
+ error("ACLK incoming message is empty");
+ goto err_cleanup_nojson;
+ }
+
+ debug(D_ACLK, "ACLK incoming message (%s)", payload);
+
+ int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+
+ if (unlikely(rc != JSON_OK)) {
+ errno = 0;
+ error("Malformed json request (%s)", payload);
+ goto err_cleanup;
+ }
+
+ if (!cloud_to_agent.type_id) {
+ errno = 0;
+ error("Cloud message is missing compulsory key \"type\"");
+ goto err_cleanup;
+ }
+
+ if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
+ error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
+ goto err_cleanup;
+ }
+
+ for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
+ if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
+ if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
+ // in case of success handler is supposed to clean up after itself
+ // or as in the case of aclk_handle_cloud_request take
+ // ownership of the pointers (done to avoid copying)
+ // see what `aclk_queue_query` parameter `internal` does
+
+ // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
+ // msg handlers (namely aclk_handle_version_responce)
+ // can freely change what aclk_incoming_msg_types points to
+ // so either exit or restart this for loop
+ freez(cloud_to_agent.type_id);
+ return 0;
+ }
+ goto err_cleanup;
+ }
+ }
+
+ errno = 0;
+ error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
+
+err_cleanup:
+ if (cloud_to_agent.payload)
+ freez(cloud_to_agent.payload);
+ if (cloud_to_agent.type_id)
+ freez(cloud_to_agent.type_id);
+ if (cloud_to_agent.msg_id)
+ freez(cloud_to_agent.msg_id);
+ if (cloud_to_agent.callback_topic)
+ freez(cloud_to_agent.callback_topic);
+
+err_cleanup_nojson:
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ return 1;
+}
diff --git a/aclk/legacy/aclk_rx_msgs.h b/aclk/legacy/aclk_rx_msgs.h
new file mode 100644
index 000000000..3095e41a7
--- /dev/null
+++ b/aclk/legacy/aclk_rx_msgs.h
@@ -0,0 +1,13 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_RX_MSGS_H
+#define NETDATA_ACLK_RX_MSGS_H
+
+#include "../../daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+int aclk_handle_cloud_message(char *payload);
+void aclk_set_rx_handlers(int version);
+
+
+#endif /* NETDATA_ACLK_RX_MSGS_H */
diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c
new file mode 100644
index 000000000..2a57cd6f0
--- /dev/null
+++ b/aclk/legacy/aclk_stats.c
@@ -0,0 +1,298 @@
+#include "aclk_stats.h"
+
+netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
+
+int aclk_stats_enabled;
+
+int query_thread_count;
+
+// data ACLK stats need per query thread
+struct aclk_qt_data {
+ RRDDIM *dim;
+} *aclk_qt_data = NULL;
+
+uint32_t *aclk_queries_per_thread = NULL;
+uint32_t *aclk_queries_per_thread_sample = NULL;
+
+struct aclk_metrics aclk_metrics = {
+ .online = 0,
+};
+
+struct aclk_metrics_per_sample aclk_metrics_per_sample;
+
+struct aclk_mat_metrics aclk_mat_metrics = {
+#ifdef NETDATA_INTERNAL_CHECKS
+ .latency = { .name = "aclk_latency_mqtt",
+ .prio = 200002,
+ .st = NULL,
+ .rd_avg = NULL,
+ .rd_max = NULL,
+ .rd_total = NULL,
+ .unit = "ms",
+ .title = "ACLK Message Publish Latency" },
+#endif
+
+ .cloud_q_db_query_time = { .name = "aclk_db_query_time",
+ .prio = 200006,
+ .st = NULL,
+ .rd_avg = NULL,
+ .rd_max = NULL,
+ .rd_total = NULL,
+ .unit = "us",
+ .title = "Time it took to process cloud requested DB queries" },
+
+ .cloud_q_recvd_to_processed = { .name = "aclk_cloud_q_recvd_to_processed",
+ .prio = 200007,
+ .st = NULL,
+ .rd_avg = NULL,
+ .rd_max = NULL,
+ .rd_total = NULL,
+ .unit = "us",
+ .title = "Time from receiving the Cloud Query until it was picked up "
+ "by query thread (just before passing to the database)." }
+};
+
+void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement)
+{
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ if (metric->max < measurement)
+ metric->max = measurement;
+
+ metric->total += measurement;
+ metric->count++;
+ ACLK_STATS_UNLOCK;
+ }
+}
+
+static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent)
+{
+ static RRDSET *st_aclkstats = NULL;
+ static RRDDIM *rd_online_status = NULL;
+
+ if (unlikely(!st_aclkstats)) {
+ st_aclkstats = rrdset_create_localhost(
+ "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status",
+ "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st_aclkstats);
+
+ rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online);
+
+ rrdset_done(st_aclkstats);
+}
+
+static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st_query_thread = NULL;
+ static RRDDIM *rd_queued = NULL;
+ static RRDDIM *rd_dispatched = NULL;
+
+ if (unlikely(!st_query_thread)) {
+ st_query_thread = rrdset_create_localhost(
+ "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s",
+ "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA);
+
+ rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st_query_thread);
+
+ rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued);
+ rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched);
+
+ rrdset_done(st_query_thread);
+}
+
+static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_wq_add = NULL;
+ static RRDDIM *rd_wq_consumed = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "kB/s",
+ "netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA);
+
+ rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_wq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ rrddim_set_by_pointer(st, rd_wq_add, per_sample->write_q_added);
+ rrddim_set_by_pointer(st, rd_wq_consumed, per_sample->write_q_consumed);
+
+ rrdset_done(st);
+}
+
+static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_add = NULL;
+ static RRDDIM *rd_rq_consumed = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "kB/s",
+ "netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA);
+
+ rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ rrddim_set_by_pointer(st, rd_rq_add, per_sample->read_q_added);
+ rrddim_set_by_pointer(st, rd_rq_consumed, per_sample->read_q_consumed);
+
+ rrdset_done(st);
+}
+
+static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_rcvd = NULL;
+ static RRDDIM *rd_rq_err = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s",
+ "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err);
+ rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err);
+
+ rrdset_done(st);
+}
+
+#define MAX_DIM_NAME 16
+static void aclk_stats_query_threads(uint32_t *queries_per_thread)
+{
+ static RRDSET *st = NULL;
+
+ char dim_name[MAX_DIM_NAME];
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
+ "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < query_thread_count; i++) {
+ if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
+ error("snprintf encoding error");
+ aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+ } else
+ rrdset_next(st);
+
+ for (int i = 0; i < query_thread_count; i++) {
+ rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
+ }
+
+ rrdset_done(st);
+}
+
+static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct aclk_metric_mat_data *data)
+{
+ if(unlikely(!metric->st)) {
+ metric->st = rrdset_create_localhost(
+ "netdata", metric->name, NULL, "aclk", NULL, metric->title, metric->unit, "netdata", "stats", metric->prio,
+ localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ metric->rd_avg = rrddim_add(metric->st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ metric->rd_max = rrddim_add(metric->st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ metric->rd_total = rrddim_add(metric->st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(metric->st);
+
+ if(data->count)
+ rrddim_set_by_pointer(metric->st, metric->rd_avg, roundf((float)data->total / data->count));
+ else
+ rrddim_set_by_pointer(metric->st, metric->rd_avg, 0);
+ rrddim_set_by_pointer(metric->st, metric->rd_max, data->max);
+ rrddim_set_by_pointer(metric->st, metric->rd_total, data->total);
+
+ rrdset_done(metric->st);
+}
+
+void aclk_stats_thread_cleanup()
+{
+ freez(aclk_qt_data);
+ freez(aclk_queries_per_thread);
+ freez(aclk_queries_per_thread_sample);
+}
+
+void *aclk_stats_main_thread(void *ptr)
+{
+ struct aclk_stats_thread *args = ptr;
+
+ query_thread_count = args->query_thread_count;
+ aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
+ aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
+ aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+ usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
+
+ memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
+ struct aclk_metrics_per_sample per_sample;
+ struct aclk_metrics permanent;
+
+ while (!netdata_exit) {
+ netdata_thread_testcancel();
+ // ------------------------------------------------------------------------
+ // Wait for the next iteration point.
+
+ heartbeat_next(&hb, step_ut);
+ if (netdata_exit) break;
+
+ ACLK_STATS_LOCK;
+ // to not hold lock longer than necessary, especially not to hold it
+ // during database rrd* operations
+ memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
+ memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
+ memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
+ memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
+ memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
+ ACLK_STATS_UNLOCK;
+
+ aclk_stats_collect(&per_sample, &permanent);
+ aclk_stats_query_queue(&per_sample);
+
+ aclk_stats_write_q(&per_sample);
+ aclk_stats_read_q(&per_sample);
+
+ aclk_stats_cloud_req(&per_sample);
+ aclk_stats_query_threads(aclk_queries_per_thread_sample);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency);
+#endif
+ aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_db_query_time, &per_sample.cloud_q_db_query_time);
+ aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_recvd_to_processed, &per_sample.cloud_q_recvd_to_processed);
+ }
+
+ return 0;
+}
+
+void aclk_stats_upd_online(int online) {
+ if(!aclk_stats_enabled)
+ return;
+
+ ACLK_STATS_LOCK;
+ aclk_metrics.online = online;
+
+ if(!online)
+ aclk_metrics_per_sample.offline_during_sample = 1;
+ ACLK_STATS_UNLOCK;
+}
diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h
new file mode 100644
index 000000000..7e74fdf88
--- /dev/null
+++ b/aclk/legacy/aclk_stats.h
@@ -0,0 +1,91 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_STATS_H
+#define NETDATA_ACLK_STATS_H
+
+#include "../../daemon/common.h"
+#include "libnetdata/libnetdata.h"
+#include "aclk_common.h"
+
+#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
+
+extern netdata_mutex_t aclk_stats_mutex;
+
+#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
+#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
+
+extern int aclk_stats_enabled;
+
+struct aclk_stats_thread {
+ netdata_thread_t *thread;
+ int query_thread_count;
+};
+
+// preserve between samples
+struct aclk_metrics {
+ volatile uint8_t online;
+};
+
+//mat = max average total
+struct aclk_metric_mat_data {
+ volatile uint32_t total;
+ volatile uint32_t count;
+ volatile uint32_t max;
+};
+
+//mat = max average total
+struct aclk_metric_mat {
+ char *name;
+ char *title;
+ RRDSET *st;
+ RRDDIM *rd_avg;
+ RRDDIM *rd_max;
+ RRDDIM *rd_total;
+ long prio;
+ char *unit;
+};
+
+extern struct aclk_mat_metrics {
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct aclk_metric_mat latency;
+#endif
+ struct aclk_metric_mat cloud_q_db_query_time;
+ struct aclk_metric_mat cloud_q_recvd_to_processed;
+} aclk_mat_metrics;
+
+void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
+
+// reset to 0 on every sample
+extern struct aclk_metrics_per_sample {
+ /* in the unlikely event of ACLK disconnecting
+ and reconnecting under 1 sampling rate
+ we want to make sure we record the disconnection
+ despite it being then seemingly longer in graph */
+ volatile uint8_t offline_during_sample;
+
+ volatile uint32_t queries_queued;
+ volatile uint32_t queries_dispatched;
+
+ volatile uint32_t write_q_added;
+ volatile uint32_t write_q_consumed;
+
+ volatile uint32_t read_q_added;
+ volatile uint32_t read_q_consumed;
+
+ volatile uint32_t cloud_req_recvd;
+ volatile uint32_t cloud_req_err;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct aclk_metric_mat_data latency;
+#endif
+ struct aclk_metric_mat_data cloud_q_db_query_time;
+ struct aclk_metric_mat_data cloud_q_recvd_to_processed;
+} aclk_metrics_per_sample;
+
+extern uint32_t *aclk_queries_per_thread;
+
+void *aclk_stats_main_thread(void *ptr);
+void aclk_stats_thread_cleanup();
+void aclk_stats_upd_online(int online);
+
+#endif /* NETDATA_ACLK_STATS_H */
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c
new file mode 100644
index 000000000..e51a01308
--- /dev/null
+++ b/aclk/legacy/agent_cloud_link.c
@@ -0,0 +1,1683 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "libnetdata/libnetdata.h"
+#include "agent_cloud_link.h"
+#include "aclk_lws_https_client.h"
+#include "aclk_query.h"
+#include "aclk_common.h"
+#include "aclk_stats.h"
+
+#ifdef ENABLE_ACLK
+#include <libwebsockets.h>
+#endif
+
+int aclk_shutting_down = 0;
+
+// Other global state
+static int aclk_subscribed = 0;
+static int aclk_disable_single_updates = 0;
+static char *aclk_username = NULL;
+static char *aclk_password = NULL;
+
+static char *global_base_topic = NULL;
+static int aclk_connecting = 0;
+int aclk_force_reconnect = 0; // Indication from lower layers
+usec_t aclk_session_us = 0; // Used by the mqtt layer
+time_t aclk_session_sec = 0; // Used by the mqtt layer
+
+static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
+
+#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
+#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
+
+#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
+#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
+
+void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
+void aclk_lws_wss_destroy_context();
+/*
+ * Maintain a list of collectors and chart count
+ * If all the charts of a collector are deleted
+ * then a new metadata dataset must be send to the cloud
+ *
+ */
+struct _collector {
+ time_t created;
+ uint32_t count; //chart count
+ uint32_t hostname_hash;
+ uint32_t plugin_hash;
+ uint32_t module_hash;
+ char *hostname;
+ char *plugin_name;
+ char *module_name;
+ struct _collector *next;
+};
+
+struct _collector *collector_list = NULL;
+
+char *create_uuid()
+{
+ uuid_t uuid;
+ char *uuid_str = mallocz(36 + 1);
+
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_str);
+
+ return uuid_str;
+}
+
+int cloud_to_agent_parse(JSON_ENTRY *e)
+{
+ struct aclk_request *data = e->callback_data;
+
+ switch (e->type) {
+ case JSON_OBJECT:
+ case JSON_ARRAY:
+ break;
+ case JSON_STRING:
+ if (!strcmp(e->name, "msg-id")) {
+ data->msg_id = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "type")) {
+ data->type_id = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "callback-topic")) {
+ data->callback_topic = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "payload")) {
+ if (likely(e->data.string)) {
+ size_t len = strlen(e->data.string);
+ data->payload = mallocz(len+1);
+ if (!url_decode_r(data->payload, e->data.string, len + 1))
+ strcpy(data->payload, e->data.string);
+ }
+ break;
+ }
+ break;
+ case JSON_NUMBER:
+ if (!strcmp(e->name, "version")) {
+ data->version = e->data.number;
+ break;
+ }
+ if (!strcmp(e->name, "min-version")) {
+ data->min_version = e->data.number;
+ break;
+ }
+ if (!strcmp(e->name, "max-version")) {
+ data->max_version = e->data.number;
+ break;
+ }
+
+ break;
+
+ case JSON_BOOLEAN:
+ break;
+
+ case JSON_NULL:
+ break;
+ }
+ return 0;
+}
+
+
+static RSA *aclk_private_key = NULL;
+static int create_private_key()
+{
+ if (aclk_private_key != NULL)
+ RSA_free(aclk_private_key);
+ aclk_private_key = NULL;
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
+
+ long bytes_read;
+ char *private_key = read_by_filename(filename, &bytes_read);
+ if (!private_key) {
+ error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
+ return 1;
+ }
+ debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
+
+ BIO *key_bio = BIO_new_mem_buf(private_key, -1);
+ if (key_bio==NULL) {
+ error("Claimed agent cannot establish ACLK - failed to create BIO for key");
+ goto biofailed;
+ }
+
+ aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
+ BIO_free(key_bio);
+ if (aclk_private_key!=NULL)
+ {
+ freez(private_key);
+ return 0;
+ }
+ char err[512];
+ ERR_error_string_n(ERR_get_error(), err, sizeof(err));
+ error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
+
+biofailed:
+ freez(private_key);
+ return 1;
+}
+
+/*
+ * After a connection failure -- delay in milliseconds
+ * When a connection is established, the delay function
+ * should be called with
+ *
+ * mode 0 to reset the delay
+ * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
+ *
+ */
+unsigned long int aclk_reconnect_delay(int mode)
+{
+ static int fail = -1;
+ unsigned long int delay;
+
+ if (!mode || fail == -1) {
+ srandom(time(NULL));
+ fail = mode - 1;
+ return 0;
+ }
+
+ delay = (1 << fail);
+
+ if (delay >= ACLK_MAX_BACKOFF_DELAY) {
+ delay = ACLK_MAX_BACKOFF_DELAY * 1000;
+ } else {
+ fail++;
+ delay = (delay * 1000) + (random() % 1000);
+ }
+
+ return delay;
+}
+
+// This will give the base topic that the agent will publish messages.
+// subtopics will be sent under the base topic e.g. base_topic/subtopic
+// This is called during the connection, we delete any previous topic
+// in-case the user has changed the agent id and reclaimed.
+
+char *create_publish_base_topic()
+{
+ char *agent_id = is_agent_claimed();
+ if (unlikely(!agent_id))
+ return NULL;
+
+ ACLK_LOCK;
+
+ if (global_base_topic)
+ freez(global_base_topic);
+ char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
+
+ snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
+ tmp = strchr(tmp_topic, '\n');
+ if (unlikely(tmp))
+ *tmp = '\0';
+ global_base_topic = strdupz(tmp_topic);
+
+ ACLK_UNLOCK;
+ freez(agent_id);
+ return global_base_topic;
+}
+
+/*
+ * Build a topic based on sub_topic and final_topic
+ * if the sub topic starts with / assume that is an absolute topic
+ *
+ */
+
+char *get_topic(char *sub_topic, char *final_topic, int max_size)
+{
+ int rc;
+
+ if (likely(sub_topic && sub_topic[0] == '/'))
+ return sub_topic;
+
+ if (unlikely(!global_base_topic))
+ return sub_topic;
+
+ rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
+ if (unlikely(rc >= max_size))
+ debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
+
+ return final_topic;
+}
+
+#ifndef __GNUC__
+#pragma region ACLK Internal Collector Tracking
+#endif
+
+/*
+ * Free a collector structure
+ */
+
+static void _free_collector(struct _collector *collector)
+{
+ if (likely(collector->plugin_name))
+ freez(collector->plugin_name);
+
+ if (likely(collector->module_name))
+ freez(collector->module_name);
+
+ if (likely(collector->hostname))
+ freez(collector->hostname);
+
+ freez(collector);
+}
+
+/*
+ * This will report the collector list
+ *
+ */
+#ifdef ACLK_DEBUG
+static void _dump_collector_list()
+{
+ struct _collector *tmp_collector;
+
+ COLLECTOR_LOCK;
+
+ info("DUMPING ALL COLLECTORS");
+
+ if (unlikely(!collector_list || !collector_list->next)) {
+ COLLECTOR_UNLOCK;
+ info("DUMPING ALL COLLECTORS -- nothing found");
+ return;
+ }
+
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+
+ while (tmp_collector) {
+ info(
+ "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
+ tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
+ tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
+
+ tmp_collector = tmp_collector->next;
+ }
+ info("DUMPING ALL COLLECTORS DONE");
+ COLLECTOR_UNLOCK;
+}
+#endif
+
+/*
+ * This will cleanup the collector list
+ *
+ */
+static void _reset_collector_list()
+{
+ struct _collector *tmp_collector, *next_collector;
+
+ COLLECTOR_LOCK;
+
+ if (unlikely(!collector_list || !collector_list->next)) {
+ COLLECTOR_UNLOCK;
+ return;
+ }
+
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+ collector_list->count = 0;
+ collector_list->next = NULL;
+
+ // We broke the link; we can unlock
+ COLLECTOR_UNLOCK;
+
+ while (tmp_collector) {
+ next_collector = tmp_collector->next;
+ _free_collector(tmp_collector);
+ tmp_collector = next_collector;
+ }
+}
+
+/*
+ * Find a collector (if it exists)
+ * Must lock before calling this
+ * If last_collector is not null, it will return the previous collector in the linked
+ * list (used in collector delete)
+ */
+static struct _collector *_find_collector(
+ const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
+{
+ struct _collector *tmp_collector, *prev_collector;
+ uint32_t plugin_hash;
+ uint32_t module_hash;
+ uint32_t hostname_hash;
+
+ if (unlikely(!collector_list)) {
+ collector_list = callocz(1, sizeof(struct _collector));
+ return NULL;
+ }
+
+ if (unlikely(!collector_list->next))
+ return NULL;
+
+ plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
+ module_hash = module_name ? simple_hash(module_name) : 1;
+ hostname_hash = simple_hash(hostname);
+
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+ prev_collector = collector_list;
+ while (tmp_collector) {
+ if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
+ hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
+ (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
+ (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
+ if (unlikely(last_collector))
+ *last_collector = prev_collector;
+
+ return tmp_collector;
+ }
+
+ prev_collector = tmp_collector;
+ tmp_collector = tmp_collector->next;
+ }
+
+ return tmp_collector;
+}
+
+/*
+ * Called to delete a collector
+ * It will reduce the count (chart_count) and will remove it
+ * from the linked list if the count reaches zero
+ * The structure will be returned to the caller to free
+ * the resources
+ *
+ */
+static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
+{
+ struct _collector *tmp_collector, *prev_collector = NULL;
+
+ tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
+
+ if (likely(tmp_collector)) {
+ --tmp_collector->count;
+ if (unlikely(!tmp_collector->count))
+ prev_collector->next = tmp_collector->next;
+ }
+ return tmp_collector;
+}
+
+/*
+ * Add a new collector (plugin / module) to the list
+ * If it already exists just update the chart count
+ *
+ * Lock before calling
+ */
+static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
+{
+ struct _collector *tmp_collector;
+
+ tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
+
+ if (unlikely(!tmp_collector)) {
+ tmp_collector = callocz(1, sizeof(struct _collector));
+ tmp_collector->hostname_hash = simple_hash(hostname);
+ tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
+ tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
+
+ tmp_collector->hostname = strdupz(hostname);
+ tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
+ tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
+
+ tmp_collector->next = collector_list->next;
+ collector_list->next = tmp_collector;
+ }
+ tmp_collector->count++;
+ debug(
+ D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
+ module_name ? module_name : "*", tmp_collector->count);
+ return tmp_collector;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+/* Avoids the need to scan trough all RRDHOSTS
+ * every time any Query Thread Wakes Up
+ * (every time we need to check child popcorn expiry)
+ * call with ACLK_SHARED_STATE_LOCK held
+ */
+void aclk_update_next_child_to_popcorn(void)
+{
+ RRDHOST *host;
+ int any = 0;
+
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)))
+ continue;
+
+ rrdhost_aclk_state_lock(host);
+ if (!ACLK_IS_HOST_POPCORNING(host)) {
+ rrdhost_aclk_state_unlock(host);
+ continue;
+ }
+
+ any = 1;
+
+ if (unlikely(!aclk_shared_state.next_popcorn_host)) {
+ aclk_shared_state.next_popcorn_host = host;
+ rrdhost_aclk_state_unlock(host);
+ continue;
+ }
+
+ if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
+ aclk_shared_state.next_popcorn_host = host;
+
+ rrdhost_aclk_state_unlock(host);
+ }
+ if(!any)
+ aclk_shared_state.next_popcorn_host = NULL;
+
+ rrd_unlock();
+}
+
+/* If popcorning bump timer.
+ * If popcorning or initializing (host not stable) return 1
+ * Otherwise return 0
+ */
+static int aclk_popcorn_check_bump(RRDHOST *host)
+{
+ time_t now = now_monotonic_sec();
+ int updated = 0, ret;
+ ACLK_SHARED_STATE_LOCK;
+ rrdhost_aclk_state_lock(host);
+
+ ret = ACLK_IS_HOST_INITIALIZING(host);
+ if (unlikely(ACLK_IS_HOST_POPCORNING(host))) {
+ if(now != host->aclk_state.t_last_popcorn_update) {
+ updated = 1;
+ info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
+ }
+ host->aclk_state.t_last_popcorn_update = now;
+ rrdhost_aclk_state_unlock(host);
+
+ if (host != localhost && updated)
+ aclk_update_next_child_to_popcorn();
+
+ ACLK_SHARED_STATE_UNLOCK;
+ return ret;
+ }
+
+ rrdhost_aclk_state_unlock(host);
+ ACLK_SHARED_STATE_UNLOCK;
+ return ret;
+}
+
+inline static int aclk_host_initializing(RRDHOST *host)
+{
+ rrdhost_aclk_state_lock(host);
+ int ret = ACLK_IS_HOST_INITIALIZING(host);
+ rrdhost_aclk_state_unlock(host);
+ return ret;
+}
+
+static void aclk_start_host_popcorning(RRDHOST *host)
+{
+ usec_t now = now_monotonic_sec();
+ info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
+ ACLK_SHARED_STATE_LOCK;
+ rrdhost_aclk_state_lock(host);
+ if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
+ errno = 0;
+ error("Localhost is allowed to do popcorning only once after startup!");
+ rrdhost_aclk_state_unlock(host);
+ ACLK_SHARED_STATE_UNLOCK;
+ return;
+ }
+
+ host->aclk_state.state = ACLK_HOST_INITIALIZING;
+ host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
+ host->aclk_state.t_last_popcorn_update = now;
+ rrdhost_aclk_state_unlock(host);
+ if (host != localhost)
+ aclk_update_next_child_to_popcorn();
+ ACLK_SHARED_STATE_UNLOCK;
+}
+
+static void aclk_stop_host_popcorning(RRDHOST *host)
+{
+ ACLK_SHARED_STATE_LOCK;
+ rrdhost_aclk_state_lock(host);
+ if (!ACLK_IS_HOST_POPCORNING(host)) {
+ rrdhost_aclk_state_unlock(host);
+ ACLK_SHARED_STATE_UNLOCK;
+ return;
+ }
+
+ info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid);
+ host->aclk_state.t_last_popcorn_update = 0;
+ host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
+ rrdhost_aclk_state_unlock(host);
+
+ if(host == aclk_shared_state.next_popcorn_host) {
+ aclk_shared_state.next_popcorn_host = NULL;
+ aclk_update_next_child_to_popcorn();
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+}
+
+/*
+ * Add a new collector to the list
+ * If it exists, update the chart count
+ */
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+{
+ struct _collector *tmp_collector;
+ if (unlikely(!netdata_ready)) {
+ return;
+ }
+
+ COLLECTOR_LOCK;
+
+ tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
+
+ if (unlikely(tmp_collector->count != 1)) {
+ COLLECTOR_UNLOCK;
+ return;
+ }
+
+ COLLECTOR_UNLOCK;
+
+ if(aclk_popcorn_check_bump(host))
+ return;
+
+ if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
+}
+
+/*
+ * Delete a collector from the list
+ * If the chart count reaches zero the collector will be removed
+ * from the list by calling del_collector.
+ *
+ * This function will release the memory used and schedule
+ * a cloud update
+ */
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+{
+ struct _collector *tmp_collector;
+ if (unlikely(!netdata_ready)) {
+ return;
+ }
+
+ COLLECTOR_LOCK;
+
+ tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
+
+ if (unlikely(!tmp_collector || tmp_collector->count)) {
+ COLLECTOR_UNLOCK;
+ return;
+ }
+
+ debug(
+ D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
+ tmp_collector->count);
+
+ COLLECTOR_UNLOCK;
+
+ _free_collector(tmp_collector);
+
+ if (aclk_popcorn_check_bump(host))
+ return;
+
+ if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
+}
+
+static void aclk_graceful_disconnect()
+{
+ size_t write_q, write_q_bytes, read_q;
+ time_t event_loop_timeout;
+
+ // Send a graceful disconnect message
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg);
+ buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}");
+ aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
+ buffer_free(b);
+
+ event_loop_timeout = now_realtime_sec() + 5;
+ write_q = 1;
+ while (write_q && event_loop_timeout > now_realtime_sec()) {
+ _link_event_loop();
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ }
+
+ aclk_shutting_down = 1;
+ _link_shutdown();
+ aclk_lws_wss_mqtt_layer_disconect_notif();
+
+ write_q = 1;
+ event_loop_timeout = now_realtime_sec() + 5;
+ while (write_q && event_loop_timeout > now_realtime_sec()) {
+ _link_event_loop();
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ }
+ aclk_shutting_down = 0;
+}
+
+#ifndef __GNUC__
+#pragma region Incoming Msg Parsing
+#endif
+
+struct dictionary_singleton {
+ char *key;
+ char *result;
+};
+
+int json_extract_singleton(JSON_ENTRY *e)
+{
+ struct dictionary_singleton *data = e->callback_data;
+
+ switch (e->type) {
+ case JSON_OBJECT:
+ case JSON_ARRAY:
+ break;
+ case JSON_STRING:
+ if (!strcmp(e->name, data->key)) {
+ data->result = strdupz(e->data.string);
+ break;
+ }
+ break;
+ case JSON_NUMBER:
+ case JSON_BOOLEAN:
+ case JSON_NULL:
+ break;
+ }
+ return 0;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+
+#ifndef __GNUC__
+#pragma region Challenge Response
+#endif
+
+// Base-64 decoder.
+// Note: This is non-validating, invalid input will be decoded without an error.
+// Challenges are packed into json strings so we don't skip newlines.
+// Size errors (i.e. invalid input size or insufficient output space) are caught.
+size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
+{
+ static char lookup[256];
+ static int first_time=1;
+ if (first_time)
+ {
+ first_time = 0;
+ for(int i=0; i<256; i++)
+ lookup[i] = -1;
+ for(int i='A'; i<='Z'; i++)
+ lookup[i] = i-'A';
+ for(int i='a'; i<='z'; i++)
+ lookup[i] = i-'a' + 26;
+ for(int i='0'; i<='9'; i++)
+ lookup[i] = i-'0' + 52;
+ lookup['+'] = 62;
+ lookup['/'] = 63;
+ }
+ if ((input_size & 3) != 0)
+ {
+ error("Can't decode base-64 input length %zu", input_size);
+ return 0;
+ }
+ size_t unpadded_size = (input_size/4) * 3;
+ if ( unpadded_size > output_size )
+ {
+ error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
+ return 0;
+ }
+ // Don't check padding within full quantums
+ for (size_t i = 0 ; i < input_size-4 ; i+=4 )
+ {
+ uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
+ output[0] = value >> 16;
+ output[1] = value >> 8;
+ output[2] = value;
+ //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
+ output += 3;
+ input += 4;
+ }
+ // Handle padding only in last quantum
+ if (input[2] == '=') {
+ uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
+ output[0] = value >> 4;
+ //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
+ return unpadded_size-2;
+ }
+ else if (input[3] == '=') {
+ uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
+ output[0] = value >> 10;
+ output[1] = value >> 2;
+ //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
+ return unpadded_size-1;
+ }
+ else
+ {
+ uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
+ output[0] = value >> 16;
+ output[1] = value >> 8;
+ output[2] = value;
+ //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
+ return unpadded_size;
+ }
+}
+
+size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
+{
+ uint32_t value;
+ static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz"
+ "0123456789+/";
+ if ((input_size/3+1)*4 >= output_size)
+ {
+ error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
+ return 0;
+ }
+ size_t count = 0;
+ while (input_size>3)
+ {
+ value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
+ output[0] = lookup[value >> 18];
+ output[1] = lookup[(value >> 12) & 0x3f];
+ output[2] = lookup[(value >> 6) & 0x3f];
+ output[3] = lookup[value & 0x3f];
+ //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
+ output += 4;
+ input += 3;
+ input_size -= 3;
+ count += 4;
+ }
+ switch (input_size)
+ {
+ case 2:
+ value = (input[0] << 10) + (input[1] << 2);
+ output[0] = lookup[(value >> 12) & 0x3f];
+ output[1] = lookup[(value >> 6) & 0x3f];
+ output[2] = lookup[value & 0x3f];
+ output[3] = '=';
+ //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]);
+ count += 4;
+ break;
+ case 1:
+ value = input[0] << 4;
+ output[0] = lookup[(value >> 6) & 0x3f];
+ output[1] = lookup[value & 0x3f];
+ output[2] = '=';
+ output[3] = '=';
+ //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
+ count += 4;
+ break;
+ case 0:
+ break;
+ }
+ return count;
+}
+
+
+
+int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
+{
+ int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
+ if (result == -1) {
+ char err[512];
+ ERR_error_string_n(ERR_get_error(), err, sizeof(err));
+ error("Decryption of the challenge failed: %s", err);
+ }
+ return result;
+}
+
+void aclk_get_challenge(char *aclk_hostname, int port)
+{
+ char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ debug(D_ACLK, "Performing challenge-response sequence");
+ if (aclk_password != NULL)
+ {
+ freez(aclk_password);
+ aclk_password = NULL;
+ }
+ // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
+ // TODO - target host?
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL)
+ {
+ error("Agent was not claimed - cannot perform challenge/response");
+ goto CLEANUP;
+ }
+ char url[1024];
+ sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
+ info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url);
+ if(aclk_send_https_request("GET", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
+ {
+ error("Challenge failed: %s", data_buffer);
+ goto CLEANUP;
+ }
+ struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
+
+ debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
+ if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
+ {
+ freez(challenge.result);
+ error("Could not parse the json response with the challenge: %s", data_buffer);
+ goto CLEANUP;
+ }
+ if (challenge.result == NULL ) {
+ error("Could not retrieve challenge from auth response: %s", data_buffer);
+ goto CLEANUP;
+ }
+
+
+ size_t challenge_len = strlen(challenge.result);
+ unsigned char decoded[512];
+ size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
+
+ unsigned char plaintext[4096]={};
+ int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
+ freez(challenge.result);
+ char encoded[512];
+ size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
+ encoded[encoded_len] = 0;
+ debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
+
+ char response_json[4096]={};
+ sprintf(response_json, "{\"response\":\"%s\"}", encoded);
+ debug(D_ACLK, "Password phase: %s",response_json);
+ // TODO - host
+ sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
+ if(aclk_send_https_request("POST", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
+ {
+ error("Challenge-response failed: %s", data_buffer);
+ goto CLEANUP;
+ }
+
+ debug(D_ACLK, "Password response from cloud: %s", data_buffer);
+
+ struct dictionary_singleton password = { .key = "password", .result = NULL };
+ if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
+ {
+ freez(password.result);
+ error("Could not parse the json response with the password: %s", data_buffer);
+ goto CLEANUP;
+ }
+
+ if (password.result == NULL ) {
+ error("Could not retrieve password from auth response");
+ goto CLEANUP;
+ }
+ if (aclk_password != NULL )
+ freez(aclk_password);
+ aclk_password = password.result;
+ if (aclk_username != NULL)
+ freez(aclk_username);
+ aclk_username = agent_id;
+ agent_id = NULL;
+
+CLEANUP:
+ if (agent_id != NULL)
+ freez(agent_id);
+ freez(data_buffer);
+ return;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+static void aclk_try_to_connect(char *hostname, int port)
+{
+ int rc;
+
+// this is usefull for developers working on ACLK
+// allows connecting agent to any MQTT broker
+// for debugging, development and testing purposes
+#ifndef ACLK_DISABLE_CHALLENGE
+ if (!aclk_private_key) {
+ error("Cannot try to establish the agent cloud link - no private key available!");
+ return;
+ }
+#endif
+
+ info("Attempting to establish the agent cloud link");
+#ifdef ACLK_DISABLE_CHALLENGE
+ error("Agent built with ACLK_DISABLE_CHALLENGE. This is for testing "
+ "and development purposes only. Warranty void. Won't be able "
+ "to connect to Netdata Cloud.");
+ if (aclk_password == NULL)
+ aclk_password = strdupz("anon");
+#else
+ aclk_get_challenge(hostname, port);
+ if (aclk_password == NULL)
+ return;
+#endif
+
+ aclk_connecting = 1;
+ create_publish_base_topic();
+
+ ACLK_SHARED_STATE_LOCK;
+ aclk_shared_state.version_neg = 0;
+ aclk_shared_state.version_neg_wait_till = 0;
+ ACLK_SHARED_STATE_UNLOCK;
+
+ rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password);
+ if (unlikely(rc)) {
+ error("Failed to initialize the agent cloud link library");
+ }
+}
+
+// Sends "hello" message to negotiate ACLK version with cloud
+static inline void aclk_hello_msg()
+{
+ BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+
+ char *msg_id = create_uuid();
+
+ ACLK_SHARED_STATE_LOCK;
+ aclk_shared_state.version_neg = 0;
+ aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
+ ACLK_SHARED_STATE_UNLOCK;
+
+ //Hello message is versioned separatelly from the rest of the protocol
+ aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
+ buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX);
+ aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id);
+ freez(msg_id);
+ buffer_free(buf);
+}
+
+/**
+ * Main agent cloud link thread
+ *
+ * This thread will simply call the main event loop that handles
+ * pending requests - both inbound and outbound
+ *
+ * @param ptr is a pointer to the netdata_static_thread structure.
+ *
+ * @return It always returns NULL
+ */
+void *aclk_main(void *ptr)
+{
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+ struct aclk_query_threads query_threads;
+ struct aclk_stats_thread *stats_thread = NULL;
+ time_t last_periodic_query_wakeup = 0;
+
+ query_threads.thread_list = NULL;
+
+ // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
+ // as it must notify the far end that it shutdown gracefully and avoid the LWT.
+ netdata_thread_disable_cancelability();
+
+#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
+ info("Killing ACLK thread -> cloud functionality has been disabled");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+#endif
+
+#ifndef LWS_WITH_SOCKS5
+ ACLK_PROXY_TYPE proxy_type;
+ aclk_get_proxy(&proxy_type);
+ if(proxy_type == PROXY_TYPE_SOCKS5) {
+ error("Disabling ACLK due to requested SOCKS5 proxy.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+ }
+#endif
+
+ info("Waiting for netdata to be ready");
+ while (!netdata_ready) {
+ sleep_usec(USEC_PER_MS * 300);
+ }
+
+ info("Waiting for Cloud to be enabled");
+ while (!netdata_cloud_setting) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (netdata_exit) {
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+ }
+ }
+
+ query_threads.count = MIN(processors/2, 6);
+ query_threads.count = MAX(query_threads.count, 2);
+ query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
+ if(query_threads.count < 1) {
+ error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count);
+ query_threads.count = 1;
+ config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
+ }
+
+ //start localhost popcorning
+ aclk_start_host_popcorning(localhost);
+
+ aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
+ if (aclk_stats_enabled) {
+ stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
+ stats_thread->thread = mallocz(sizeof(netdata_thread_t));
+ stats_thread->query_thread_count = query_threads.count;
+ netdata_thread_create(
+ stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
+ stats_thread);
+ }
+
+ char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
+ int port_num = 0;
+ info("Waiting for netdata to be claimed");
+ while(1) {
+ char *agent_id = is_agent_claimed();
+ while (likely(!agent_id)) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (netdata_exit)
+ goto exited;
+ agent_id = is_agent_claimed();
+ }
+ freez(agent_id);
+ // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
+ // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (cloud_base_url == NULL) {
+ error("Do not move the cloud base url out of post_conf_load!!");
+ goto exited;
+ }
+ if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &port_num))
+ error("Agent is claimed but the configuration is invalid, please fix");
+ else if (!create_private_key() && !_mqtt_lib_init())
+ break;
+
+ for (int i=0; i<60; i++) {
+ if (netdata_exit)
+ goto exited;
+
+ sleep_usec(USEC_PER_SEC * 1);
+ }
+ }
+
+ usec_t reconnect_expiry = 0; // In usecs
+
+ while (!netdata_exit) {
+ static int first_init = 0;
+ /* size_t write_q, write_q_bytes, read_q;
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
+
+ if (aclk_disable_runtime && !aclk_connected) {
+ sleep(1);
+ continue;
+ }
+
+ if (aclk_kill_link) { // User has reloaded the claiming state
+ aclk_kill_link = 0;
+ aclk_graceful_disconnect();
+ create_private_key();
+ continue;
+ }
+
+ if (aclk_force_reconnect) {
+ aclk_lws_wss_destroy_context();
+ aclk_force_reconnect = 0;
+ }
+ if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
+ if (unlikely(!first_init)) {
+ aclk_try_to_connect(aclk_hostname, port_num);
+ first_init = 1;
+ } else {
+ if (aclk_connecting == 0) {
+ if (reconnect_expiry == 0) {
+ unsigned long int delay = aclk_reconnect_delay(1);
+ reconnect_expiry = now_realtime_usec() + delay * 1000;
+ info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
+ }
+ if (now_realtime_usec() >= reconnect_expiry) {
+ reconnect_expiry = 0;
+ aclk_try_to_connect(aclk_hostname, port_num);
+ }
+ sleep_usec(USEC_PER_MS * 100);
+ }
+ }
+ if (aclk_connecting) {
+ _link_event_loop();
+ sleep_usec(USEC_PER_MS * 100);
+ }
+ continue;
+ }
+
+ _link_event_loop();
+ if (unlikely(!aclk_connected || aclk_force_reconnect))
+ continue;
+ /*static int stress_counter = 0;
+ if (write_q_bytes==0 && stress_counter ++ >5)
+ {
+ aclk_send_stress_test(8000000);
+ stress_counter = 0;
+ }*/
+
+ if (unlikely(!aclk_subscribed)) {
+ aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
+ aclk_hello_msg();
+ }
+
+ if (unlikely(!query_threads.thread_list)) {
+ aclk_query_threads_start(&query_threads);
+ }
+
+ time_t now = now_monotonic_sec();
+ if(aclk_connected && last_periodic_query_wakeup < now) {
+ // to make `aclk_queue_query()` param `run_after` work
+ // also makes per child popcorning work
+ last_periodic_query_wakeup = now;
+ QUERY_THREAD_WAKEUP;
+ }
+ } // forever
+exited:
+ // Wakeup query thread to cleanup
+ QUERY_THREAD_WAKEUP_ALL;
+
+ freez(aclk_username);
+ freez(aclk_password);
+ freez(aclk_hostname);
+ if (aclk_private_key != NULL)
+ RSA_free(aclk_private_key);
+
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ char *agent_id = is_agent_claimed();
+ if (agent_id && aclk_connected) {
+ freez(agent_id);
+ // Wakeup thread to cleanup
+ QUERY_THREAD_WAKEUP;
+ aclk_graceful_disconnect();
+ }
+
+ aclk_query_threads_cleanup(&query_threads);
+
+ _reset_collector_list();
+ freez(collector_list);
+
+ if(aclk_stats_enabled) {
+ netdata_thread_join(*stats_thread->thread, NULL);
+ aclk_stats_thread_cleanup();
+ freez(stats_thread->thread);
+ freez(stats_thread);
+ }
+
+ /*
+ * this must be last -> if all static threads signal
+ * THREAD_EXITED rrdengine will dealloc the RRDSETs
+ * and RRDDIMs that are used by still runing stat thread.
+ * see netdata_cleanup_and_exit() for reference
+ */
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+}
+
+/*
+ * Send a message to the cloud, using a base topic and sib_topic
+ * The final topic will be in the form <base_topic>/<sub_topic>
+ * If base_topic is missing then the global_base_topic will be used (if available)
+ *
+ */
+int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id)
+{
+ int rc;
+ int mid;
+ char topic[ACLK_MAX_TOPIC + 1];
+ char *final_topic;
+
+ UNUSED(msg_id);
+
+ if (!aclk_connected)
+ return 0;
+
+ if (unlikely(!message))
+ return 0;
+
+ final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+
+ if (unlikely(!final_topic)) {
+ errno = 0;
+ error("Unable to build outgoing topic; truncated?");
+ return 1;
+ }
+
+ ACLK_LOCK;
+ rc = _link_send_message(final_topic, message, len, &mid);
+ // TODO: link the msg_id with the mid so we can trace it
+ ACLK_UNLOCK;
+
+ if (unlikely(rc)) {
+ errno = 0;
+ error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
+ }
+
+ return rc;
+}
+
+int aclk_send_message(char *sub_topic, char *message, char *msg_id)
+{
+ return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id);
+}
+
+/*
+ * Subscribe to a topic in the cloud
+ * The final subscription will be in the form
+ * /agent/claim_id/<sub_topic>
+ */
+int aclk_subscribe(char *sub_topic, int qos)
+{
+ int rc;
+ char topic[ACLK_MAX_TOPIC + 1];
+ char *final_topic;
+
+ final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+ if (unlikely(!final_topic)) {
+ errno = 0;
+ error("Unable to build outgoing topic; truncated?");
+ return 1;
+ }
+
+ if (!aclk_connected) {
+ error("Cannot subscribe to %s - not connected!", topic);
+ return 1;
+ }
+
+ ACLK_LOCK;
+ rc = _link_subscribe(final_topic, qos);
+ ACLK_UNLOCK;
+
+ // TODO: Add better handling -- error will flood the logfile here
+ if (unlikely(rc)) {
+ errno = 0;
+ error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
+ }
+
+ return rc;
+}
+
+// This is called from a callback when the link goes up
+void aclk_connect()
+{
+ info("Connection detected (%u queued queries)", aclk_query_size());
+
+ aclk_stats_upd_online(1);
+
+ aclk_connected = 1;
+ aclk_reconnect_delay(0);
+
+ QUERY_THREAD_WAKEUP;
+ return;
+}
+
+// This is called from a callback when the link goes down
+void aclk_disconnect()
+{
+ if (likely(aclk_connected))
+ info("Disconnect detected (%u queued queries)", aclk_query_size());
+
+ aclk_stats_upd_online(0);
+
+ aclk_subscribed = 0;
+ rrdhost_aclk_state_lock(localhost);
+ localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED;
+ rrdhost_aclk_state_unlock(localhost);
+ aclk_connected = 0;
+ aclk_connecting = 0;
+ aclk_force_reconnect = 1;
+}
+
+inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version)
+{
+ uuid_t uuid;
+ char uuid_str[36 + 1];
+
+ if (unlikely(!msg_id)) {
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_str);
+ msg_id = uuid_str;
+ }
+
+ if (ts_secs == 0) {
+ ts_us = now_realtime_usec();
+ ts_secs = ts_us / USEC_PER_SEC;
+ ts_us = ts_us % USEC_PER_SEC;
+ }
+
+ buffer_sprintf(
+ dest,
+ "{\t\"type\": \"%s\",\n"
+ "\t\"msg-id\": \"%s\",\n"
+ "\t\"timestamp\": %ld,\n"
+ "\t\"timestamp-offset-usec\": %llu,\n"
+ "\t\"connect\": %ld,\n"
+ "\t\"connect-offset-usec\": %llu,\n"
+ "\t\"version\": %d",
+ type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version);
+
+ debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs);
+}
+
+
+/*
+ * This will send alarm information which includes
+ * configured alarms
+ * alarm_log
+ * active alarms
+ */
+void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
+
+void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
+{
+ BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+
+ char *msg_id = create_uuid();
+ buffer_flush(local_buffer);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ debug(D_ACLK, "Metadata alarms start");
+
+ // on_connect messages are sent on a health reload, if the on_connect message is real then we
+ // use the session time as the fake timestamp to indicate that it starts the session. If it is
+ // a fake on_connect message then use the real timestamp to indicate it is within the existing
+ // session.
+
+ if (metadata_submitted == ACLK_METADATA_SENT)
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
+ else
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
+
+ buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
+ health_alarms2json(localhost, local_buffer, 1);
+ debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
+ // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
+ // health_alarm_log2json(localhost, local_buffer, 0);
+ // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
+ buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
+ health_active_log_alarms_2json(localhost, local_buffer);
+ //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);
+
+
+
+ buffer_sprintf(local_buffer, "\n}\n}");
+ aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
+
+ freez(msg_id);
+ buffer_free(local_buffer);
+}
+
+/*
+ * This will send the agent metadata
+ * /api/v1/info
+ * charts
+ */
+int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
+{
+ BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+
+ debug(D_ACLK, "Metadata /info start");
+
+ char *msg_id = create_uuid();
+ buffer_flush(local_buffer);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ // on_connect messages are sent on a health reload, if the on_connect message is real then we
+ // use the session time as the fake timestamp to indicate that it starts the session. If it is
+ // a fake on_connect message then use the real timestamp to indicate it is within the existing
+ // session.
+ if (metadata_submitted == ACLK_METADATA_SENT)
+ aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg);
+ else
+ aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
+ buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
+ web_client_api_request_v1_info_fill_buffer(host, local_buffer);
+ debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
+
+ buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
+ charts2json(host, local_buffer, 1, 0);
+ buffer_sprintf(local_buffer, "\n}\n}");
+ debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
+
+ aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
+
+ freez(msg_id);
+ buffer_free(local_buffer);
+ return 0;
+}
+
+int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
+{
+ BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
+ fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg);
+
+ debug(D_ACLK, "Sending Child Disconnect");
+
+ char *msg_id = create_uuid();
+
+ aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg);
+
+ buffer_strcat(local_buffer, ",\"payload\":");
+
+ buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid);
+ rrdhost_aclk_state_lock(host);
+ if(host->aclk_state.claimed_id)
+ buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id);
+ else
+ buffer_strcat(local_buffer, "null}}");
+
+ rrdhost_aclk_state_unlock(host);
+
+ aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
+
+ freez(msg_id);
+ buffer_free(local_buffer);
+ return 0;
+}
+
+void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
+{
+#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
+ if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
+ return;
+#else
+#warning "This check became unnecessary. Remove"
+#endif
+
+ if (unlikely(aclk_host_initializing(localhost)))
+ return;
+
+ switch (cmd) {
+ case ACLK_CMD_CHILD_CONNECT:
+ debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
+ aclk_start_host_popcorning(host);
+ aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
+ break;
+ case ACLK_CMD_CHILD_DISCONNECT:
+ debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
+ aclk_stop_host_popcorning(host);
+ aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
+ break;
+ default:
+ error("Unknown command for aclk_host_state_update %d.", (int)cmd);
+ }
+}
+
+void aclk_send_stress_test(size_t size)
+{
+ char *buffer = mallocz(size);
+ if (buffer != NULL)
+ {
+ for(size_t i=0; i<size; i++)
+ buffer[i] = 'x';
+ buffer[size-1] = 0;
+ time_t time_created = now_realtime_sec();
+ sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
+ buffer[strlen(buffer)] = '"';
+ buffer[size-2] = '}';
+ buffer[size-3] = '"';
+ aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
+ error("Sending stress of size %zu at time %ld", size, time_created);
+ }
+ free(buffer);
+}
+
+// Send info metadata message to the cloud if the link is established
+// or on request
+int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
+{
+ aclk_send_info_metadata(state, host);
+
+ if(host == localhost)
+ aclk_send_alarm_metadata(state);
+
+ return 0;
+}
+
+void aclk_single_update_disable()
+{
+ aclk_disable_single_updates = 1;
+}
+
+void aclk_single_update_enable()
+{
+ aclk_disable_single_updates = 0;
+}
+
+// Trigged by a health reload, sends the alarm metadata
+void aclk_alarm_reload()
+{
+ if (unlikely(aclk_host_initializing(localhost)))
+ return;
+
+ if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ if (likely(aclk_connected)) {
+ errno = 0;
+ error("ACLK failed to queue on_connect command on alarm reload");
+ }
+ }
+}
+//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
+
+int aclk_send_single_chart(RRDHOST *host, char *chart)
+{
+ RRDSET *st = rrdset_find(host, chart);
+ if (!st)
+ st = rrdset_find_byname(host, chart);
+ if (!st) {
+ info("FAILED to find chart %s", chart);
+ return 1;
+ }
+
+ BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ char *msg_id = create_uuid();
+ buffer_flush(local_buffer);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
+ rrdset2json(st, local_buffer, NULL, NULL, 1);
+ buffer_sprintf(local_buffer, "\t\n}");
+
+ aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
+
+ freez(msg_id);
+ buffer_free(local_buffer);
+ return 0;
+}
+
+int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
+{
+#ifndef ENABLE_ACLK
+ UNUSED(host);
+ UNUSED(chart_name);
+ return 0;
+#else
+ if (unlikely(!netdata_ready))
+ return 0;
+
+ if (!netdata_cloud_setting)
+ return 0;
+
+ if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
+ return 0;
+
+ if (aclk_host_initializing(localhost))
+ return 0;
+
+ if (unlikely(aclk_disable_single_updates))
+ return 0;
+
+ if (aclk_popcorn_check_bump(host))
+ return 0;
+
+ if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
+ if (likely(aclk_connected)) {
+ errno = 0;
+ error("ACLK failed to queue chart_update command");
+ }
+ }
+
+ return 0;
+#endif
+}
+
+int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
+{
+ BUFFER *local_buffer = NULL;
+
+ if (unlikely(!netdata_ready))
+ return 0;
+
+ if (host != localhost)
+ return 0;
+
+ if(unlikely(aclk_host_initializing(localhost)))
+ return 0;
+
+ /*
+ * Check if individual updates have been disabled
+ * This will be the case when we do health reload
+ * and all the alarms will be dropped and recreated.
+ * At the end of the health reload the complete alarm metadata
+ * info will be sent
+ */
+ if (unlikely(aclk_disable_single_updates))
+ return 0;
+
+ local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ char *msg_id = create_uuid();
+
+ buffer_flush(local_buffer);
+ aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg);
+ buffer_strcat(local_buffer, ",\n\t\"payload\": ");
+
+ netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
+ health_alarm_entry2json_nolock(local_buffer, ae, host);
+ netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
+
+ buffer_sprintf(local_buffer, "\n}");
+
+ if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
+ if (likely(aclk_connected)) {
+ errno = 0;
+ error("ACLK failed to queue alarm_command on alarm_update");
+ }
+ }
+
+ freez(msg_id);
+ buffer_free(local_buffer);
+
+ return 0;
+}
diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h
new file mode 100644
index 000000000..e777e0b19
--- /dev/null
+++ b/aclk/legacy/agent_cloud_link.h
@@ -0,0 +1,93 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_AGENT_CLOUD_LINK_H
+#define NETDATA_AGENT_CLOUD_LINK_H
+
+#include "../../daemon/common.h"
+#include "mqtt.h"
+#include "aclk_common.h"
+
+#define ACLK_THREAD_NAME "ACLK_Query"
+#define ACLK_CHART_TOPIC "outbound/meta"
+#define ACLK_ALARMS_TOPIC "outbound/alarms"
+#define ACLK_METADATA_TOPIC "outbound/meta"
+#define ACLK_COMMAND_TOPIC "inbound/cmd"
+#define ACLK_TOPIC_STRUCTURE "/agent/%s"
+
+#define ACLK_MAX_BACKOFF_DELAY 1024 // maximum backoff delay in seconds
+
+#define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg)
+#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds
+#define ACLK_QOS 1
+#define ACLK_PING_INTERVAL 60
+#define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop
+
+#define ACLK_MAX_TOPIC 255
+
+#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff stragegy fow now
+#define ACLK_DEFAULT_PORT 9002
+#define ACLK_DEFAULT_HOST "localhost"
+
+#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
+
+struct aclk_request {
+ char *type_id;
+ char *msg_id;
+ char *callback_topic;
+ char *payload;
+ int version;
+ int min_version;
+ int max_version;
+};
+
+typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION;
+
+void *aclk_main(void *ptr);
+
+#define NETDATA_ACLK_HOOK \
+ { .name = "ACLK_Main", \
+ .config_section = NULL, \
+ .config_name = NULL, \
+ .enabled = 1, \
+ .thread = NULL, \
+ .init_routine = NULL, \
+ .start_routine = aclk_main },
+
+extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
+extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id);
+
+extern char *is_agent_claimed(void);
+extern void aclk_lws_wss_mqtt_layer_disconect_notif();
+char *create_uuid();
+
+// callbacks for agent cloud link
+int aclk_subscribe(char *topic, int qos);
+int cloud_to_agent_parse(JSON_ENTRY *e);
+void aclk_disconnect();
+void aclk_connect();
+
+int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host);
+int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
+void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
+
+int aclk_wait_for_initialization();
+char *create_publish_base_topic();
+
+int aclk_send_single_chart(RRDHOST *host, char *chart);
+int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
+int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
+void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version);
+int aclk_handle_cloud_message(char *payload);
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_alarm_reload();
+unsigned long int aclk_reconnect_delay(int mode);
+extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
+void aclk_single_update_enable();
+void aclk_single_update_disable();
+
+void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd);
+int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd);
+void aclk_update_next_child_to_popcorn(void);
+
+#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c
new file mode 100644
index 000000000..6f38a20dc
--- /dev/null
+++ b/aclk/legacy/mqtt.c
@@ -0,0 +1,366 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <libnetdata/json/json.h>
+#include "../../daemon/common.h"
+#include "mqtt.h"
+#include "aclk_lws_wss_client.h"
+#include "aclk_stats.h"
+#include "aclk_rx_msgs.h"
+
+extern usec_t aclk_session_us;
+extern time_t aclk_session_sec;
+
+inline const char *_link_strerror(int rc)
+{
+ return mosquitto_strerror(rc);
+}
+
+#ifdef NETDATA_INTERNAL_CHECKS
+static struct timeval sendTimes[1024];
+#endif
+
+static struct mosquitto *mosq = NULL;
+
+
+void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
+{
+ UNUSED(mosq);
+ UNUSED(obj);
+
+ aclk_handle_cloud_message(msg->payload);
+}
+
+void publish_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+ UNUSED(mosq);
+ UNUSED(obj);
+ UNUSED(rc);
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct timeval now, *orig;
+ now_realtime_timeval(&now);
+ orig = &sendTimes[ rc & 0x3ff ];
+ int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec);
+ diff /= 1000;
+
+ info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff);
+
+ aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff);
+#endif
+ return;
+}
+
+void connect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+ UNUSED(mosq);
+ UNUSED(obj);
+ UNUSED(rc);
+
+ info("Connection to cloud estabilished");
+ aclk_connect();
+
+ return;
+}
+
+void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+ UNUSED(mosq);
+ UNUSED(obj);
+ UNUSED(rc);
+
+ if (netdata_exit)
+ info("Connection to cloud terminated due to agent shutdown");
+ else {
+ errno = 0;
+ error("Connection to cloud failed");
+ }
+ aclk_disconnect();
+
+ aclk_lws_wss_mqtt_layer_disconect_notif();
+
+ return;
+}
+
+void _show_mqtt_info()
+{
+ int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+ libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+
+ info(
+ "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
+ libmosq_revision);
+}
+
+size_t _mqtt_external_write_hook(void *buf, size_t count)
+{
+ return aclk_lws_wss_client_write(buf, count);
+}
+
+size_t _mqtt_external_read_hook(void *buf, size_t count)
+{
+ return aclk_lws_wss_client_read(buf, count);
+}
+
+int _mqtt_lib_init()
+{
+ int rc;
+ //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+ /* Commenting out now as it is unused - do not delete, this is needed for the on-prem version.
+ char *ca_crt;
+ char *server_crt;
+ char *server_key;
+
+ // show library info so can have it in the logfile
+ //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+ ca_crt = config_get(CONFIG_SECTION_CLOUD, "link cert", "*");
+ server_crt = config_get(CONFIG_SECTION_CLOUD, "link server cert", "*");
+ server_key = config_get(CONFIG_SECTION_CLOUD, "link server key", "*");
+
+ if (ca_crt[0] == '*') {
+ freez(ca_crt);
+ ca_crt = NULL;
+ }
+
+ if (server_crt[0] == '*') {
+ freez(server_crt);
+ server_crt = NULL;
+ }
+
+ if (server_key[0] == '*') {
+ freez(server_key);
+ server_key = NULL;
+ }
+ */
+
+ // info(
+ // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
+ // libmosq_revision);
+
+ rc = mosquitto_lib_init();
+ if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+ error("Failed to initialize MQTT (libmosquitto library)");
+ return 1;
+ }
+ return 0;
+}
+
+static int _mqtt_create_connection(char *username, char *password)
+{
+ if (mosq != NULL)
+ mosquitto_destroy(mosq);
+ mosq = mosquitto_new(username, true, NULL);
+ if (unlikely(!mosq)) {
+ mosquitto_lib_cleanup();
+ error("MQTT new structure -- %s", mosquitto_strerror(errno));
+ return MOSQ_ERR_UNKNOWN;
+ }
+
+ // Record the session start time to allow a nominal LWT timestamp
+ usec_t now = now_realtime_usec();
+ aclk_session_sec = now / USEC_PER_SEC;
+ aclk_session_us = now % USEC_PER_SEC;
+
+ _link_set_lwt("outbound/meta", 2);
+
+ mosquitto_connect_callback_set(mosq, connect_callback);
+ mosquitto_disconnect_callback_set(mosq, disconnect_callback);
+ mosquitto_publish_callback_set(mosq, publish_callback);
+
+ info("Using challenge-response: %s / %s", username, password);
+ mosquitto_username_pw_set(mosq, username, password);
+
+ int rc = mosquitto_threaded_set(mosq, 1);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc));
+
+#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000
+ rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc));
+
+ rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1);
+ info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc));
+#endif
+
+ return rc;
+}
+
+static int _link_mqtt_connect(char *aclk_hostname, int aclk_port)
+{
+ int rc;
+
+ rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
+
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ error(
+ "Failed to establish link to [%s:%d] MQTT status = %d (%s)", aclk_hostname, aclk_port, rc,
+ mosquitto_strerror(rc));
+ else
+ info("Establishing MQTT link to [%s:%d]", aclk_hostname, aclk_port);
+
+ return rc;
+}
+
+static inline void _link_mosquitto_write()
+{
+ int rc;
+
+ if (unlikely(!mosq)) {
+ return;
+ }
+
+ rc = mosquitto_loop_misc(mosq);
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
+
+ if (likely(mosquitto_want_write(mosq))) {
+ rc = mosquitto_loop_write(mosq, 1);
+ if (rc != MOSQ_ERR_SUCCESS)
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc));
+ }
+}
+
+void aclk_lws_connection_established(char *hostname, int port)
+{
+ _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected.
+ _link_mosquitto_write();
+}
+
+void aclk_lws_connection_data_received()
+{
+ int rc = mosquitto_loop_read(mosq, 1);
+ if (rc != MOSQ_ERR_SUCCESS)
+ debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc));
+}
+
+void aclk_lws_connection_closed()
+{
+ aclk_disconnect();
+
+}
+
+
+int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password)
+{
+ if(aclk_lws_wss_connect(aclk_hostname, aclk_port))
+ return MOSQ_ERR_UNKNOWN;
+ aclk_lws_wss_service_loop();
+
+ int rc = _mqtt_create_connection(username, password);
+ if (rc!= MOSQ_ERR_SUCCESS)
+ return rc;
+
+ mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook);
+ return rc;
+}
+
+inline int _link_event_loop()
+{
+
+ // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1).
+ _link_mosquitto_write();
+ aclk_lws_wss_service_loop();
+
+ // this is because if use LWS we don't want
+ // mqtt to reconnect by itself
+ return MOSQ_ERR_SUCCESS;
+}
+
+void _link_shutdown()
+{
+ int rc;
+
+ if (likely(!mosq))
+ return;
+
+ rc = mosquitto_disconnect(mosq);
+ switch (rc) {
+ case MOSQ_ERR_SUCCESS:
+ info("MQTT disconnected from broker");
+ break;
+ default:
+ info("MQTT invalid structure");
+ break;
+ };
+}
+
+
+int _link_set_lwt(char *sub_topic, int qos)
+{
+ int rc;
+ char topic[ACLK_MAX_TOPIC + 1];
+ char *final_topic;
+
+ final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+ if (unlikely(!final_topic)) {
+ errno = 0;
+ error("Unable to build outgoing topic; truncated?");
+ return 1;
+ }
+
+ usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1;
+ BUFFER *b = buffer_create(512);
+ aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION);
+ buffer_strcat(b, ", \"payload\": \"unexpected\" }");
+ rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0);
+ buffer_free(b);
+
+ return rc;
+}
+
+int _link_subscribe(char *topic, int qos)
+{
+ int rc;
+
+ if (unlikely(!mosq))
+ return 1;
+
+ mosquitto_message_callback_set(mosq, mqtt_message_callback);
+
+ rc = mosquitto_subscribe(mosq, NULL, topic, qos);
+ if (unlikely(rc)) {
+ errno = 0;
+ error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc));
+ return 1;
+ }
+
+ _link_mosquitto_write();
+ return 0;
+}
+
+/*
+ * Send a message to the cloud to specific topic
+ *
+ */
+
+int _link_send_message(char *topic, const void *message, size_t len, int *mid)
+{
+ int rc;
+ size_t write_q, write_q_bytes, read_q;
+
+ rc = mosquitto_pub_topic_check(topic);
+
+ if (unlikely(rc != MOSQ_ERR_SUCCESS))
+ return rc;
+
+ lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+ rc = mosquitto_publish(mosq, mid, topic, len, message, ACLK_QOS, 0);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ char msg_head[64];
+ memset(msg_head, 0, sizeof(msg_head));
+ strncpy(msg_head, (char*)message, 60);
+ for (size_t i = 0; i < sizeof(msg_head); i++)
+ if(msg_head[i] == '\n') msg_head[i] = ' ';
+ info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", (int)len,
+ *mid, write_q, write_q_bytes, read_q, msg_head);
+ now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]);
+#endif
+
+ // TODO: Add better handling -- error will flood the logfile here
+ if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+ errno = 0;
+ error("MQTT message failed : %s", mosquitto_strerror(rc));
+ }
+ _link_mosquitto_write();
+ return rc;
+}
diff --git a/aclk/legacy/mqtt.h b/aclk/legacy/mqtt.h
new file mode 100644
index 000000000..cc4765d62
--- /dev/null
+++ b/aclk/legacy/mqtt.h
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_MQTT_H
+#define NETDATA_MQTT_H
+
+#ifdef ENABLE_ACLK
+#include "externaldeps/mosquitto/mosquitto.h"
+#endif
+
+void _show_mqtt_info();
+int _link_event_loop();
+void _link_shutdown();
+int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password);
+//int _link_lib_init();
+int _mqtt_lib_init();
+int _link_subscribe(char *topic, int qos);
+int _link_send_message(char *topic, const void *message, size_t len, int *mid);
+const char *_link_strerror(int rc);
+int _link_set_lwt(char *topic, int qos);
+
+
+int aclk_handle_cloud_message(char *);
+extern char *get_topic(char *sub_topic, char *final_topic, int max_size);
+
+#endif //NETDATA_MQTT_H
diff --git a/aclk/legacy/tests/fake-charts.d.plugin b/aclk/legacy/tests/fake-charts.d.plugin
new file mode 100644
index 000000000..a13c6bab8
--- /dev/null
+++ b/aclk/legacy/tests/fake-charts.d.plugin
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+
+sleep 45 # Wait until popcorning finishes
+
+echo "CHART aclk_test.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1"
+sleep 5
+echo "DIMENSION aclk1 '' percentage-of-absolute 1 1"
+sleep 5
+echo "BEGIN aclk_test.newcol 1000000"
+echo "SET aclk1 = 3"
+echo "END"
+sleep 5
+echo "DIMENSION aclk2 '' percentage-of-absolute 1 1"
+sleep 5
+echo "BEGIN aclk_test.newcol 1000000"
+echo "SET aclk1 = 3"
+echo "SET aclk2 = 3"
+echo "END"
+sleep 5
+echo "CHART aclk_test2.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1"
+echo "DIMENSION aclk1 '' percentage-of-absolute 1 1"
+
+sleep 5
+exit 0 # Signal that we are done
diff --git a/aclk/legacy/tests/install-fake-charts.d.sh.in b/aclk/legacy/tests/install-fake-charts.d.sh.in
new file mode 100644
index 000000000..ac002a2bd
--- /dev/null
+++ b/aclk/legacy/tests/install-fake-charts.d.sh.in
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+TARGET="@pluginsdir_POST@"
+BASE="$(cd "$(dirname "$0")" && pwd)"
+
+cp "$BASE/fake-charts.d.plugin" "$TARGET/charts.d.plugin"
diff --git a/aclk/legacy/tests/launch-paho.sh b/aclk/legacy/tests/launch-paho.sh
new file mode 100755
index 000000000..1c2cb5f2c
--- /dev/null
+++ b/aclk/legacy/tests/launch-paho.sh
@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+
+docker build -f paho.Dockerfile . --build-arg "HOST_HOSTNAME=$(ping -c1 "$(hostname).local" | head -n1 | grep -o '[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*')" -t paho-client
+docker run -it paho-client
diff --git a/aclk/legacy/tests/paho-inspection.py b/aclk/legacy/tests/paho-inspection.py
new file mode 100644
index 000000000..20ab523d4
--- /dev/null
+++ b/aclk/legacy/tests/paho-inspection.py
@@ -0,0 +1,59 @@
+import ssl
+import paho.mqtt.client as mqtt
+import json
+import time
+import sys
+
+def on_connect(mqttc, obj, flags, rc):
+ if rc==0:
+ print("Successful connection", flush=True)
+ else :
+ print(f"Connection error rc={rc}", flush=True)
+ mqttc.subscribe("/agent/#",0)
+
+def on_disconnect(mqttc, obj, flags, rc):
+ print("disconnected rc: "+str(rc), flush=True)
+
+def on_message(mqttc, obj, msg):
+ print(f"{msg.topic} {len(msg.payload)}-bytes qos={msg.qos}", flush=True)
+ try:
+ print(f"Trying decode of {msg.payload[:60]}",flush=True)
+ api_msg = json.loads(msg.payload)
+ except Exception as e:
+ print(e,flush=True)
+ return
+ ts = api_msg["timestamp"]
+ mtype = api_msg["type"]
+ print(f"Message {mtype} time={ts} size {len(api_msg)}", flush=True)
+ now = time.time()
+ print(f"Current {now} -> Delay {now-ts}", flush=True)
+ if mtype=="disconnect":
+ print(f"Message dump: {api_msg}", flush=True)
+
+def on_publish(mqttc, obj, mid):
+ print("mid: "+str(mid), flush=True)
+
+def on_subscribe(mqttc, obj, mid, granted_qos):
+ print("Subscribed: "+str(mid)+" "+str(granted_qos), flush=True)
+
+def on_log(mqttc, obj, level, string):
+ print(string)
+
+print(f"Starting paho-inspection on {sys.argv[1]}", flush=True)
+mqttc = mqtt.Client(transport='websockets',client_id="paho")
+#mqttc.tls_set(certfile="server.crt", keyfile="server.key", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
+#mqttc.tls_set(ca_certs="server.crt", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
+mqttc.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
+mqttc.tls_insecure_set(True)
+mqttc.on_message = on_message
+mqttc.on_connect = on_connect
+mqttc.on_disconnect = on_disconnect
+mqttc.on_publish = on_publish
+mqttc.on_subscribe = on_subscribe
+mqttc.username_pw_set("paho","paho")
+mqttc.connect(sys.argv[1], 8443, 60)
+
+#mqttc.publish("/agent/mine","Test1")
+#mqttc.subscribe("$SYS/#", 0)
+print("Connected succesfully, monitoring /agent/#", flush=True)
+mqttc.loop_forever()
diff --git a/aclk/legacy/tests/paho.Dockerfile b/aclk/legacy/tests/paho.Dockerfile
new file mode 100644
index 000000000..d67cc4cb0
--- /dev/null
+++ b/aclk/legacy/tests/paho.Dockerfile
@@ -0,0 +1,14 @@
+FROM archlinux/base:latest
+
+RUN pacman -Syyu --noconfirm
+RUN pacman --noconfirm --needed -S python-pip
+
+RUN pip install paho-mqtt
+
+RUN mkdir -p /opt/paho
+COPY paho-inspection.py /opt/paho/
+
+WORKDIR /opt/paho
+ARG HOST_HOSTNAME
+RUN echo $HOST_HOSTNAME >host
+CMD ["/bin/bash", "-c", "/usr/sbin/python paho-inspection.py $(cat host)"]