diff options
Diffstat (limited to '')
-rw-r--r-- | aclk/legacy/Makefile.am | 19 | ||||
-rw-r--r-- | aclk/legacy/aclk_common.c | 236 | ||||
-rw-r--r-- | aclk/legacy/aclk_common.h | 70 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_https_client.c | 246 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_https_client.h | 18 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.c | 613 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.h | 92 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.c | 789 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.h | 40 | ||||
-rw-r--r-- | aclk/legacy/aclk_rrdhost_state.h | 42 | ||||
-rw-r--r-- | aclk/legacy/aclk_rx_msgs.c | 365 | ||||
-rw-r--r-- | aclk/legacy/aclk_rx_msgs.h | 13 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.c | 298 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.h | 91 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.c | 1683 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.h | 93 | ||||
-rw-r--r-- | aclk/legacy/mqtt.c | 366 | ||||
-rw-r--r-- | aclk/legacy/mqtt.h | 25 | ||||
-rw-r--r-- | aclk/legacy/tests/fake-charts.d.plugin | 24 | ||||
-rw-r--r-- | aclk/legacy/tests/install-fake-charts.d.sh.in | 6 | ||||
-rwxr-xr-x | aclk/legacy/tests/launch-paho.sh | 4 | ||||
-rw-r--r-- | aclk/legacy/tests/paho-inspection.py | 59 | ||||
-rw-r--r-- | aclk/legacy/tests/paho.Dockerfile | 14 |
23 files changed, 5206 insertions, 0 deletions
diff --git a/aclk/legacy/Makefile.am b/aclk/legacy/Makefile.am new file mode 100644 index 000000000..1cd876b40 --- /dev/null +++ b/aclk/legacy/Makefile.am @@ -0,0 +1,19 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +CLEANFILES = \ + tests/install-fake-charts.d.sh \ + $(NULL) + +include $(top_srcdir)/build/subst.inc +SUFFIXES = .in + +#sbin_SCRIPTS = \ +# tests/install-fake-charts.d.sh \ +# $(NULL) + +dist_noinst_SCRIPTS = tests/install-fake-charts.d.sh +dist_noinst_DATA = tests/install-fake-charts.d.sh.in + diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c new file mode 100644 index 000000000..7c8421a93 --- /dev/null +++ b/aclk/legacy/aclk_common.c @@ -0,0 +1,236 @@ +#include "aclk_common.h" + +#include "../../daemon/common.h" + +#ifdef ENABLE_ACLK +#include <libwebsockets.h> +#endif + +netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; + +int aclk_disable_runtime = 0; +int aclk_kill_link = 0; + +struct aclk_shared_state aclk_shared_state = { + .version_neg = 0, + .version_neg_wait_till = 0 +}; + +struct { + ACLK_PROXY_TYPE type; + const char *url_str; +} supported_proxy_types[] = { + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL }, +}; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type) +{ + switch (*type) { + case PROXY_DISABLED: + return "disabled"; + case PROXY_TYPE_HTTP: + return "HTTP"; + case PROXY_TYPE_SOCKS5: + return "SOCKS"; + default: + return "Unknown"; + } +} + +static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string) +{ + int i = 0; + while (supported_proxy_types[i].url_str) { + if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str))) + return supported_proxy_types[i].type; + i++; + } + return PROXY_TYPE_UNKNOWN; +} + +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string) +{ + if (!string) + return PROXY_TYPE_UNKNOWN; + + while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove) + string++; + + if (!*string) + return PROXY_TYPE_UNKNOWN; + + return aclk_find_proxy(string); +} + +// helper function to censor user&password +// for logging purposes +void safe_log_proxy_censor(char *proxy) +{ + size_t length = strlen(proxy); + char *auth = proxy + length - 1; + char *cur; + + while ((auth >= proxy) && (*auth != '@')) + auth--; + + //if not found or @ is first char do nothing + if (auth <= proxy) + return; + + cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR); + if (!cur) + cur = proxy; + else + cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + while (cur < auth) { + *cur = 'X'; + cur++; + } +} + +static inline void safe_log_proxy_error(char *str, const char *proxy) +{ + char *log = strdupz(proxy); + safe_log_proxy_censor(log); + error("%s Provided Value:\"%s\"", str, log); + freez(log); +} + +static inline int check_socks_enviroment(const char **proxy) +{ + char *tmp = getenv("socks_proxy"); + + if (!tmp) + return 1; + + if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) { + *proxy = tmp; + return 0; + } + + safe_log_proxy_error( + "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", + tmp); + return 1; +} + +static inline int check_http_enviroment(const char **proxy) +{ + char *tmp = getenv("http_proxy"); + + if (!tmp) + return 1; + + if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) { + *proxy = tmp; + return 0; + } + + safe_log_proxy_error( + "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".", + tmp); + return 1; +} + +const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type) +{ + const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV); + *type = PROXY_DISABLED; + + if (strcmp(proxy, "none") == 0) + return proxy; + + if (strcmp(proxy, ACLK_PROXY_ENV) == 0) { + if (check_socks_enviroment(&proxy) == 0) { +#ifdef LWS_WITH_SOCKS5 + *type = PROXY_TYPE_SOCKS5; + return proxy; +#else + safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy " + "but Libwebsockets used doesn't have SOCKS5 support built in. " + "Ignoring and checking for other options.", + proxy); +#endif + } + if (check_http_enviroment(&proxy) == 0) + *type = PROXY_TYPE_HTTP; + return proxy; + } + + *type = aclk_verify_proxy(proxy); +#ifndef LWS_WITH_SOCKS5 + if (*type == PROXY_TYPE_SOCKS5) { + safe_log_proxy_error( + "Config var \"" ACLK_PROXY_CONFIG_VAR + "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.", + proxy); + } +#endif + if (*type == PROXY_TYPE_UNKNOWN) { + *type = PROXY_DISABLED; + safe_log_proxy_error( + "Config var \"" ACLK_PROXY_CONFIG_VAR + "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", + proxy); + } + + return proxy; +} + +// helper function to read settings only once (static) +// as claiming, challenge/response and ACLK +// read the same thing, no need to parse again +const char *aclk_get_proxy(ACLK_PROXY_TYPE *type) +{ + static const char *proxy = NULL; + static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET; + + if (proxy_type == PROXY_NOT_SET) + proxy = aclk_lws_wss_get_proxy_setting(&proxy_type); + + *type = proxy_type; + return proxy; +} + +int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port) +{ + int pos = 0; + if (!strncmp("https://", url, 8)) { + pos = 8; + } else if (!strncmp("http://", url, 7)) { + error("Cannot connect ACLK over %s -> unencrypted link is not supported", url); + return 1; + } + int host_end = pos; + while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':') + host_end++; + if (url[host_end] == 0) { + *aclk_hostname = strdupz(url + pos); + *aclk_port = 443; + info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); + return 0; + } + if (url[host_end] == ':') { + *aclk_hostname = callocz(host_end - pos + 1, 1); + strncpy(*aclk_hostname, url + pos, host_end - pos); + int port_end = host_end + 1; + while (url[port_end] >= '0' && url[port_end] <= '9') + port_end++; + if (port_end - host_end > 6) { + error("Port specified in %s is invalid", url); + return 0; + } + *aclk_port = atoi(&url[host_end+1]); + } + if (url[host_end] == '/') { + *aclk_port = 443; + *aclk_hostname = callocz(1, host_end - pos + 1); + strncpy(*aclk_hostname, url+pos, host_end - pos); + } + info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); + return 0; +} diff --git a/aclk/legacy/aclk_common.h b/aclk/legacy/aclk_common.h new file mode 100644 index 000000000..2dc0aa553 --- /dev/null +++ b/aclk/legacy/aclk_common.h @@ -0,0 +1,70 @@ +#ifndef ACLK_COMMON_H +#define ACLK_COMMON_H + +#include "aclk_rrdhost_state.h" +#include "../../daemon/common.h" + +extern netdata_mutex_t aclk_shared_state_mutex; +#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) + +// minimum and maximum supported version of ACLK +// in this version of agent +#define ACLK_VERSION_MIN 2 +#define ACLK_VERSION_MAX 3 + +// Version negotiation messages have they own versioning +// this is also used for LWT message as we set that up +// before version negotiation +#define ACLK_VERSION_NEG_VERSION 1 + +// Maximum time to wait for version negotiation before aborting +// and defaulting to oldest supported version +#define VERSION_NEG_TIMEOUT 3 + +#if ACLK_VERSION_MIN > ACLK_VERSION_MAX +#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN" +#endif + +// Define ACLK Feature Version Boundaries Here +#define ACLK_V_COMPRESSION 2 +#define ACLK_V_CHILDRENSTATE 3 + +#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING) +#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update) + +extern struct aclk_shared_state { + // optimization to avoid looping trough hosts + // every time Query Thread wakes up + RRDHOST *next_popcorn_host; + + // read only while ACLK connected + // protect by lock otherwise + int version_neg; + usec_t version_neg_wait_till; +} aclk_shared_state; + +typedef enum aclk_proxy_type { + PROXY_TYPE_UNKNOWN = 0, + PROXY_TYPE_SOCKS5, + PROXY_TYPE_HTTP, + PROXY_DISABLED, + PROXY_NOT_SET, +} ACLK_PROXY_TYPE; + +extern int aclk_kill_link; // Tells the agent to tear down the link +extern int aclk_disable_runtime; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type); + +#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://" +#define ACLK_PROXY_ENV "env" +#define ACLK_PROXY_CONFIG_VAR "proxy" + +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string); +const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type); +void safe_log_proxy_censor(char *proxy); +int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port); +const char *aclk_get_proxy(ACLK_PROXY_TYPE *type); + +#endif //ACLK_COMMON_H diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c new file mode 100644 index 000000000..c1856ed2c --- /dev/null +++ b/aclk/legacy/aclk_lws_https_client.c @@ -0,0 +1,246 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#define ACLK_LWS_HTTPS_CLIENT_INTERNAL +#include "aclk_lws_https_client.h" + +#include "aclk_common.h" + +#include "aclk_lws_wss_client.h" + +#define SMALL_BUFFER 16 + +struct simple_hcc_data { + char *data; + size_t data_size; + size_t written; + char lws_work_buffer[1024 + LWS_PRE]; + char *payload; + int response_code; + int done; +}; + +static int simple_https_client_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + UNUSED(user); + int n; + char *ptr; + char buffer[SMALL_BUFFER]; + struct simple_hcc_data *perconn_data = lws_get_opaque_user_data(wsi); + + switch (reason) { + case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ: + debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ"); + if (perconn_data->data_size - 1 - perconn_data->written < len) + return 1; + memcpy(&perconn_data->data[perconn_data->written], in, len); + perconn_data->written += len; + return 0; + case LWS_CALLBACK_RECEIVE_CLIENT_HTTP: + debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP"); + if(!perconn_data) { + error("Missing Per Connect Data"); + return -1; + } + n = sizeof(perconn_data->lws_work_buffer) - LWS_PRE; + ptr = perconn_data->lws_work_buffer + LWS_PRE; + if (lws_http_client_read(wsi, &ptr, &n) < 0) + return -1; + perconn_data->data[perconn_data->written] = '\0'; + return 0; + case LWS_CALLBACK_WSI_DESTROY: + debug(D_ACLK, "LWS_CALLBACK_WSI_DESTROY"); + if(perconn_data) + perconn_data->done = 1; + return 0; + case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: + debug(D_ACLK, "LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP"); + if(perconn_data) + perconn_data->response_code = lws_http_client_http_response(wsi); + return 0; + case LWS_CALLBACK_CLOSED_CLIENT_HTTP: + debug(D_ACLK, "LWS_CALLBACK_CLOSED_CLIENT_HTTP"); + return 0; + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + debug(D_ACLK, "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS"); + return 0; + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER"); + if(perconn_data && perconn_data->payload) { + unsigned char **p = (unsigned char **)in, *end = (*p) + len; + snprintfz(buffer, SMALL_BUFFER, "%zu", strlen(perconn_data->payload)); + if (lws_add_http_header_by_token(wsi, + WSI_TOKEN_HTTP_CONTENT_LENGTH, + (unsigned char *)buffer, strlen(buffer), p, end)) + return -1; + if (lws_add_http_header_by_token(wsi, + WSI_TOKEN_HTTP_CONTENT_TYPE, + (unsigned char *)ACLK_CONTENT_TYPE_JSON, + strlen(ACLK_CONTENT_TYPE_JSON), p, end)) + return -1; + lws_client_http_body_pending(wsi, 1); + lws_callback_on_writable(wsi); + } + return 0; + case LWS_CALLBACK_CLIENT_HTTP_WRITEABLE: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_WRITEABLE"); + if(perconn_data && perconn_data->payload) { + n = strlen(perconn_data->payload); + if(perconn_data->data_size < (size_t)LWS_PRE + n + 1) { + error("Buffer given is not big enough"); + return 1; + } + + memcpy(&perconn_data->data[LWS_PRE], perconn_data->payload, n); + if(n != lws_write(wsi, (unsigned char*)&perconn_data->data[LWS_PRE], n, LWS_WRITE_HTTP)) { + error("lws_write error"); + perconn_data->data[0] = 0; + return 1; + } + lws_client_http_body_pending(wsi, 0); + // clean for subsequent reply read + perconn_data->data[0] = 0; + } + return 0; + case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL"); + return 0; + case LWS_CALLBACK_WSI_CREATE: + debug(D_ACLK, "LWS_CALLBACK_WSI_CREATE"); + return 0; + case LWS_CALLBACK_PROTOCOL_INIT: + debug(D_ACLK, "LWS_CALLBACK_PROTOCOL_INIT"); + return 0; + case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL"); + return 0; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + debug(D_ACLK, "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED"); + return 0; + case LWS_CALLBACK_GET_THREAD_ID: + debug(D_ACLK, "LWS_CALLBACK_GET_THREAD_ID"); + return 0; + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + debug(D_ACLK, "LWS_CALLBACK_EVENT_WAIT_CANCELLED"); + return 0; + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + debug(D_ACLK, "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION"); + return 0; + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH"); + return 0; + default: + debug(D_ACLK, "Unknown callback %d", (int)reason); + return 0; + } +} + +static const struct lws_protocols protocols[] = { + { + "http", + simple_https_client_callback, + 0, + 0, + 0, + 0, + 0 + }, + { NULL, NULL, 0, 0, 0, 0, 0 } +}; + +static void simple_hcc_log_divert(int level, const char *line) +{ + UNUSED(level); + error("Libwebsockets: %s", line); +} + +int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload) +{ + info("%s %s", __func__, method); + + struct lws_context_creation_info info; + struct lws_client_connect_info i; + struct lws_context *context; + + struct simple_hcc_data *data = callocz(1, sizeof(struct simple_hcc_data)); + data->data = b; + data->data[0] = 0; + data->data_size = b_size; + data->payload = payload; + + int n = 0; + time_t timestamp; + + struct lws_vhost *vhost; + + memset(&info, 0, sizeof info); + + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + + + context = lws_create_context(&info); + if (!context) { + error("Error creating LWS context"); + freez(data); + return 1; + } + + lws_set_log_level(LLL_ERR | LLL_WARN, simple_hcc_log_divert); + + lws_service(context, 0); + + memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */ + i.context = context; + +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED + i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE; + info("Disabling SSL certificate checks"); +#else + i.ssl_connection = LCCSCF_USE_SSL; +#endif +#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0 +#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM. + i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; +#endif + + i.port = port; + i.address = host; + i.path = url; + + i.host = i.address; + i.origin = i.address; + i.method = method; + i.opaque_user_data = data; + i.alpn = "http/1.1"; + + i.protocol = protocols[0].name; + + vhost = lws_get_vhost_by_name(context, "default"); + if(!vhost) + fatal("Could not find the default LWS vhost."); + + //set up proxy + aclk_wss_set_proxy(vhost); + + lws_client_connect_via_info(&i); + + // libwebsockets handle connection timeouts already + // this adds additional safety in case of bug in LWS + timestamp = now_monotonic_sec(); + while( n >= 0 && !data->done && !netdata_exit) { + n = lws_service(context, 0); + if( now_monotonic_sec() - timestamp > SEND_HTTPS_REQUEST_TIMEOUT ) { + data->data[0] = 0; + data->done = 1; + error("Servicing LWS took too long."); + } + } + + lws_context_destroy(context); + + n = data->response_code; + + freez(data); + return (n < 200 || n >= 300); +} diff --git a/aclk/legacy/aclk_lws_https_client.h b/aclk/legacy/aclk_lws_https_client.h new file mode 100644 index 000000000..811809dd1 --- /dev/null +++ b/aclk/legacy/aclk_lws_https_client.h @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_LWS_HTTPS_CLIENT_H +#define NETDATA_LWS_HTTPS_CLIENT_H + +#include "../../daemon/common.h" +#include "libnetdata/libnetdata.h" + +#define DATAMAXLEN 1024*16 + +#ifdef ACLK_LWS_HTTPS_CLIENT_INTERNAL +#define ACLK_CONTENT_TYPE_JSON "application/json" +#define SEND_HTTPS_REQUEST_TIMEOUT 30 +#endif + +int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload); + +#endif /* NETDATA_LWS_HTTPS_CLIENT_H */ diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c new file mode 100644 index 000000000..2e6fd4ec8 --- /dev/null +++ b/aclk/legacy/aclk_lws_wss_client.c @@ -0,0 +1,613 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_lws_wss_client.h" + +#include "libnetdata/libnetdata.h" +#include "../../daemon/common.h" +#include "aclk_common.h" +#include "aclk_stats.h" + +extern int aclk_shutting_down; + +static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + +struct aclk_lws_wss_perconnect_data { + int todo; +}; + +static struct aclk_lws_wss_engine_instance *engine_instance = NULL; + +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len) +{ + if (write_len != NULL && write_len_bytes != NULL) + { + *write_len = 0; + *write_len_bytes = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + + struct lws_wss_packet_buffer *write_b; + size_t w,wb; + for(w=0, wb=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) + { + w++; + wb += write_b->data_size - write_b->written; + } + *write_len = w; + *write_len_bytes = wb; + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + } + } + else if (write_len != NULL) + { + *write_len = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + + struct lws_wss_packet_buffer *write_b; + size_t w; + for(w=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) + w++; + *write_len = w; + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + } + } + if (read_len != NULL) + { + *read_len = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + } + } +} + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size) +{ + struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer)); + if (data) { + new->data = mallocz(LWS_PRE + size); + memcpy(new->data + LWS_PRE, data, size); + new->data_size = size; + new->written = 0; + } + return new; +} + +static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item) +{ + struct lws_wss_packet_buffer *tail = *list; + if (!*list) { + *list = item; + return; + } + while (tail->next) { + tail = tail->next; + } + tail->next = item; +} + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *ret = *list; + if (ret != NULL) + *list = ret->next; + + return ret; +} + +static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item) +{ + freez(item->data); + freez(item); +} + +static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer) +{ + size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL); + if (elems > 0) + lws_ring_consume(ringbuffer, NULL, NULL, elems); +} + +static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *i; + while ((i = lws_wss_packet_buffer_pop(list)) != NULL) { + lws_wss_packet_buffer_free(i); + } + *list = NULL; +} + +static inline void aclk_lws_wss_clear_io_buffers() +{ + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + _aclk_lws_wss_read_buffer_clear(engine_instance->read_ringbuffer); + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + _aclk_lws_wss_write_buffer_clear(&engine_instance->write_buffer_head); + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); +} + +static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback, + sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 }, + { NULL, NULL, 0, 0, 0, 0, 0 } }; + +static void aclk_lws_wss_log_divert(int level, const char *line) +{ + switch (level) { + case LLL_ERR: + error("Libwebsockets Error: %s", line); + break; + case LLL_WARN: + debug(D_ACLK, "Libwebsockets Warn: %s", line); + break; + default: + error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line); + } +} + +static int aclk_lws_wss_client_init( char *target_hostname, int target_port) +{ + static int lws_logging_initialized = 0; + + if (unlikely(!lws_logging_initialized)) { + lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert); + lws_logging_initialized = 1; + } + + if (!target_hostname) + return 1; + + engine_instance = callocz(1, sizeof(struct aclk_lws_wss_engine_instance)); + + engine_instance->host = target_hostname; + engine_instance->port = target_port; + + + aclk_lws_mutex_init(&engine_instance->write_buf_mutex); + aclk_lws_mutex_init(&engine_instance->read_buf_mutex); + + engine_instance->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL); + if (!engine_instance->read_ringbuffer) + goto failure_cleanup; + + return 0; + +failure_cleanup: + freez(engine_instance); + return 1; +} + +void aclk_lws_wss_destroy_context() +{ + if (!engine_instance) + return; + if (!engine_instance->lws_context) + return; + lws_context_destroy(engine_instance->lws_context); + engine_instance->lws_context = NULL; +} + + +void aclk_lws_wss_client_destroy() +{ + if (engine_instance == NULL) + return; + + aclk_lws_wss_destroy_context(); + engine_instance->lws_wsi = NULL; + + aclk_lws_wss_clear_io_buffers(); + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + pthread_mutex_destroy(&engine_instance->write_buf_mutex); + pthread_mutex_destroy(&engine_instance->read_buf_mutex); +#endif +} + +#ifdef LWS_WITH_SOCKS5 +static int aclk_wss_set_socks(struct lws_vhost *vhost, const char *socks) +{ + char *proxy = strstr(socks, ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + if (!proxy) + return -1; + + proxy += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + if (!*proxy) + return -1; + + return lws_set_socks(vhost, proxy); +} +#endif + +void aclk_wss_set_proxy(struct lws_vhost *vhost) +{ + const char *proxy; + ACLK_PROXY_TYPE proxy_type; + char *log; + + proxy = aclk_get_proxy(&proxy_type); + +#ifdef LWS_WITH_SOCKS5 + lws_set_socks(vhost, ":"); +#endif + lws_set_proxy(vhost, ":"); + + if (proxy_type == PROXY_TYPE_UNKNOWN) { + error("Unknown proxy type"); + return; + } + + if (proxy_type == PROXY_TYPE_SOCKS5 || proxy_type == PROXY_TYPE_HTTP) { + log = strdupz(proxy); + safe_log_proxy_censor(log); + info("Connecting using %s proxy:\"%s\"", aclk_proxy_type_to_s(&proxy_type), log); + freez(log); + } + if (proxy_type == PROXY_TYPE_SOCKS5) { +#ifdef LWS_WITH_SOCKS5 + if (aclk_wss_set_socks(vhost, proxy)) + error("LWS failed to accept socks proxy."); + return; +#else + fatal("We have no SOCKS5 support but we made it here. Programming error!"); +#endif + } + if (proxy_type == PROXY_TYPE_HTTP) { + if (lws_set_proxy(vhost, proxy)) + error("LWS failed to accept http proxy."); + return; + } + if (proxy_type != PROXY_DISABLED) + error("Unknown proxy type"); +} + +// Return code indicates if connection attempt has started async. +int aclk_lws_wss_connect(char *host, int port) +{ + struct lws_client_connect_info i; + struct lws_vhost *vhost; + int n; + + if (!engine_instance) { + if (aclk_lws_wss_client_init(host, port)) + return 1; // Propagate failure + } + + if (!engine_instance->lws_context) + { + // First time through (on this connection), create the context + struct lws_context_creation_info info; + memset(&info, 0, sizeof(struct lws_context_creation_info)); + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + engine_instance->lws_context = lws_create_context(&info); + if (!engine_instance->lws_context) + { + error("Failed to create lws_context, ACLK will not function"); + return 1; + } + return 0; + // PROTOCOL_INIT callback will call again. + } + + for (n = 0; n < ACLK_LWS_CALLBACK_HISTORY; n++) + engine_instance->lws_callback_history[n] = 0; + + if (engine_instance->lws_wsi) { + error("Already Connected. Only one connection supported at a time."); + return 0; + } + + memset(&i, 0, sizeof(i)); + i.context = engine_instance->lws_context; + i.port = engine_instance->port; + i.address = engine_instance->host; + i.path = "/mqtt"; + i.host = engine_instance->host; + i.protocol = "mqtt"; + + // from LWS docu: + // If option LWS_SERVER_OPTION_EXPLICIT_VHOSTS is given, no vhost is + // created; you're expected to create your own vhosts afterwards using + // lws_create_vhost(). Otherwise a vhost named "default" is also created + // using the information in the vhost-related members, for compatibility. + vhost = lws_get_vhost_by_name(engine_instance->lws_context, "default"); + if(!vhost) + fatal("Could not find the default LWS vhost."); + + aclk_wss_set_proxy(vhost); + +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED + i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE; + info("Disabling SSL certificate checks"); +#else + i.ssl_connection = LCCSCF_USE_SSL; +#endif +#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0 +#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM. + i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; +#endif + lws_client_connect_via_info(&i); + return 0; +} + +static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, size_t len) +{ + if (lws_ring_insert(buffer, data, len) != len) { + error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding."); + return 0; + } + return 1; +} + +static const char *aclk_lws_callback_name(enum lws_callback_reasons reason) +{ + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + return "LWS_CALLBACK_CLIENT_WRITEABLE"; + case LWS_CALLBACK_CLIENT_RECEIVE: + return "LWS_CALLBACK_CLIENT_RECEIVE"; + case LWS_CALLBACK_PROTOCOL_INIT: + return "LWS_CALLBACK_PROTOCOL_INIT"; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED"; + case LWS_CALLBACK_USER: + return "LWS_CALLBACK_USER"; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR"; + case LWS_CALLBACK_CLIENT_CLOSED: + return "LWS_CALLBACK_CLIENT_CLOSED"; + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE"; + case LWS_CALLBACK_WSI_DESTROY: + return "LWS_CALLBACK_WSI_DESTROY"; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + return "LWS_CALLBACK_CLIENT_ESTABLISHED"; + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION"; + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + return "LWS_CALLBACK_EVENT_WAIT_CANCELLED"; + default: + // Not using an internal buffer here for thread-safety with unknown calling context. + error("Unknown LWS callback %u", reason); + return "unknown"; + } +} + +void aclk_lws_wss_fail_report() +{ + int i; + int anything_to_send = 0; + BUFFER *buf; + + if (netdata_anonymous_statistics_enabled <= 0) + return; + + // guess - most of the callback will be 1-99 + ',' + \0 + buf = buffer_create((ACLK_LWS_CALLBACK_HISTORY * 2) + 10); + + for (i = 0; i < ACLK_LWS_CALLBACK_HISTORY; i++) + if (engine_instance->lws_callback_history[i]) { + buffer_sprintf(buf, "%s%d", (i ? "," : ""), engine_instance->lws_callback_history[i]); + anything_to_send = 1; + } + + if (anything_to_send) + send_statistics("ACLK_CONN_FAIL", "FAIL", buffer_tostring(buf)); + + buffer_free(buf); +} + +static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + UNUSED(user); + struct lws_wss_packet_buffer *data; + int retval = 0; + static int lws_shutting_down = 0; + int i; + + for (i = ACLK_LWS_CALLBACK_HISTORY - 1; i > 0; i--) + engine_instance->lws_callback_history[i] = engine_instance->lws_callback_history[i - 1]; + engine_instance->lws_callback_history[0] = (int)reason; + + if (unlikely(aclk_shutting_down && !lws_shutting_down)) { + lws_shutting_down = 1; + retval = -1; + engine_instance->upstream_reconnect_request = 0; + } + + // Callback servicing is forced when we are closed from above. + if (engine_instance->upstream_reconnect_request) { + error("Closing lws connectino due to libmosquitto error."); + char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; + lws_close_reason( + wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error, + strlen(upstream_connection_error)); + retval = -1; + engine_instance->upstream_reconnect_request = 0; + } + + // Don't log to info - volume is proportional to message flow on ACLK. + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + data = engine_instance->write_buffer_head; + if (likely(data)) { + size_t bytes_left = data->data_size - data->written; + if ( bytes_left > FRAGMENT_SIZE) + bytes_left = FRAGMENT_SIZE; + int n = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY); + if (n>=0) { + data->written += n; + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.write_q_consumed += n; + ACLK_STATS_UNLOCK; + } + } + //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc); + if (data->written == data->data_size) + { + lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head); + lws_wss_packet_buffer_free(data); + } + if (engine_instance->write_buffer_head) + lws_callback_on_writable(engine_instance->lws_wsi); + } + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + return retval; + + case LWS_CALLBACK_CLIENT_RECEIVE: + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + if (!received_data_to_ringbuff(engine_instance->read_ringbuffer, in, len)) + retval = 1; + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.read_q_added += len; + ACLK_STATS_UNLOCK; + } + + // to future myself -> do not call this while read lock is active as it will eventually + // want to acquire same lock later in aclk_lws_wss_client_read() function + aclk_lws_connection_data_received(); + return retval; + + case LWS_CALLBACK_WSI_CREATE: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_GET_THREAD_ID: // ? + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + // Expected and safe to ignore. + debug(D_ACLK, "Ignoring expected callback from LWS: %s", aclk_lws_callback_name(reason)); + return retval; + + default: + // Pass to next switch, this case removes compiler warnings. + break; + } + // Log to info - volume is proportional to connection attempts. + info("Processing callback %s", aclk_lws_callback_name(reason)); + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + aclk_lws_wss_connect(engine_instance->host, engine_instance->port); // Makes the outgoing connection + break; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + if (engine_instance->lws_wsi != NULL && engine_instance->lws_wsi != wsi) + error("Multiple connections on same WSI? %p vs %p", engine_instance->lws_wsi, wsi); + engine_instance->lws_wsi = wsi; + break; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + error( + "Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", engine_instance->host, + engine_instance->port, (in ? (char *)in : "not given")); + // Fall-through + case LWS_CALLBACK_CLIENT_CLOSED: + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + engine_instance->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback + aclk_lws_connection_closed(); + return -1; // the callback response is ignored, hope the above remains true + case LWS_CALLBACK_WSI_DESTROY: + aclk_lws_wss_clear_io_buffers(); + if (!engine_instance->websocket_connection_up) + aclk_lws_wss_fail_report(); + engine_instance->lws_wsi = NULL; + engine_instance->websocket_connection_up = 0; + aclk_lws_connection_closed(); + break; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + engine_instance->websocket_connection_up = 1; + aclk_lws_connection_established(engine_instance->host, engine_instance->port); + break; + + default: + error("Unexpected callback from libwebsockets %s", aclk_lws_callback_name(reason)); + break; + } + return retval; //0-OK, other connection should be closed! +} + +int aclk_lws_wss_client_write(void *buf, size_t count) +{ + if (engine_instance && engine_instance->lws_wsi && engine_instance->websocket_connection_up) { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + lws_wss_packet_buffer_append(&engine_instance->write_buffer_head, lws_wss_packet_buffer_new(buf, count)); + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.write_q_added += count; + ACLK_STATS_UNLOCK; + } + + lws_callback_on_writable(engine_instance->lws_wsi); + return count; + } + return 0; +} + +int aclk_lws_wss_client_read(void *buf, size_t count) +{ + size_t data_to_be_read = count; + + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + size_t readable_byte_count = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); + if (unlikely(readable_byte_count == 0)) { + errno = EAGAIN; + data_to_be_read = -1; + goto abort; + } + + if (readable_byte_count < data_to_be_read) + data_to_be_read = readable_byte_count; + + data_to_be_read = lws_ring_consume(engine_instance->read_ringbuffer, NULL, buf, data_to_be_read); + if (data_to_be_read == readable_byte_count) + engine_instance->data_to_read = 0; + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.read_q_consumed += data_to_be_read; + ACLK_STATS_UNLOCK; + } + +abort: + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + return data_to_be_read; +} + +void aclk_lws_wss_service_loop() +{ + if (engine_instance) + { + /*if (engine_instance->lws_wsi) { + lws_cancel_service(engine_instance->lws_context); + lws_callback_on_writable(engine_instance->lws_wsi); + }*/ + lws_service(engine_instance->lws_context, 0); + } +} + +// in case the MQTT connection disconnect while lws transport is still operational +// we should drop connection and reconnect +// this function should be called when that happens to notify lws of that situation +void aclk_lws_wss_mqtt_layer_disconect_notif() +{ + if (!engine_instance) + return; + if (engine_instance->lws_wsi && engine_instance->websocket_connection_up) { + engine_instance->upstream_reconnect_request = 1; + lws_callback_on_writable( + engine_instance->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written. + } +} diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h new file mode 100644 index 000000000..584a3cf4f --- /dev/null +++ b/aclk/legacy/aclk_lws_wss_client.h @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_LWS_WSS_CLIENT_H +#define ACLK_LWS_WSS_CLIENT_H + +#include <libwebsockets.h> + +#include "libnetdata/libnetdata.h" + +// This is as define because ideally the ACLK at high level +// can do mosqitto writes and reads only from one thread +// which is cleaner implementation IMHO +// in such case this mutexes are not necessarry and life +// is simpler +#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1 + +#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES (128 * 1024) + +#define ACLK_LWS_CALLBACK_HISTORY 10 + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED +#define aclk_lws_mutex_init(x) netdata_mutex_init(x) +#define aclk_lws_mutex_lock(x) netdata_mutex_lock(x) +#define aclk_lws_mutex_unlock(x) netdata_mutex_unlock(x) +#else +#define aclk_lws_mutex_init(x) +#define aclk_lws_mutex_lock(x) +#define aclk_lws_mutex_unlock(x) +#endif + +struct aclk_lws_wss_engine_callbacks { + void (*connection_established_callback)(); + void (*data_rcvd_callback)(); + void (*data_writable_callback)(); + void (*connection_closed)(); +}; + +struct lws_wss_packet_buffer { + unsigned char *data; + size_t data_size, written; + struct lws_wss_packet_buffer *next; +}; + +struct aclk_lws_wss_engine_instance { + //target host/port for connection + char *host; + int port; + + //internal data + struct lws_context *lws_context; + struct lws *lws_wsi; + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + netdata_mutex_t write_buf_mutex; + netdata_mutex_t read_buf_mutex; +#endif + + struct lws_wss_packet_buffer *write_buffer_head; + struct lws_ring *read_ringbuffer; + + //flags to be readed by engine user + int websocket_connection_up; + + // currently this is by default disabled + + int data_to_read; + int upstream_reconnect_request; + + int lws_callback_history[ACLK_LWS_CALLBACK_HISTORY]; +}; + +void aclk_lws_wss_client_destroy(); +void aclk_lws_wss_destroy_context(); + +int aclk_lws_wss_connect(char *host, int port); + +int aclk_lws_wss_client_write(void *buf, size_t count); +int aclk_lws_wss_client_read(void *buf, size_t count); +void aclk_lws_wss_service_loop(); + +void aclk_lws_wss_mqtt_layer_disconect_notif(); + +// Notifications inside the layer above +void aclk_lws_connection_established(); +void aclk_lws_connection_data_received(); +void aclk_lws_connection_closed(); +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); + +void aclk_wss_set_proxy(struct lws_vhost *vhost); + +#define FRAGMENT_SIZE 4096 +#endif diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c new file mode 100644 index 000000000..7ab534f16 --- /dev/null +++ b/aclk/legacy/aclk_query.c @@ -0,0 +1,789 @@ +#include "aclk_common.h" +#include "aclk_query.h" +#include "aclk_stats.h" +#include "aclk_rx_msgs.h" + +#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) + +volatile int aclk_connected = 0; + +#ifndef __GNUC__ +#pragma region ACLK_QUEUE +#endif + +static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex) +#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex) + +struct aclk_query { + usec_t created; + usec_t created_boot_time; + time_t run_after; // Delay run until after this time + ACLK_CMD cmd; // What command is this + char *topic; // Topic to respond to + char *data; // Internal data (NULL if request from the cloud) + char *msg_id; // msg_id generated by the cloud (NULL if internal) + char *query; // The actual query + u_char deleted; // Mark deleted for garbage collect + struct aclk_query *next; +}; + +struct aclk_query_queue { + struct aclk_query *aclk_query_head; + struct aclk_query *aclk_query_tail; + unsigned int count; +} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; + + +unsigned int aclk_query_size() +{ + int r; + ACLK_QUEUE_LOCK; + r = aclk_queue.count; + ACLK_QUEUE_UNLOCK; + return r; +} + +/* + * Free a query structure when done + */ +static void aclk_query_free(struct aclk_query *this_query) +{ + if (unlikely(!this_query)) + return; + + freez(this_query->topic); + if (likely(this_query->query)) + freez(this_query->query); + if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) { + struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data; + freez(del->data); + freez(del); + } + if (likely(this_query->msg_id)) + freez(this_query->msg_id); + freez(this_query); +} + +/* + * Get the next query to process - NULL if nothing there + * The caller needs to free memory by calling aclk_query_free() + * + * topic + * query + * The structure itself + * + */ +static struct aclk_query *aclk_queue_pop() +{ + struct aclk_query *this_query; + + ACLK_QUEUE_LOCK; + + if (likely(!aclk_queue.aclk_query_head)) { + ACLK_QUEUE_UNLOCK; + return NULL; + } + + this_query = aclk_queue.aclk_query_head; + + // Get rid of the deleted entries + while (this_query && this_query->deleted) { + aclk_queue.count--; + + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + aclk_query_free(this_query); + + this_query = aclk_queue.aclk_query_head; + } + + if (likely(!this_query)) { + ACLK_QUEUE_UNLOCK; + return NULL; + } + + if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { + info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); + ACLK_QUEUE_UNLOCK; + return NULL; + } + + aclk_queue.count--; + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + ACLK_QUEUE_UNLOCK; + return this_query; +} + +// Returns the entry after which we need to create a new entry to run at the specified time +// If NULL is returned we need to add to HEAD +// Need to have a QUERY lock before calling this + +static struct aclk_query *aclk_query_find_position(time_t time_to_run) +{ + struct aclk_query *tmp_query, *last_query; + + // Quick check if we will add to the end + if (likely(aclk_queue.aclk_query_tail)) { + if (aclk_queue.aclk_query_tail->run_after <= time_to_run) + return aclk_queue.aclk_query_tail; + } + + last_query = NULL; + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (tmp_query->run_after > time_to_run) + return last_query; + last_query = tmp_query; + tmp_query = tmp_query->next; + } + return last_query; +} + +// Need to have a QUERY lock before calling this +static struct aclk_query * +aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) +{ + struct aclk_query *tmp_query, *prev_query; + UNUSED(cmd); + + tmp_query = aclk_queue.aclk_query_head; + prev_query = NULL; + while (tmp_query) { + if (likely(!tmp_query->deleted)) { + if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { + if ((!data || data == tmp_query->data) && + (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { + if (likely(last_query)) + *last_query = prev_query; + return tmp_query; + } + } + } + prev_query = tmp_query; + tmp_query = tmp_query->next; + } + return NULL; +} + +/* + * Add a query to execute, the result will be send to the specified topic + */ + +int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) +{ + struct aclk_query *new_query, *tmp_query; + + // Ignore all commands while we wait for the agent to initialize + if (unlikely(!aclk_connected)) + return 1; + + run_after = now_realtime_sec() + run_after; + + ACLK_QUEUE_LOCK; + struct aclk_query *last_query = NULL; + + tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); + if (unlikely(tmp_query)) { + if (tmp_query->run_after == run_after) { + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + if (last_query) + last_query->next = tmp_query->next; + else + aclk_queue.aclk_query_head = tmp_query->next; + + debug(D_ACLK, "Removing double entry"); + aclk_query_free(tmp_query); + aclk_queue.count--; + } + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_queued++; + ACLK_STATS_UNLOCK; + } + + new_query = callocz(1, sizeof(struct aclk_query)); + new_query->cmd = aclk_cmd; + if (internal) { + new_query->topic = strdupz(topic); + if (likely(query)) + new_query->query = strdupz(query); + } else { + new_query->topic = topic; + new_query->query = query; + new_query->msg_id = msg_id; + } + + new_query->data = data; + new_query->next = NULL; + new_query->created = now_realtime_usec(); + new_query->created_boot_time = now_boottime_usec(); + new_query->run_after = run_after; + + debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : ""); + + tmp_query = aclk_query_find_position(run_after); + + if (tmp_query) { + new_query->next = tmp_query->next; + tmp_query->next = new_query; + if (tmp_query == aclk_queue.aclk_query_tail) + aclk_queue.aclk_query_tail = new_query; + aclk_queue.count++; + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + new_query->next = aclk_queue.aclk_query_head; + aclk_queue.aclk_query_head = new_query; + aclk_queue.count++; + + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +#ifndef __GNUC__ +#pragma region Helper Functions +#endif + +/* + * Take a buffer, encode it and rewrite it + * + */ + +static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines) +{ + char *tmp_buffer = mallocz(content_size * 2); + char *dst = tmp_buffer; + while (content_size > 0) { + switch (*src) { + case '\n': + if (keep_newlines) + { + *dst++ = '\\'; + *dst++ = 'n'; + } + break; + case '\t': + break; + case 0x01 ... 0x08: + case 0x0b ... 0x1F: + *dst++ = '\\'; + *dst++ = 'u'; + *dst++ = '0'; + *dst++ = '0'; + *dst++ = (*src < 0x0F) ? '0' : '1'; + *dst++ = to_hex(*src); + break; + case '\"': + *dst++ = '\\'; + *dst++ = *src; + break; + default: + *dst++ = *src; + } + src++; + content_size--; + } + *dst = '\0'; + + return tmp_buffer; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +#ifndef __GNUC__ +#pragma region ACLK_QUERY +#endif + +static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) +{ + usec_t t = now_boottime_usec(); + aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); + + w->response.code = web_client_api_request_v1(host, w, url); + t = now_boottime_usec() - t; + + aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t); + + return t; +} + +static int aclk_execute_query(struct aclk_query *this_query) +{ + if (strncmp(this_query->query, "/api/v1/", 8) == 0) { + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); + *mysep = '\0'; + } else + strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); + + mysep = strrchr(this_query->query, '/'); + + // TODO: handle bad response perhaps in a different way. For now it does to the payload + aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + now_realtime_timeval(&w->tv_ready); + w->response.data->date = w->tv_ready.tv_sec; + web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); + char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); + + buffer_sprintf( + local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}", + w->response.code, encoded_response, encoded_header); + + buffer_sprintf(local_buffer, "\n}"); + + debug(D_ACLK, "Response:%s", encoded_header); + + aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); + + buffer_free(w->response.data); + buffer_free(w->response.header); + buffer_free(w->response.header_output); + freez(w); + buffer_free(local_buffer); + freez(encoded_response); + freez(encoded_header); + return 0; + } + return 1; +} + +static int aclk_execute_query_v2(struct aclk_query *this_query) +{ + int retval = 0; + usec_t t; + BUFFER *local_buffer = NULL; + struct aclk_cloud_req_v2 *cloud_req = (struct aclk_cloud_req_v2 *)this_query->data; + +#ifdef NETDATA_WITH_ZLIB + int z_ret; + BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *start, *end; +#endif + + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); + *mysep = '\0'; + } else + url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1); + + mysep = strrchr(this_query->query, '/'); + + // execute the query + t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + +#ifdef NETDATA_WITH_ZLIB + // check if gzip encoding can and should be used + if ((start = strstr(cloud_req->data, WEB_HDR_ACCEPT_ENC))) { + start += strlen(WEB_HDR_ACCEPT_ENC); + end = strstr(start, "\x0D\x0A"); + start = strstr(start, "gzip"); + + if (start && start < end) { + w->response.zstream.zalloc = Z_NULL; + w->response.zstream.zfree = Z_NULL; + w->response.zstream.opaque = Z_NULL; + if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) { + w->response.zinitialized = 1; + w->response.zoutput = 1; + } else + error("Failed to initialize zlib. Proceeding without compression."); + } + } + + if (w->response.data->len && w->response.zinitialized) { + w->response.zstream.next_in = (Bytef *)w->response.data->buffer; + w->response.zstream.avail_in = w->response.data->len; + do { + w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE; + w->response.zstream.next_out = w->response.zbuffer; + z_ret = deflate(&w->response.zstream, Z_FINISH); + if(z_ret < 0) { + if(w->response.zstream.msg) + error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); + else + error("Unknown error during zlib compression."); + retval = 1; + goto cleanup; + } + int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; + buffer_need_bytes(z_buffer, bytes_to_cpy); + memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy); + z_buffer->len += bytes_to_cpy; + } while(z_ret != Z_STREAM_END); + // so that web_client_build_http_header + // puts correct content lenght into header + buffer_free(w->response.data); + w->response.data = z_buffer; + z_buffer = NULL; + } +#endif + + now_realtime_timeval(&w->tv_ready); + w->response.data->date = w->tv_ready.tv_sec; + web_client_build_http_header(w); + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code); + buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A"); + buffer_strcat(local_buffer, w->response.header_output->buffer); + + if (w->response.data->len) { +#ifdef NETDATA_WITH_ZLIB + if (w->response.zinitialized) { + buffer_need_bytes(local_buffer, w->response.data->len); + memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); + local_buffer->len += w->response.data->len; + } else { +#endif + buffer_strcat(local_buffer, w->response.data->buffer); +#ifdef NETDATA_WITH_ZLIB + } +#endif + } + + aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id); + +cleanup: +#ifdef NETDATA_WITH_ZLIB + if(w->response.zinitialized) + deflateEnd(&w->response.zstream); + buffer_free(z_buffer); +#endif + buffer_free(w->response.data); + buffer_free(w->response.header); + buffer_free(w->response.header_output); + freez(w); + buffer_free(local_buffer); + return retval; +} + +#define ACLK_HOST_PTR_COMPULSORY(x) \ + if (unlikely(!host)) { \ + errno = 0; \ + error(x " needs host pointer"); \ + break; \ + } + +/* + * This function will fetch the next pending command and process it + * + */ +static int aclk_process_query(struct aclk_query_thread *t_info) +{ + struct aclk_query *this_query; + static long int query_count = 0; + ACLK_METADATA_STATE meta_state; + RRDHOST *host; + + if (!aclk_connected) + return 0; + + this_query = aclk_queue_pop(); + if (likely(!this_query)) { + return 0; + } + + if (unlikely(this_query->deleted)) { + debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query); + aclk_query_free(this_query); + return 1; + } + query_count++; + + host = (RRDHOST*)this_query->data; + + debug( + D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic, + this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS); + + switch (this_query->cmd) { + case ACLK_CMD_ONCONNECT: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT"); +#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE + if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { + error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE); + break; + } +#else +#warning "This check became unnecessary. Remove" +#endif + + debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"", + host->hostname, + host->machine_guid); + + rrdhost_aclk_state_lock(host); + meta_state = host->aclk_state.metadata; + host->aclk_state.metadata = ACLK_METADATA_SENT; + rrdhost_aclk_state_unlock(host); + aclk_send_metadata(meta_state, host); + break; + + case ACLK_CMD_CHART: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART"); + + debug(D_ACLK, "EXECUTING a chart update command"); + aclk_send_single_chart(host, this_query->query); + break; + + case ACLK_CMD_CHARTDEL: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL"); + + debug(D_ACLK, "EXECUTING a chart delete command"); + //TODO: This send the info metadata for now + aclk_send_info_metadata(ACLK_METADATA_SENT, host); + break; + + case ACLK_CMD_ALARM: + debug(D_ACLK, "EXECUTING an alarm update command"); + aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); + break; + + case ACLK_CMD_CLOUD: + debug(D_ACLK, "EXECUTING a cloud command"); + aclk_execute_query(this_query); + break; + case ACLK_CMD_CLOUD_QUERY_2: + debug(D_ACLK, "EXECUTING Cloud Query v2"); + aclk_execute_query_v2(this_query); + break; + + case ACLK_CMD_CHILD_CONNECT: + case ACLK_CMD_CHILD_DISCONNECT: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT"); + + debug( + D_ACLK, "Execution Child %s command", + this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect"); + aclk_send_info_child_connection(host, this_query->cmd); + break; + + default: + errno = 0; + error("Unknown ACLK Query Command"); + break; + } + debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_dispatched++; + aclk_queries_per_thread[t_info->idx]++; + ACLK_STATS_UNLOCK; + } + + aclk_query_free(this_query); + + return 1; +} + +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) +{ + if (query_threads && query_threads->thread_list) { + for (int i = 0; i < query_threads->count; i++) { + netdata_thread_join(query_threads->thread_list[i].thread, NULL); + } + freez(query_threads->thread_list); + } + + struct aclk_query *this_query; + + do { + this_query = aclk_queue_pop(); + aclk_query_free(this_query); + } while (this_query); +} + +#define TASK_LEN_MAX 16 +void aclk_query_threads_start(struct aclk_query_threads *query_threads) +{ + info("Starting %d query threads.", query_threads->count); + + char thread_name[TASK_LEN_MAX]; + query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); + for (int i = 0; i < query_threads->count; i++) { + query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics + + if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0)) + error("snprintf encoding error"); + netdata_thread_create( + &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, + &query_threads->thread_list[i]); + } +} + +/** + * Checks and updates popcorning state of rrdhost + * returns actual/updated popcorning state + */ + +ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host) +{ + rrdhost_aclk_state_lock(host); + ACLK_POPCORNING_STATE ret = host->aclk_state.state; + if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ + rrdhost_aclk_state_unlock(host); + return ret; + } + + if (!host->aclk_state.t_last_popcorn_update){ + rrdhost_aclk_state_unlock(host); + return ret; + } + + time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update; + + if (t_diff >= ACLK_STABLE_TIMEOUT) { + host->aclk_state.state = ACLK_HOST_STABLE; + host->aclk_state.t_last_popcorn_update = 0; + rrdhost_aclk_state_unlock(host); + info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff); + return ACLK_HOST_STABLE; + } + + rrdhost_aclk_state_unlock(host); + return ret; +} + +/** + * Main query processing thread + * + * On startup wait for the agent collectors to initialize + * Expect at least a time of ACLK_STABLE_TIMEOUT seconds + * of no new collectors coming in in order to mark the agent + * as stable (set agent_state = AGENT_STABLE) + */ +void *aclk_query_main_thread(void *ptr) +{ + struct aclk_query_thread *info = ptr; + + while (!netdata_exit) { + if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) { +#ifdef ACLK_DEBUG + _dump_collector_list(); +#endif + break; + } + sleep_usec(USEC_PER_SEC * 1); + } + + while (!netdata_exit) { + if(aclk_disable_runtime) { + sleep(1); + continue; + } + ACLK_SHARED_STATE_LOCK; + if (unlikely(!aclk_shared_state.version_neg)) { + if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { + ACLK_SHARED_STATE_UNLOCK; + info("Waiting for ACLK Version Negotiation message from Cloud"); + sleep(1); + continue; + } + errno = 0; + error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." + " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); + aclk_shared_state.version_neg = ACLK_VERSION_MIN; + aclk_set_rx_handlers(aclk_shared_state.version_neg); + } + ACLK_SHARED_STATE_UNLOCK; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) { + if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + rrdhost_aclk_state_unlock(localhost); + errno = 0; + error("ACLK failed to queue on_connect command"); + sleep(1); + continue; + } + localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED; + } + rrdhost_aclk_state_unlock(localhost); + + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { + aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); + aclk_shared_state.next_popcorn_host = NULL; + aclk_update_next_child_to_popcorn(); + } + ACLK_SHARED_STATE_UNLOCK; + + while (aclk_process_query(info)) { + // Process all commands + }; + + QUERY_THREAD_LOCK; + + // TODO: Need to check if there are queries awaiting already + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + + QUERY_THREAD_UNLOCK; + } + + return NULL; +} + +#ifndef __GNUC__ +#pragma endregion +#endif diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h new file mode 100644 index 000000000..53eef1392 --- /dev/null +++ b/aclk/legacy/aclk_query.h @@ -0,0 +1,40 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_QUERY_H +#define NETDATA_ACLK_QUERY_H + +#include "libnetdata/libnetdata.h" +#include "web/server/web_client.h" + +#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable + +extern pthread_cond_t query_cond_wait; +extern pthread_mutex_t query_lock_wait; +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) +#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait) + +extern volatile int aclk_connected; + +struct aclk_query_thread { + netdata_thread_t thread; + int idx; +}; + +struct aclk_query_threads { + struct aclk_query_thread *thread_list; + int count; +}; + +struct aclk_cloud_req_v2 { + char *data; + RRDHOST *host; +}; + +void *aclk_query_main_thread(void *ptr); +int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); + +void aclk_query_threads_start(struct aclk_query_threads *query_threads); +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); +unsigned int aclk_query_size(); + +#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/aclk_rrdhost_state.h b/aclk/legacy/aclk_rrdhost_state.h new file mode 100644 index 000000000..7ab3a502e --- /dev/null +++ b/aclk/legacy/aclk_rrdhost_state.h @@ -0,0 +1,42 @@ +#ifndef ACLK_RRDHOST_STATE_H +#define ACLK_RRDHOST_STATE_H + +#include "../../libnetdata/libnetdata.h" + +typedef enum aclk_cmd { + ACLK_CMD_CLOUD, + ACLK_CMD_ONCONNECT, + ACLK_CMD_INFO, + ACLK_CMD_CHART, + ACLK_CMD_CHARTDEL, + ACLK_CMD_ALARM, + ACLK_CMD_CLOUD_QUERY_2, + ACLK_CMD_CHILD_CONNECT, + ACLK_CMD_CHILD_DISCONNECT +} ACLK_CMD; + +typedef enum aclk_metadata_state { + ACLK_METADATA_REQUIRED, + ACLK_METADATA_CMD_QUEUED, + ACLK_METADATA_SENT +} ACLK_METADATA_STATE; + +typedef enum aclk_agent_state { + ACLK_HOST_INITIALIZING, + ACLK_HOST_STABLE +} ACLK_POPCORNING_STATE; + +typedef struct aclk_rrdhost_state { + char *claimed_id; // Claimed ID if host has one otherwise NULL + +#ifdef ENABLE_ACLK + // per child popcorning + ACLK_POPCORNING_STATE state; + ACLK_METADATA_STATE metadata; + + time_t timestamp_created; + time_t t_last_popcorn_update; +#endif /* ENABLE_ACLK */ +} aclk_rrdhost_state; + +#endif /* ACLK_RRDHOST_STATE_H */ diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c new file mode 100644 index 000000000..99fa9d987 --- /dev/null +++ b/aclk/legacy/aclk_rx_msgs.c @@ -0,0 +1,365 @@ + +#include "aclk_rx_msgs.h" + +#include "aclk_common.h" +#include "aclk_stats.h" +#include "aclk_query.h" + +#ifndef UUID_STR_LEN +#define UUID_STR_LEN 37 +#endif + +static inline int aclk_extract_v2_data(char *payload, char **data) +{ + char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR); + if(!ptr) + return 1; + ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR); + *data = strdupz(ptr); + return 0; +} + +#define ACLK_GET_REQ "GET " +#define ACLK_CHILD_REQ "/host/" +#define ACLK_CLOUD_REQ_V2_PREFIX "/api/v1/" +#define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref)) +static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req) +{ + const char *start, *end, *ptr; + char uuid_str[UUID_STR_LEN]; + uuid_t uuid; + + errno = 0; + + if(STRNCMP_CONSTANT_PREFIX(cloud_req->data, ACLK_GET_REQ)) { + error("Only accepting GET HTTP requests from CLOUD"); + return 1; + } + start = ptr = cloud_req->data + strlen(ACLK_GET_REQ); + + if(!STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CHILD_REQ)) { + ptr += strlen(ACLK_CHILD_REQ); + if(strlen(ptr) < UUID_STR_LEN) { + error("the child id in URL too short \"%s\"", start); + return 1; + } + + strncpyz(uuid_str, ptr, UUID_STR_LEN - 1); + + for(int i = 0; i < UUID_STR_LEN && uuid_str[i]; i++) + uuid_str[i] = tolower(uuid_str[i]); + + if(ptr[0] && uuid_parse(uuid_str, uuid)) { + error("Got Child query (/host/XXX/...) host id \"%s\" doesn't look like valid GUID", uuid_str); + return 1; + } + ptr += UUID_STR_LEN - 1; + + cloud_req->host = rrdhost_find_by_guid(uuid_str, 0); + if(!cloud_req->host) { + error("Cannot find host with GUID \"%s\"", uuid_str); + return 1; + } + } + + if(STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CLOUD_REQ_V2_PREFIX)) { + error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); + return 1; + } + + if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) { + errno = 0; + error("Doesn't look like HTTP GET request."); + return 1; + } + + req->payload = mallocz((end - start) + 1); + strncpyz(req->payload, start, end - start); + + return 0; +} + +#define HTTP_CHECK_AGENT_INITIALIZED() rrdhost_aclk_state_lock(localhost);\ + if (unlikely(localhost->aclk_state.state == ACLK_HOST_INITIALIZING)) {\ + debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ + rrdhost_aclk_state_unlock(localhost);\ + return 1;\ + }\ + rrdhost_aclk_state_unlock(localhost); + +/* + * Parse the incoming payload and queue a command if valid + */ +static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + UNUSED(raw_payload); + HTTP_CHECK_AGENT_INITIALIZED(); + + errno = 0; + if (unlikely(cloud_to_agent->version != 1)) { + error( + "Received \"http\" message from Cloud with version %d, but ACLK version %d is used", + cloud_to_agent->version, + aclk_shared_state.version_neg); + return 1; + } + + if (unlikely(!cloud_to_agent->payload)) { + error("payload missing"); + return 1; + } + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("callback_topic missing"); + return 1; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("msg_id missing"); + return 1; + } + + if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) + debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); + + return 0; +} + +static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + HTTP_CHECK_AGENT_INITIALIZED(); + + struct aclk_cloud_req_v2 *cloud_req; + char *data; + + errno = 0; + if (cloud_to_agent->version < ACLK_V_COMPRESSION) { + error( + "This handler cannot reply to request with version older than %d, received %d.", + ACLK_V_COMPRESSION, + cloud_to_agent->version); + return 1; + } + + if (unlikely(aclk_extract_v2_data(raw_payload, &data))) { + error("Error extracting payload expected after the JSON dictionary."); + return 1; + } + + cloud_req = mallocz(sizeof(struct aclk_cloud_req_v2)); + cloud_req->data = data; + cloud_req->host = localhost; + + if (unlikely(aclk_v2_payload_get_query(cloud_req, cloud_to_agent))) { + error("Could not extract payload from query"); + goto cleanup; + } + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("Missing callback_topic"); + goto cleanup; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("Missing msg_id"); + goto cleanup; + } + + // aclk_queue_query takes ownership of data pointer + if (unlikely(aclk_queue_query( + cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, + ACLK_CMD_CLOUD_QUERY_2))) { + error("ACLK failed to queue incoming \"http\" v2 message"); + goto cleanup; + } + + return 0; +cleanup: + freez(cloud_req->data); + freez(cloud_req); + return 1; +} + +// This handles `version` message from cloud used to negotiate +// protocol version we will use +static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + UNUSED(raw_payload); + int version = -1; + errno = 0; + + if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { + error( + "Unsuported version of \"version\" message from cloud. Expected %d, Got %d", + ACLK_VERSION_NEG_VERSION, + cloud_to_agent->version); + return 1; + } + if (unlikely(!cloud_to_agent->min_version)) { + error("Min version missing or 0"); + return 1; + } + if (unlikely(!cloud_to_agent->max_version)) { + error("Max version missing or 0"); + return 1; + } + if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { + error( + "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, + cloud_to_agent->min_version); + return 1; + } + + if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { + error( + "Agent too old for this cloud. Minimum version required by cloud %d." + " Maximum version supported by this agent %d.", + cloud_to_agent->min_version, ACLK_VERSION_MAX); + aclk_kill_link = 1; + aclk_disable_runtime = 1; + return 1; + } + if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { + error( + "Cloud version is too old for this agent. Maximum version supported by cloud %d." + " Minimum (oldest) version supported by this agent %d.", + cloud_to_agent->max_version, ACLK_VERSION_MIN); + aclk_kill_link = 1; + return 1; + } + + version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); + + ACLK_SHARED_STATE_LOCK; + if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { + errno = 0; + error("The \"version\" message came too late ignoring."); + goto err_cleanup; + } + if (unlikely(aclk_shared_state.version_neg)) { + errno = 0; + error("Version has already been set to %d", aclk_shared_state.version_neg); + goto err_cleanup; + } + aclk_shared_state.version_neg = version; + ACLK_SHARED_STATE_UNLOCK; + + info("Choosing version %d of ACLK", version); + + aclk_set_rx_handlers(version); + + return 0; + +err_cleanup: + ACLK_SHARED_STATE_UNLOCK; + return 1; +} + +typedef struct aclk_incoming_msg_type{ + char *name; + int(*fnc)(struct aclk_request *, char *); +}aclk_incoming_msg_type; + +aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = { + { .name = "http", .fnc = aclk_handle_cloud_request_v1 }, + { .name = "version", .fnc = aclk_handle_version_response }, + { .name = NULL, .fnc = NULL } +}; + +aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { + { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, + { .name = "version", .fnc = aclk_handle_version_response }, + { .name = NULL, .fnc = NULL } +}; + +struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1; + +void aclk_set_rx_handlers(int version) +{ + if(version >= ACLK_V_COMPRESSION) { + aclk_incoming_msg_types = aclk_incoming_msg_types_compression; + return; + } + + aclk_incoming_msg_types = aclk_incoming_msg_types_v1; +} + +int aclk_handle_cloud_message(char *payload) +{ + struct aclk_request cloud_to_agent; + memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_recvd++; + ACLK_STATS_UNLOCK; + } + + if (unlikely(!payload)) { + errno = 0; + error("ACLK incoming message is empty"); + goto err_cleanup_nojson; + } + + debug(D_ACLK, "ACLK incoming message (%s)", payload); + + int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + + if (unlikely(rc != JSON_OK)) { + errno = 0; + error("Malformed json request (%s)", payload); + goto err_cleanup; + } + + if (!cloud_to_agent.type_id) { + errno = 0; + error("Cloud message is missing compulsory key \"type\""); + goto err_cleanup; + } + + if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { + error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); + goto err_cleanup; + } + + for (int i = 0; aclk_incoming_msg_types[i].name; i++) { + if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { + if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { + // in case of success handler is supposed to clean up after itself + // or as in the case of aclk_handle_cloud_request take + // ownership of the pointers (done to avoid copying) + // see what `aclk_queue_query` parameter `internal` does + + // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! + // msg handlers (namely aclk_handle_version_responce) + // can freely change what aclk_incoming_msg_types points to + // so either exit or restart this for loop + freez(cloud_to_agent.type_id); + return 0; + } + goto err_cleanup; + } + } + + errno = 0; + error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id); + +err_cleanup: + if (cloud_to_agent.payload) + freez(cloud_to_agent.payload); + if (cloud_to_agent.type_id) + freez(cloud_to_agent.type_id); + if (cloud_to_agent.msg_id) + freez(cloud_to_agent.msg_id); + if (cloud_to_agent.callback_topic) + freez(cloud_to_agent.callback_topic); + +err_cleanup_nojson: + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; + } + + return 1; +} diff --git a/aclk/legacy/aclk_rx_msgs.h b/aclk/legacy/aclk_rx_msgs.h new file mode 100644 index 000000000..3095e41a7 --- /dev/null +++ b/aclk/legacy/aclk_rx_msgs.h @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_RX_MSGS_H +#define NETDATA_ACLK_RX_MSGS_H + +#include "../../daemon/common.h" +#include "libnetdata/libnetdata.h" + +int aclk_handle_cloud_message(char *payload); +void aclk_set_rx_handlers(int version); + + +#endif /* NETDATA_ACLK_RX_MSGS_H */ diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c new file mode 100644 index 000000000..2a57cd6f0 --- /dev/null +++ b/aclk/legacy/aclk_stats.c @@ -0,0 +1,298 @@ +#include "aclk_stats.h" + +netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; + +int aclk_stats_enabled; + +int query_thread_count; + +// data ACLK stats need per query thread +struct aclk_qt_data { + RRDDIM *dim; +} *aclk_qt_data = NULL; + +uint32_t *aclk_queries_per_thread = NULL; +uint32_t *aclk_queries_per_thread_sample = NULL; + +struct aclk_metrics aclk_metrics = { + .online = 0, +}; + +struct aclk_metrics_per_sample aclk_metrics_per_sample; + +struct aclk_mat_metrics aclk_mat_metrics = { +#ifdef NETDATA_INTERNAL_CHECKS + .latency = { .name = "aclk_latency_mqtt", + .prio = 200002, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "ms", + .title = "ACLK Message Publish Latency" }, +#endif + + .cloud_q_db_query_time = { .name = "aclk_db_query_time", + .prio = 200006, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "us", + .title = "Time it took to process cloud requested DB queries" }, + + .cloud_q_recvd_to_processed = { .name = "aclk_cloud_q_recvd_to_processed", + .prio = 200007, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "us", + .title = "Time from receiving the Cloud Query until it was picked up " + "by query thread (just before passing to the database)." } +}; + +void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement) +{ + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + if (metric->max < measurement) + metric->max = measurement; + + metric->total += measurement; + metric->count++; + ACLK_STATS_UNLOCK; + } +} + +static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent) +{ + static RRDSET *st_aclkstats = NULL; + static RRDDIM *rd_online_status = NULL; + + if (unlikely(!st_aclkstats)) { + st_aclkstats = rrdset_create_localhost( + "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status", + "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st_aclkstats); + + rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online); + + rrdset_done(st_aclkstats); +} + +static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st_query_thread = NULL; + static RRDDIM *rd_queued = NULL; + static RRDDIM *rd_dispatched = NULL; + + if (unlikely(!st_query_thread)) { + st_query_thread = rrdset_create_localhost( + "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s", + "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA); + + rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st_query_thread); + + rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued); + rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched); + + rrdset_done(st_query_thread); +} + +static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_wq_add = NULL; + static RRDDIM *rd_wq_consumed = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "kB/s", + "netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA); + + rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_wq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_wq_add, per_sample->write_q_added); + rrddim_set_by_pointer(st, rd_wq_consumed, per_sample->write_q_consumed); + + rrdset_done(st); +} + +static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_add = NULL; + static RRDDIM *rd_rq_consumed = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "kB/s", + "netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA); + + rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_rq_add, per_sample->read_q_added); + rrddim_set_by_pointer(st, rd_rq_consumed, per_sample->read_q_consumed); + + rrdset_done(st); +} + +static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_rcvd = NULL; + static RRDDIM *rd_rq_err = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s", + "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err); + rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err); + + rrdset_done(st); +} + +#define MAX_DIM_NAME 16 +static void aclk_stats_query_threads(uint32_t *queries_per_thread) +{ + static RRDSET *st = NULL; + + char dim_name[MAX_DIM_NAME]; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", + "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < query_thread_count; i++) { + if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) + error("snprintf encoding error"); + aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + } else + rrdset_next(st); + + for (int i = 0; i < query_thread_count; i++) { + rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); + } + + rrdset_done(st); +} + +static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct aclk_metric_mat_data *data) +{ + if(unlikely(!metric->st)) { + metric->st = rrdset_create_localhost( + "netdata", metric->name, NULL, "aclk", NULL, metric->title, metric->unit, "netdata", "stats", metric->prio, + localhost->rrd_update_every, RRDSET_TYPE_LINE); + + metric->rd_avg = rrddim_add(metric->st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + metric->rd_max = rrddim_add(metric->st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + metric->rd_total = rrddim_add(metric->st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(metric->st); + + if(data->count) + rrddim_set_by_pointer(metric->st, metric->rd_avg, roundf((float)data->total / data->count)); + else + rrddim_set_by_pointer(metric->st, metric->rd_avg, 0); + rrddim_set_by_pointer(metric->st, metric->rd_max, data->max); + rrddim_set_by_pointer(metric->st, metric->rd_total, data->total); + + rrdset_done(metric->st); +} + +void aclk_stats_thread_cleanup() +{ + freez(aclk_qt_data); + freez(aclk_queries_per_thread); + freez(aclk_queries_per_thread_sample); +} + +void *aclk_stats_main_thread(void *ptr) +{ + struct aclk_stats_thread *args = ptr; + + query_thread_count = args->query_thread_count; + aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); + aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); + aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + + heartbeat_t hb; + heartbeat_init(&hb); + usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; + + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + + struct aclk_metrics_per_sample per_sample; + struct aclk_metrics permanent; + + while (!netdata_exit) { + netdata_thread_testcancel(); + // ------------------------------------------------------------------------ + // Wait for the next iteration point. + + heartbeat_next(&hb, step_ut); + if (netdata_exit) break; + + ACLK_STATS_LOCK; + // to not hold lock longer than necessary, especially not to hold it + // during database rrd* operations + memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); + memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + + memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count); + memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count); + ACLK_STATS_UNLOCK; + + aclk_stats_collect(&per_sample, &permanent); + aclk_stats_query_queue(&per_sample); + + aclk_stats_write_q(&per_sample); + aclk_stats_read_q(&per_sample); + + aclk_stats_cloud_req(&per_sample); + aclk_stats_query_threads(aclk_queries_per_thread_sample); + +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency); +#endif + aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_db_query_time, &per_sample.cloud_q_db_query_time); + aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_recvd_to_processed, &per_sample.cloud_q_recvd_to_processed); + } + + return 0; +} + +void aclk_stats_upd_online(int online) { + if(!aclk_stats_enabled) + return; + + ACLK_STATS_LOCK; + aclk_metrics.online = online; + + if(!online) + aclk_metrics_per_sample.offline_during_sample = 1; + ACLK_STATS_UNLOCK; +} diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h new file mode 100644 index 000000000..7e74fdf88 --- /dev/null +++ b/aclk/legacy/aclk_stats.h @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_STATS_H +#define NETDATA_ACLK_STATS_H + +#include "../../daemon/common.h" +#include "libnetdata/libnetdata.h" +#include "aclk_common.h" + +#define ACLK_STATS_THREAD_NAME "ACLK_Stats" + +extern netdata_mutex_t aclk_stats_mutex; + +#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex) +#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex) + +extern int aclk_stats_enabled; + +struct aclk_stats_thread { + netdata_thread_t *thread; + int query_thread_count; +}; + +// preserve between samples +struct aclk_metrics { + volatile uint8_t online; +}; + +//mat = max average total +struct aclk_metric_mat_data { + volatile uint32_t total; + volatile uint32_t count; + volatile uint32_t max; +}; + +//mat = max average total +struct aclk_metric_mat { + char *name; + char *title; + RRDSET *st; + RRDDIM *rd_avg; + RRDDIM *rd_max; + RRDDIM *rd_total; + long prio; + char *unit; +}; + +extern struct aclk_mat_metrics { +#ifdef NETDATA_INTERNAL_CHECKS + struct aclk_metric_mat latency; +#endif + struct aclk_metric_mat cloud_q_db_query_time; + struct aclk_metric_mat cloud_q_recvd_to_processed; +} aclk_mat_metrics; + +void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement); + +// reset to 0 on every sample +extern struct aclk_metrics_per_sample { + /* in the unlikely event of ACLK disconnecting + and reconnecting under 1 sampling rate + we want to make sure we record the disconnection + despite it being then seemingly longer in graph */ + volatile uint8_t offline_during_sample; + + volatile uint32_t queries_queued; + volatile uint32_t queries_dispatched; + + volatile uint32_t write_q_added; + volatile uint32_t write_q_consumed; + + volatile uint32_t read_q_added; + volatile uint32_t read_q_consumed; + + volatile uint32_t cloud_req_recvd; + volatile uint32_t cloud_req_err; + +#ifdef NETDATA_INTERNAL_CHECKS + struct aclk_metric_mat_data latency; +#endif + struct aclk_metric_mat_data cloud_q_db_query_time; + struct aclk_metric_mat_data cloud_q_recvd_to_processed; +} aclk_metrics_per_sample; + +extern uint32_t *aclk_queries_per_thread; + +void *aclk_stats_main_thread(void *ptr); +void aclk_stats_thread_cleanup(); +void aclk_stats_upd_online(int online); + +#endif /* NETDATA_ACLK_STATS_H */ diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c new file mode 100644 index 000000000..e51a01308 --- /dev/null +++ b/aclk/legacy/agent_cloud_link.c @@ -0,0 +1,1683 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" +#include "agent_cloud_link.h" +#include "aclk_lws_https_client.h" +#include "aclk_query.h" +#include "aclk_common.h" +#include "aclk_stats.h" + +#ifdef ENABLE_ACLK +#include <libwebsockets.h> +#endif + +int aclk_shutting_down = 0; + +// Other global state +static int aclk_subscribed = 0; +static int aclk_disable_single_updates = 0; +static char *aclk_username = NULL; +static char *aclk_password = NULL; + +static char *global_base_topic = NULL; +static int aclk_connecting = 0; +int aclk_force_reconnect = 0; // Indication from lower layers +usec_t aclk_session_us = 0; // Used by the mqtt layer +time_t aclk_session_sec = 0; // Used by the mqtt layer + +static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; + +#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) +#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) + +#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) +#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) + +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); +void aclk_lws_wss_destroy_context(); +/* + * Maintain a list of collectors and chart count + * If all the charts of a collector are deleted + * then a new metadata dataset must be send to the cloud + * + */ +struct _collector { + time_t created; + uint32_t count; //chart count + uint32_t hostname_hash; + uint32_t plugin_hash; + uint32_t module_hash; + char *hostname; + char *plugin_name; + char *module_name; + struct _collector *next; +}; + +struct _collector *collector_list = NULL; + +char *create_uuid() +{ + uuid_t uuid; + char *uuid_str = mallocz(36 + 1); + + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + + return uuid_str; +} + +int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, "msg-id")) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "type")) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "callback-topic")) { + data->callback_topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "payload")) { + if (likely(e->data.string)) { + size_t len = strlen(e->data.string); + data->payload = mallocz(len+1); + if (!url_decode_r(data->payload, e->data.string, len + 1)) + strcpy(data->payload, e->data.string); + } + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, "version")) { + data->version = e->data.number; + break; + } + if (!strcmp(e->name, "min-version")) { + data->min_version = e->data.number; + break; + } + if (!strcmp(e->name, "max-version")) { + data->max_version = e->data.number; + break; + } + + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + + +static RSA *aclk_private_key = NULL; +static int create_private_key() +{ + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + aclk_private_key = NULL; + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); + + long bytes_read; + char *private_key = read_by_filename(filename, &bytes_read); + if (!private_key) { + error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); + return 1; + } + debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); + + BIO *key_bio = BIO_new_mem_buf(private_key, -1); + if (key_bio==NULL) { + error("Claimed agent cannot establish ACLK - failed to create BIO for key"); + goto biofailed; + } + + aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); + BIO_free(key_bio); + if (aclk_private_key!=NULL) + { + freez(private_key); + return 0; + } + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); + +biofailed: + freez(private_key); + return 1; +} + +/* + * After a connection failure -- delay in milliseconds + * When a connection is established, the delay function + * should be called with + * + * mode 0 to reset the delay + * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * + */ +unsigned long int aclk_reconnect_delay(int mode) +{ + static int fail = -1; + unsigned long int delay; + + if (!mode || fail == -1) { + srandom(time(NULL)); + fail = mode - 1; + return 0; + } + + delay = (1 << fail); + + if (delay >= ACLK_MAX_BACKOFF_DELAY) { + delay = ACLK_MAX_BACKOFF_DELAY * 1000; + } else { + fail++; + delay = (delay * 1000) + (random() % 1000); + } + + return delay; +} + +// This will give the base topic that the agent will publish messages. +// subtopics will be sent under the base topic e.g. base_topic/subtopic +// This is called during the connection, we delete any previous topic +// in-case the user has changed the agent id and reclaimed. + +char *create_publish_base_topic() +{ + char *agent_id = is_agent_claimed(); + if (unlikely(!agent_id)) + return NULL; + + ACLK_LOCK; + + if (global_base_topic) + freez(global_base_topic); + char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; + + snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id); + tmp = strchr(tmp_topic, '\n'); + if (unlikely(tmp)) + *tmp = '\0'; + global_base_topic = strdupz(tmp_topic); + + ACLK_UNLOCK; + freez(agent_id); + return global_base_topic; +} + +/* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ + +char *get_topic(char *sub_topic, char *final_topic, int max_size) +{ + int rc; + + if (likely(sub_topic && sub_topic[0] == '/')) + return sub_topic; + + if (unlikely(!global_base_topic)) + return sub_topic; + + rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); + if (unlikely(rc >= max_size)) + debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic); + + return final_topic; +} + +#ifndef __GNUC__ +#pragma region ACLK Internal Collector Tracking +#endif + +/* + * Free a collector structure + */ + +static void _free_collector(struct _collector *collector) +{ + if (likely(collector->plugin_name)) + freez(collector->plugin_name); + + if (likely(collector->module_name)) + freez(collector->module_name); + + if (likely(collector->hostname)) + freez(collector->hostname); + + freez(collector); +} + +/* + * This will report the collector list + * + */ +#ifdef ACLK_DEBUG +static void _dump_collector_list() +{ + struct _collector *tmp_collector; + + COLLECTOR_LOCK; + + info("DUMPING ALL COLLECTORS"); + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + info("DUMPING ALL COLLECTORS -- nothing found"); + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + + while (tmp_collector) { + info( + "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, + tmp_collector->plugin_name ? tmp_collector->plugin_name : "", + tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); + + tmp_collector = tmp_collector->next; + } + info("DUMPING ALL COLLECTORS DONE"); + COLLECTOR_UNLOCK; +} +#endif + +/* + * This will cleanup the collector list + * + */ +static void _reset_collector_list() +{ + struct _collector *tmp_collector, *next_collector; + + COLLECTOR_LOCK; + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + collector_list->count = 0; + collector_list->next = NULL; + + // We broke the link; we can unlock + COLLECTOR_UNLOCK; + + while (tmp_collector) { + next_collector = tmp_collector->next; + _free_collector(tmp_collector); + tmp_collector = next_collector; + } +} + +/* + * Find a collector (if it exists) + * Must lock before calling this + * If last_collector is not null, it will return the previous collector in the linked + * list (used in collector delete) + */ +static struct _collector *_find_collector( + const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) +{ + struct _collector *tmp_collector, *prev_collector; + uint32_t plugin_hash; + uint32_t module_hash; + uint32_t hostname_hash; + + if (unlikely(!collector_list)) { + collector_list = callocz(1, sizeof(struct _collector)); + return NULL; + } + + if (unlikely(!collector_list->next)) + return NULL; + + plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; + module_hash = module_name ? simple_hash(module_name) : 1; + hostname_hash = simple_hash(hostname); + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + prev_collector = collector_list; + while (tmp_collector) { + if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && + hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && + (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && + (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { + if (unlikely(last_collector)) + *last_collector = prev_collector; + + return tmp_collector; + } + + prev_collector = tmp_collector; + tmp_collector = tmp_collector->next; + } + + return tmp_collector; +} + +/* + * Called to delete a collector + * It will reduce the count (chart_count) and will remove it + * from the linked list if the count reaches zero + * The structure will be returned to the caller to free + * the resources + * + */ +static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector, *prev_collector = NULL; + + tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); + + if (likely(tmp_collector)) { + --tmp_collector->count; + if (unlikely(!tmp_collector->count)) + prev_collector->next = tmp_collector->next; + } + return tmp_collector; +} + +/* + * Add a new collector (plugin / module) to the list + * If it already exists just update the chart count + * + * Lock before calling + */ +static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + + tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); + + if (unlikely(!tmp_collector)) { + tmp_collector = callocz(1, sizeof(struct _collector)); + tmp_collector->hostname_hash = simple_hash(hostname); + tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; + tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; + + tmp_collector->hostname = strdupz(hostname); + tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; + tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; + + tmp_collector->next = collector_list->next; + collector_list->next = tmp_collector; + } + tmp_collector->count++; + debug( + D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", + module_name ? module_name : "*", tmp_collector->count); + return tmp_collector; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +/* Avoids the need to scan trough all RRDHOSTS + * every time any Query Thread Wakes Up + * (every time we need to check child popcorn expiry) + * call with ACLK_SHARED_STATE_LOCK held + */ +void aclk_update_next_child_to_popcorn(void) +{ + RRDHOST *host; + int any = 0; + + rrd_rdlock(); + rrdhost_foreach_read(host) { + if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) + continue; + + rrdhost_aclk_state_lock(host); + if (!ACLK_IS_HOST_POPCORNING(host)) { + rrdhost_aclk_state_unlock(host); + continue; + } + + any = 1; + + if (unlikely(!aclk_shared_state.next_popcorn_host)) { + aclk_shared_state.next_popcorn_host = host; + rrdhost_aclk_state_unlock(host); + continue; + } + + if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) + aclk_shared_state.next_popcorn_host = host; + + rrdhost_aclk_state_unlock(host); + } + if(!any) + aclk_shared_state.next_popcorn_host = NULL; + + rrd_unlock(); +} + +/* If popcorning bump timer. + * If popcorning or initializing (host not stable) return 1 + * Otherwise return 0 + */ +static int aclk_popcorn_check_bump(RRDHOST *host) +{ + time_t now = now_monotonic_sec(); + int updated = 0, ret; + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + + ret = ACLK_IS_HOST_INITIALIZING(host); + if (unlikely(ACLK_IS_HOST_POPCORNING(host))) { + if(now != host->aclk_state.t_last_popcorn_update) { + updated = 1; + info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); + } + host->aclk_state.t_last_popcorn_update = now; + rrdhost_aclk_state_unlock(host); + + if (host != localhost && updated) + aclk_update_next_child_to_popcorn(); + + ACLK_SHARED_STATE_UNLOCK; + return ret; + } + + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return ret; +} + +inline static int aclk_host_initializing(RRDHOST *host) +{ + rrdhost_aclk_state_lock(host); + int ret = ACLK_IS_HOST_INITIALIZING(host); + rrdhost_aclk_state_unlock(host); + return ret; +} + +static void aclk_start_host_popcorning(RRDHOST *host) +{ + usec_t now = now_monotonic_sec(); + info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) { + errno = 0; + error("Localhost is allowed to do popcorning only once after startup!"); + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return; + } + + host->aclk_state.state = ACLK_HOST_INITIALIZING; + host->aclk_state.metadata = ACLK_METADATA_REQUIRED; + host->aclk_state.t_last_popcorn_update = now; + rrdhost_aclk_state_unlock(host); + if (host != localhost) + aclk_update_next_child_to_popcorn(); + ACLK_SHARED_STATE_UNLOCK; +} + +static void aclk_stop_host_popcorning(RRDHOST *host) +{ + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + if (!ACLK_IS_HOST_POPCORNING(host)) { + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return; + } + + info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid); + host->aclk_state.t_last_popcorn_update = 0; + host->aclk_state.metadata = ACLK_METADATA_REQUIRED; + rrdhost_aclk_state_unlock(host); + + if(host == aclk_shared_state.next_popcorn_host) { + aclk_shared_state.next_popcorn_host = NULL; + aclk_update_next_child_to_popcorn(); + } + ACLK_SHARED_STATE_UNLOCK; +} + +/* + * Add a new collector to the list + * If it exists, update the chart count + */ +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + if (unlikely(!netdata_ready)) { + return; + } + + COLLECTOR_LOCK; + + tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); + + if (unlikely(tmp_collector->count != 1)) { + COLLECTOR_UNLOCK; + return; + } + + COLLECTOR_UNLOCK; + + if(aclk_popcorn_check_bump(host)) + return; + + if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); +} + +/* + * Delete a collector from the list + * If the chart count reaches zero the collector will be removed + * from the list by calling del_collector. + * + * This function will release the memory used and schedule + * a cloud update + */ +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + if (unlikely(!netdata_ready)) { + return; + } + + COLLECTOR_LOCK; + + tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); + + if (unlikely(!tmp_collector || tmp_collector->count)) { + COLLECTOR_UNLOCK; + return; + } + + debug( + D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", + tmp_collector->count); + + COLLECTOR_UNLOCK; + + _free_collector(tmp_collector); + + if (aclk_popcorn_check_bump(host)) + return; + + if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); +} + +static void aclk_graceful_disconnect() +{ + size_t write_q, write_q_bytes, read_q; + time_t event_loop_timeout; + + // Send a graceful disconnect message + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}"); + aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); + buffer_free(b); + + event_loop_timeout = now_realtime_sec() + 5; + write_q = 1; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + + aclk_shutting_down = 1; + _link_shutdown(); + aclk_lws_wss_mqtt_layer_disconect_notif(); + + write_q = 1; + event_loop_timeout = now_realtime_sec() + 5; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + aclk_shutting_down = 0; +} + +#ifndef __GNUC__ +#pragma region Incoming Msg Parsing +#endif + +struct dictionary_singleton { + char *key; + char *result; +}; + +int json_extract_singleton(JSON_ENTRY *e) +{ + struct dictionary_singleton *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, data->key)) { + data->result = strdupz(e->data.string); + break; + } + break; + case JSON_NUMBER: + case JSON_BOOLEAN: + case JSON_NULL: + break; + } + return 0; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + + +#ifndef __GNUC__ +#pragma region Challenge Response +#endif + +// Base-64 decoder. +// Note: This is non-validating, invalid input will be decoded without an error. +// Challenges are packed into json strings so we don't skip newlines. +// Size errors (i.e. invalid input size or insufficient output space) are caught. +size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size) +{ + static char lookup[256]; + static int first_time=1; + if (first_time) + { + first_time = 0; + for(int i=0; i<256; i++) + lookup[i] = -1; + for(int i='A'; i<='Z'; i++) + lookup[i] = i-'A'; + for(int i='a'; i<='z'; i++) + lookup[i] = i-'a' + 26; + for(int i='0'; i<='9'; i++) + lookup[i] = i-'0' + 52; + lookup['+'] = 62; + lookup['/'] = 63; + } + if ((input_size & 3) != 0) + { + error("Can't decode base-64 input length %zu", input_size); + return 0; + } + size_t unpadded_size = (input_size/4) * 3; + if ( unpadded_size > output_size ) + { + error("Output buffer size %zu is too small to decode %zu into", output_size, input_size); + return 0; + } + // Don't check padding within full quantums + for (size_t i = 0 ; i < input_size-4 ; i+=4 ) + { + uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]]; + output[0] = value >> 16; + output[1] = value >> 8; + output[2] = value; + //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); + output += 3; + input += 4; + } + // Handle padding only in last quantum + if (input[2] == '=') { + uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]]; + output[0] = value >> 4; + //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]); + return unpadded_size-2; + } + else if (input[3] == '=') { + uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]]; + output[0] = value >> 10; + output[1] = value >> 2; + //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]); + return unpadded_size-1; + } + else + { + uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3]; + output[0] = value >> 16; + output[1] = value >> 8; + output[2] = value; + //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); + return unpadded_size; + } +} + +size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size) +{ + uint32_t value; + static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + if ((input_size/3+1)*4 >= output_size) + { + error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size); + return 0; + } + size_t count = 0; + while (input_size>3) + { + value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff; + output[0] = lookup[value >> 18]; + output[1] = lookup[(value >> 12) & 0x3f]; + output[2] = lookup[(value >> 6) & 0x3f]; + output[3] = lookup[value & 0x3f]; + //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); + output += 4; + input += 3; + input_size -= 3; + count += 4; + } + switch (input_size) + { + case 2: + value = (input[0] << 10) + (input[1] << 2); + output[0] = lookup[(value >> 12) & 0x3f]; + output[1] = lookup[(value >> 6) & 0x3f]; + output[2] = lookup[value & 0x3f]; + output[3] = '='; + //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]); + count += 4; + break; + case 1: + value = input[0] << 4; + output[0] = lookup[(value >> 6) & 0x3f]; + output[1] = lookup[value & 0x3f]; + output[2] = '='; + output[3] = '='; + //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); + count += 4; + break; + case 0: + break; + } + return count; +} + + + +int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted) +{ + int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING); + if (result == -1) { + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Decryption of the challenge failed: %s", err); + } + return result; +} + +void aclk_get_challenge(char *aclk_hostname, int port) +{ + char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + debug(D_ACLK, "Performing challenge-response sequence"); + if (aclk_password != NULL) + { + freez(aclk_password); + aclk_password = NULL; + } + // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge + // TODO - target host? + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + { + error("Agent was not claimed - cannot perform challenge/response"); + goto CLEANUP; + } + char url[1024]; + sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id); + info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url); + if(aclk_send_https_request("GET", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL)) + { + error("Challenge failed: %s", data_buffer); + goto CLEANUP; + } + struct dictionary_singleton challenge = { .key = "challenge", .result = NULL }; + + debug(D_ACLK, "Challenge response from cloud: %s", data_buffer); + if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK) + { + freez(challenge.result); + error("Could not parse the json response with the challenge: %s", data_buffer); + goto CLEANUP; + } + if (challenge.result == NULL ) { + error("Could not retrieve challenge from auth response: %s", data_buffer); + goto CLEANUP; + } + + + size_t challenge_len = strlen(challenge.result); + unsigned char decoded[512]; + size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded)); + + unsigned char plaintext[4096]={}; + int decrypted_length = private_decrypt(decoded, decoded_len, plaintext); + freez(challenge.result); + char encoded[512]; + size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded)); + encoded[encoded_len] = 0; + debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded); + + char response_json[4096]={}; + sprintf(response_json, "{\"response\":\"%s\"}", encoded); + debug(D_ACLK, "Password phase: %s",response_json); + // TODO - host + sprintf(url, "/api/v1/auth/node/%s/password", agent_id); + if(aclk_send_https_request("POST", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json)) + { + error("Challenge-response failed: %s", data_buffer); + goto CLEANUP; + } + + debug(D_ACLK, "Password response from cloud: %s", data_buffer); + + struct dictionary_singleton password = { .key = "password", .result = NULL }; + if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK) + { + freez(password.result); + error("Could not parse the json response with the password: %s", data_buffer); + goto CLEANUP; + } + + if (password.result == NULL ) { + error("Could not retrieve password from auth response"); + goto CLEANUP; + } + if (aclk_password != NULL ) + freez(aclk_password); + aclk_password = password.result; + if (aclk_username != NULL) + freez(aclk_username); + aclk_username = agent_id; + agent_id = NULL; + +CLEANUP: + if (agent_id != NULL) + freez(agent_id); + freez(data_buffer); + return; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +static void aclk_try_to_connect(char *hostname, int port) +{ + int rc; + +// this is usefull for developers working on ACLK +// allows connecting agent to any MQTT broker +// for debugging, development and testing purposes +#ifndef ACLK_DISABLE_CHALLENGE + if (!aclk_private_key) { + error("Cannot try to establish the agent cloud link - no private key available!"); + return; + } +#endif + + info("Attempting to establish the agent cloud link"); +#ifdef ACLK_DISABLE_CHALLENGE + error("Agent built with ACLK_DISABLE_CHALLENGE. This is for testing " + "and development purposes only. Warranty void. Won't be able " + "to connect to Netdata Cloud."); + if (aclk_password == NULL) + aclk_password = strdupz("anon"); +#else + aclk_get_challenge(hostname, port); + if (aclk_password == NULL) + return; +#endif + + aclk_connecting = 1; + create_publish_base_topic(); + + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = 0; + ACLK_SHARED_STATE_UNLOCK; + + rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password); + if (unlikely(rc)) { + error("Failed to initialize the agent cloud link library"); + } +} + +// Sends "hello" message to negotiate ACLK version with cloud +static inline void aclk_hello_msg() +{ + BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + + char *msg_id = create_uuid(); + + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; + ACLK_SHARED_STATE_UNLOCK; + + //Hello message is versioned separatelly from the rest of the protocol + aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); + buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX); + aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id); + freez(msg_id); + buffer_free(buf); +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + struct aclk_query_threads query_threads; + struct aclk_stats_thread *stats_thread = NULL; + time_t last_periodic_query_wakeup = 0; + + query_threads.thread_list = NULL; + + // This thread is unusual in that it cannot be cancelled by cancel_main_threads() + // as it must notify the far end that it shutdown gracefully and avoid the LWT. + netdata_thread_disable_cancelability(); + +#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK) + info("Killing ACLK thread -> cloud functionality has been disabled"); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +#endif + +#ifndef LWS_WITH_SOCKS5 + ACLK_PROXY_TYPE proxy_type; + aclk_get_proxy(&proxy_type); + if(proxy_type == PROXY_TYPE_SOCKS5) { + error("Disabling ACLK due to requested SOCKS5 proxy."); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } +#endif + + info("Waiting for netdata to be ready"); + while (!netdata_ready) { + sleep_usec(USEC_PER_MS * 300); + } + + info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_setting) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) { + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } + } + + query_threads.count = MIN(processors/2, 6); + query_threads.count = MAX(query_threads.count, 2); + query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); + if(query_threads.count < 1) { + error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count); + query_threads.count = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); + } + + //start localhost popcorning + aclk_start_host_popcorning(localhost); + + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); + if (aclk_stats_enabled) { + stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); + stats_thread->thread = mallocz(sizeof(netdata_thread_t)); + stats_thread->query_thread_count = query_threads.count; + netdata_thread_create( + stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread); + } + + char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering. + int port_num = 0; + info("Waiting for netdata to be claimed"); + while(1) { + char *agent_id = is_agent_claimed(); + while (likely(!agent_id)) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + goto exited; + agent_id = is_agent_claimed(); + } + freez(agent_id); + // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. + // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error("Do not move the cloud base url out of post_conf_load!!"); + goto exited; + } + if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &port_num)) + error("Agent is claimed but the configuration is invalid, please fix"); + else if (!create_private_key() && !_mqtt_lib_init()) + break; + + for (int i=0; i<60; i++) { + if (netdata_exit) + goto exited; + + sleep_usec(USEC_PER_SEC * 1); + } + } + + usec_t reconnect_expiry = 0; // In usecs + + while (!netdata_exit) { + static int first_init = 0; + /* size_t write_q, write_q_bytes, read_q; + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/ + + if (aclk_disable_runtime && !aclk_connected) { + sleep(1); + continue; + } + + if (aclk_kill_link) { // User has reloaded the claiming state + aclk_kill_link = 0; + aclk_graceful_disconnect(); + create_private_key(); + continue; + } + + if (aclk_force_reconnect) { + aclk_lws_wss_destroy_context(); + aclk_force_reconnect = 0; + } + if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) { + if (unlikely(!first_init)) { + aclk_try_to_connect(aclk_hostname, port_num); + first_init = 1; + } else { + if (aclk_connecting == 0) { + if (reconnect_expiry == 0) { + unsigned long int delay = aclk_reconnect_delay(1); + reconnect_expiry = now_realtime_usec() + delay * 1000; + info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0); + } + if (now_realtime_usec() >= reconnect_expiry) { + reconnect_expiry = 0; + aclk_try_to_connect(aclk_hostname, port_num); + } + sleep_usec(USEC_PER_MS * 100); + } + } + if (aclk_connecting) { + _link_event_loop(); + sleep_usec(USEC_PER_MS * 100); + } + continue; + } + + _link_event_loop(); + if (unlikely(!aclk_connected || aclk_force_reconnect)) + continue; + /*static int stress_counter = 0; + if (write_q_bytes==0 && stress_counter ++ >5) + { + aclk_send_stress_test(8000000); + stress_counter = 0; + }*/ + + if (unlikely(!aclk_subscribed)) { + aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1); + aclk_hello_msg(); + } + + if (unlikely(!query_threads.thread_list)) { + aclk_query_threads_start(&query_threads); + } + + time_t now = now_monotonic_sec(); + if(aclk_connected && last_periodic_query_wakeup < now) { + // to make `aclk_queue_query()` param `run_after` work + // also makes per child popcorning work + last_periodic_query_wakeup = now; + QUERY_THREAD_WAKEUP; + } + } // forever +exited: + // Wakeup query thread to cleanup + QUERY_THREAD_WAKEUP_ALL; + + freez(aclk_username); + freez(aclk_password); + freez(aclk_hostname); + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + char *agent_id = is_agent_claimed(); + if (agent_id && aclk_connected) { + freez(agent_id); + // Wakeup thread to cleanup + QUERY_THREAD_WAKEUP; + aclk_graceful_disconnect(); + } + + aclk_query_threads_cleanup(&query_threads); + + _reset_collector_list(); + freez(collector_list); + + if(aclk_stats_enabled) { + netdata_thread_join(*stats_thread->thread, NULL); + aclk_stats_thread_cleanup(); + freez(stats_thread->thread); + freez(stats_thread); + } + + /* + * this must be last -> if all static threads signal + * THREAD_EXITED rrdengine will dealloc the RRDSETs + * and RRDDIMs that are used by still runing stat thread. + * see netdata_cleanup_and_exit() for reference + */ + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +} + +/* + * Send a message to the cloud, using a base topic and sib_topic + * The final topic will be in the form <base_topic>/<sub_topic> + * If base_topic is missing then the global_base_topic will be used (if available) + * + */ +int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id) +{ + int rc; + int mid; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + UNUSED(msg_id); + + if (!aclk_connected) + return 0; + + if (unlikely(!message)) + return 0; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + if (unlikely(!final_topic)) { + errno = 0; + error("Unable to build outgoing topic; truncated?"); + return 1; + } + + ACLK_LOCK; + rc = _link_send_message(final_topic, message, len, &mid); + // TODO: link the msg_id with the mid so we can trace it + ACLK_UNLOCK; + + if (unlikely(rc)) { + errno = 0; + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + } + + return rc; +} + +int aclk_send_message(char *sub_topic, char *message, char *msg_id) +{ + return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id); +} + +/* + * Subscribe to a topic in the cloud + * The final subscription will be in the form + * /agent/claim_id/<sub_topic> + */ +int aclk_subscribe(char *sub_topic, int qos) +{ + int rc; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + if (unlikely(!final_topic)) { + errno = 0; + error("Unable to build outgoing topic; truncated?"); + return 1; + } + + if (!aclk_connected) { + error("Cannot subscribe to %s - not connected!", topic); + return 1; + } + + ACLK_LOCK; + rc = _link_subscribe(final_topic, qos); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) { + errno = 0; + error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc)); + } + + return rc; +} + +// This is called from a callback when the link goes up +void aclk_connect() +{ + info("Connection detected (%u queued queries)", aclk_query_size()); + + aclk_stats_upd_online(1); + + aclk_connected = 1; + aclk_reconnect_delay(0); + + QUERY_THREAD_WAKEUP; + return; +} + +// This is called from a callback when the link goes down +void aclk_disconnect() +{ + if (likely(aclk_connected)) + info("Disconnect detected (%u queued queries)", aclk_query_size()); + + aclk_stats_upd_online(0); + + aclk_subscribed = 0; + rrdhost_aclk_state_lock(localhost); + localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED; + rrdhost_aclk_state_unlock(localhost); + aclk_connected = 0; + aclk_connecting = 0; + aclk_force_reconnect = 1; +} + +inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version) +{ + uuid_t uuid; + char uuid_str[36 + 1]; + + if (unlikely(!msg_id)) { + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + msg_id = uuid_str; + } + + if (ts_secs == 0) { + ts_us = now_realtime_usec(); + ts_secs = ts_us / USEC_PER_SEC; + ts_us = ts_us % USEC_PER_SEC; + } + + buffer_sprintf( + dest, + "{\t\"type\": \"%s\",\n" + "\t\"msg-id\": \"%s\",\n" + "\t\"timestamp\": %ld,\n" + "\t\"timestamp-offset-usec\": %llu,\n" + "\t\"connect\": %ld,\n" + "\t\"connect-offset-usec\": %llu,\n" + "\t\"version\": %d", + type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version); + + debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs); +} + + +/* + * This will send alarm information which includes + * configured alarms + * alarm_log + * active alarms + */ +void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); + +void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + debug(D_ACLK, "Metadata alarms start"); + + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + + if (metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + else + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + + buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); + health_alarms2json(localhost, local_buffer, 1); + debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len); + // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : "); + // health_alarm_log2json(localhost, local_buffer, 0); + // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len); + buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : "); + health_active_log_alarms_2json(localhost, local_buffer); + //debug(D_ACLK, "Metadata message %s", local_buffer->buffer); + + + + buffer_sprintf(local_buffer, "\n}\n}"); + aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); +} + +/* + * This will send the agent metadata + * /api/v1/info + * charts + */ +int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + debug(D_ACLK, "Metadata /info start"); + + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + if (metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg); + else + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + buffer_sprintf(local_buffer, "{\n\t \"info\" : "); + web_client_api_request_v1_info_fill_buffer(host, local_buffer); + debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); + + buffer_sprintf(local_buffer, ", \n\t \"charts\" : "); + charts2json(host, local_buffer, 1, 0); + buffer_sprintf(local_buffer, "\n}\n}"); + debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len); + + aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg); + + debug(D_ACLK, "Sending Child Disconnect"); + + char *msg_id = create_uuid(); + + aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg); + + buffer_strcat(local_buffer, ",\"payload\":"); + + buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid); + rrdhost_aclk_state_lock(host); + if(host->aclk_state.claimed_id) + buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id); + else + buffer_strcat(local_buffer, "null}}"); + + rrdhost_aclk_state_unlock(host); + + aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) +{ +#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE + if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + return; +#else +#warning "This check became unnecessary. Remove" +#endif + + if (unlikely(aclk_host_initializing(localhost))) + return; + + switch (cmd) { + case ACLK_CMD_CHILD_CONNECT: + debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); + aclk_start_host_popcorning(host); + aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); + break; + case ACLK_CMD_CHILD_DISCONNECT: + debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); + aclk_stop_host_popcorning(host); + aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); + break; + default: + error("Unknown command for aclk_host_state_update %d.", (int)cmd); + } +} + +void aclk_send_stress_test(size_t size) +{ + char *buffer = mallocz(size); + if (buffer != NULL) + { + for(size_t i=0; i<size; i++) + buffer[i] = 'x'; + buffer[size-1] = 0; + time_t time_created = now_realtime_sec(); + sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created); + buffer[strlen(buffer)] = '"'; + buffer[size-2] = '}'; + buffer[size-3] = '"'; + aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL); + error("Sending stress of size %zu at time %ld", size, time_created); + } + free(buffer); +} + +// Send info metadata message to the cloud if the link is established +// or on request +int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host) +{ + aclk_send_info_metadata(state, host); + + if(host == localhost) + aclk_send_alarm_metadata(state); + + return 0; +} + +void aclk_single_update_disable() +{ + aclk_disable_single_updates = 1; +} + +void aclk_single_update_enable() +{ + aclk_disable_single_updates = 0; +} + +// Trigged by a health reload, sends the alarm metadata +void aclk_alarm_reload() +{ + if (unlikely(aclk_host_initializing(localhost))) + return; + + if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue on_connect command on alarm reload"); + } + } +} +//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf) + +int aclk_send_single_chart(RRDHOST *host, char *chart) +{ + RRDSET *st = rrdset_find(host, chart); + if (!st) + st = rrdset_find_byname(host, chart); + if (!st) { + info("FAILED to find chart %s", chart); + return 1; + } + + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + rrdset2json(st, local_buffer, NULL, NULL, 1); + buffer_sprintf(local_buffer, "\t\n}"); + + aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) +{ +#ifndef ENABLE_ACLK + UNUSED(host); + UNUSED(chart_name); + return 0; +#else + if (unlikely(!netdata_ready)) + return 0; + + if (!netdata_cloud_setting) + return 0; + + if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) + return 0; + + if (aclk_host_initializing(localhost)) + return 0; + + if (unlikely(aclk_disable_single_updates)) + return 0; + + if (aclk_popcorn_check_bump(host)) + return 0; + + if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue chart_update command"); + } + } + + return 0; +#endif +} + +int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +{ + BUFFER *local_buffer = NULL; + + if (unlikely(!netdata_ready)) + return 0; + + if (host != localhost) + return 0; + + if(unlikely(aclk_host_initializing(localhost))) + return 0; + + /* + * Check if individual updates have been disabled + * This will be the case when we do health reload + * and all the alarms will be dropped and recreated. + * At the end of the health reload the complete alarm metadata + * info will be sent + */ + if (unlikely(aclk_disable_single_updates)) + return 0; + + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *msg_id = create_uuid(); + + buffer_flush(local_buffer); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); + health_alarm_entry2json_nolock(local_buffer, ae, host); + netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); + + buffer_sprintf(local_buffer, "\n}"); + + if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue alarm_command on alarm_update"); + } + } + + freez(msg_id); + buffer_free(local_buffer); + + return 0; +} diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h new file mode 100644 index 000000000..e777e0b19 --- /dev/null +++ b/aclk/legacy/agent_cloud_link.h @@ -0,0 +1,93 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_AGENT_CLOUD_LINK_H +#define NETDATA_AGENT_CLOUD_LINK_H + +#include "../../daemon/common.h" +#include "mqtt.h" +#include "aclk_common.h" + +#define ACLK_THREAD_NAME "ACLK_Query" +#define ACLK_CHART_TOPIC "outbound/meta" +#define ACLK_ALARMS_TOPIC "outbound/alarms" +#define ACLK_METADATA_TOPIC "outbound/meta" +#define ACLK_COMMAND_TOPIC "inbound/cmd" +#define ACLK_TOPIC_STRUCTURE "/agent/%s" + +#define ACLK_MAX_BACKOFF_DELAY 1024 // maximum backoff delay in seconds + +#define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg) +#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds +#define ACLK_QOS 1 +#define ACLK_PING_INTERVAL 60 +#define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop + +#define ACLK_MAX_TOPIC 255 + +#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff stragegy fow now +#define ACLK_DEFAULT_PORT 9002 +#define ACLK_DEFAULT_HOST "localhost" + +#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" + +struct aclk_request { + char *type_id; + char *msg_id; + char *callback_topic; + char *payload; + int version; + int min_version; + int max_version; +}; + +typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION; + +void *aclk_main(void *ptr); + +#define NETDATA_ACLK_HOOK \ + { .name = "ACLK_Main", \ + .config_section = NULL, \ + .config_name = NULL, \ + .enabled = 1, \ + .thread = NULL, \ + .init_routine = NULL, \ + .start_routine = aclk_main }, + +extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); +extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id); + +extern char *is_agent_claimed(void); +extern void aclk_lws_wss_mqtt_layer_disconect_notif(); +char *create_uuid(); + +// callbacks for agent cloud link +int aclk_subscribe(char *topic, int qos); +int cloud_to_agent_parse(JSON_ENTRY *e); +void aclk_disconnect(); +void aclk_connect(); + +int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host); +int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host); +void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted); + +int aclk_wait_for_initialization(); +char *create_publish_base_topic(); + +int aclk_send_single_chart(RRDHOST *host, char *chart); +int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); +int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); +void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version); +int aclk_handle_cloud_message(char *payload); +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_alarm_reload(); +unsigned long int aclk_reconnect_delay(int mode); +extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); +void aclk_single_update_enable(); +void aclk_single_update_disable(); + +void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd); +int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd); +void aclk_update_next_child_to_popcorn(void); + +#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c new file mode 100644 index 000000000..6f38a20dc --- /dev/null +++ b/aclk/legacy/mqtt.c @@ -0,0 +1,366 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include <libnetdata/json/json.h> +#include "../../daemon/common.h" +#include "mqtt.h" +#include "aclk_lws_wss_client.h" +#include "aclk_stats.h" +#include "aclk_rx_msgs.h" + +extern usec_t aclk_session_us; +extern time_t aclk_session_sec; + +inline const char *_link_strerror(int rc) +{ + return mosquitto_strerror(rc); +} + +#ifdef NETDATA_INTERNAL_CHECKS +static struct timeval sendTimes[1024]; +#endif + +static struct mosquitto *mosq = NULL; + + +void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) +{ + UNUSED(mosq); + UNUSED(obj); + + aclk_handle_cloud_message(msg->payload); +} + +void publish_callback(struct mosquitto *mosq, void *obj, int rc) +{ + UNUSED(mosq); + UNUSED(obj); + UNUSED(rc); +#ifdef NETDATA_INTERNAL_CHECKS + struct timeval now, *orig; + now_realtime_timeval(&now); + orig = &sendTimes[ rc & 0x3ff ]; + int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec); + diff /= 1000; + + info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff); + + aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff); +#endif + return; +} + +void connect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + UNUSED(mosq); + UNUSED(obj); + UNUSED(rc); + + info("Connection to cloud estabilished"); + aclk_connect(); + + return; +} + +void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + UNUSED(mosq); + UNUSED(obj); + UNUSED(rc); + + if (netdata_exit) + info("Connection to cloud terminated due to agent shutdown"); + else { + errno = 0; + error("Connection to cloud failed"); + } + aclk_disconnect(); + + aclk_lws_wss_mqtt_layer_disconect_notif(); + + return; +} + +void _show_mqtt_info() +{ + int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; + libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); + + info( + "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, + libmosq_revision); +} + +size_t _mqtt_external_write_hook(void *buf, size_t count) +{ + return aclk_lws_wss_client_write(buf, count); +} + +size_t _mqtt_external_read_hook(void *buf, size_t count) +{ + return aclk_lws_wss_client_read(buf, count); +} + +int _mqtt_lib_init() +{ + int rc; + //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; + /* Commenting out now as it is unused - do not delete, this is needed for the on-prem version. + char *ca_crt; + char *server_crt; + char *server_key; + + // show library info so can have it in the logfile + //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); + ca_crt = config_get(CONFIG_SECTION_CLOUD, "link cert", "*"); + server_crt = config_get(CONFIG_SECTION_CLOUD, "link server cert", "*"); + server_key = config_get(CONFIG_SECTION_CLOUD, "link server key", "*"); + + if (ca_crt[0] == '*') { + freez(ca_crt); + ca_crt = NULL; + } + + if (server_crt[0] == '*') { + freez(server_crt); + server_crt = NULL; + } + + if (server_key[0] == '*') { + freez(server_key); + server_key = NULL; + } + */ + + // info( + // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, + // libmosq_revision); + + rc = mosquitto_lib_init(); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) { + error("Failed to initialize MQTT (libmosquitto library)"); + return 1; + } + return 0; +} + +static int _mqtt_create_connection(char *username, char *password) +{ + if (mosq != NULL) + mosquitto_destroy(mosq); + mosq = mosquitto_new(username, true, NULL); + if (unlikely(!mosq)) { + mosquitto_lib_cleanup(); + error("MQTT new structure -- %s", mosquitto_strerror(errno)); + return MOSQ_ERR_UNKNOWN; + } + + // Record the session start time to allow a nominal LWT timestamp + usec_t now = now_realtime_usec(); + aclk_session_sec = now / USEC_PER_SEC; + aclk_session_us = now % USEC_PER_SEC; + + _link_set_lwt("outbound/meta", 2); + + mosquitto_connect_callback_set(mosq, connect_callback); + mosquitto_disconnect_callback_set(mosq, disconnect_callback); + mosquitto_publish_callback_set(mosq, publish_callback); + + info("Using challenge-response: %s / %s", username, password); + mosquitto_username_pw_set(mosq, username, password); + + int rc = mosquitto_threaded_set(mosq, 1); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc)); + +#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000 + rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc)); + + rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1); + info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc)); +#endif + + return rc; +} + +static int _link_mqtt_connect(char *aclk_hostname, int aclk_port) +{ + int rc; + + rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL); + + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error( + "Failed to establish link to [%s:%d] MQTT status = %d (%s)", aclk_hostname, aclk_port, rc, + mosquitto_strerror(rc)); + else + info("Establishing MQTT link to [%s:%d]", aclk_hostname, aclk_port); + + return rc; +} + +static inline void _link_mosquitto_write() +{ + int rc; + + if (unlikely(!mosq)) { + return; + } + + rc = mosquitto_loop_misc(mosq); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc)); + + if (likely(mosquitto_want_write(mosq))) { + rc = mosquitto_loop_write(mosq, 1); + if (rc != MOSQ_ERR_SUCCESS) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc)); + } +} + +void aclk_lws_connection_established(char *hostname, int port) +{ + _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected. + _link_mosquitto_write(); +} + +void aclk_lws_connection_data_received() +{ + int rc = mosquitto_loop_read(mosq, 1); + if (rc != MOSQ_ERR_SUCCESS) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc)); +} + +void aclk_lws_connection_closed() +{ + aclk_disconnect(); + +} + + +int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password) +{ + if(aclk_lws_wss_connect(aclk_hostname, aclk_port)) + return MOSQ_ERR_UNKNOWN; + aclk_lws_wss_service_loop(); + + int rc = _mqtt_create_connection(username, password); + if (rc!= MOSQ_ERR_SUCCESS) + return rc; + + mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook); + return rc; +} + +inline int _link_event_loop() +{ + + // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1). + _link_mosquitto_write(); + aclk_lws_wss_service_loop(); + + // this is because if use LWS we don't want + // mqtt to reconnect by itself + return MOSQ_ERR_SUCCESS; +} + +void _link_shutdown() +{ + int rc; + + if (likely(!mosq)) + return; + + rc = mosquitto_disconnect(mosq); + switch (rc) { + case MOSQ_ERR_SUCCESS: + info("MQTT disconnected from broker"); + break; + default: + info("MQTT invalid structure"); + break; + }; +} + + +int _link_set_lwt(char *sub_topic, int qos) +{ + int rc; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + if (unlikely(!final_topic)) { + errno = 0; + error("Unable to build outgoing topic; truncated?"); + return 1; + } + + usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1; + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION); + buffer_strcat(b, ", \"payload\": \"unexpected\" }"); + rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0); + buffer_free(b); + + return rc; +} + +int _link_subscribe(char *topic, int qos) +{ + int rc; + + if (unlikely(!mosq)) + return 1; + + mosquitto_message_callback_set(mosq, mqtt_message_callback); + + rc = mosquitto_subscribe(mosq, NULL, topic, qos); + if (unlikely(rc)) { + errno = 0; + error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc)); + return 1; + } + + _link_mosquitto_write(); + return 0; +} + +/* + * Send a message to the cloud to specific topic + * + */ + +int _link_send_message(char *topic, const void *message, size_t len, int *mid) +{ + int rc; + size_t write_q, write_q_bytes, read_q; + + rc = mosquitto_pub_topic_check(topic); + + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + return rc; + + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + rc = mosquitto_publish(mosq, mid, topic, len, message, ACLK_QOS, 0); + +#ifdef NETDATA_INTERNAL_CHECKS + char msg_head[64]; + memset(msg_head, 0, sizeof(msg_head)); + strncpy(msg_head, (char*)message, 60); + for (size_t i = 0; i < sizeof(msg_head); i++) + if(msg_head[i] == '\n') msg_head[i] = ' '; + info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", (int)len, + *mid, write_q, write_q_bytes, read_q, msg_head); + now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]); +#endif + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc != MOSQ_ERR_SUCCESS)) { + errno = 0; + error("MQTT message failed : %s", mosquitto_strerror(rc)); + } + _link_mosquitto_write(); + return rc; +} diff --git a/aclk/legacy/mqtt.h b/aclk/legacy/mqtt.h new file mode 100644 index 000000000..cc4765d62 --- /dev/null +++ b/aclk/legacy/mqtt.h @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_MQTT_H +#define NETDATA_MQTT_H + +#ifdef ENABLE_ACLK +#include "externaldeps/mosquitto/mosquitto.h" +#endif + +void _show_mqtt_info(); +int _link_event_loop(); +void _link_shutdown(); +int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password); +//int _link_lib_init(); +int _mqtt_lib_init(); +int _link_subscribe(char *topic, int qos); +int _link_send_message(char *topic, const void *message, size_t len, int *mid); +const char *_link_strerror(int rc); +int _link_set_lwt(char *topic, int qos); + + +int aclk_handle_cloud_message(char *); +extern char *get_topic(char *sub_topic, char *final_topic, int max_size); + +#endif //NETDATA_MQTT_H diff --git a/aclk/legacy/tests/fake-charts.d.plugin b/aclk/legacy/tests/fake-charts.d.plugin new file mode 100644 index 000000000..a13c6bab8 --- /dev/null +++ b/aclk/legacy/tests/fake-charts.d.plugin @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +sleep 45 # Wait until popcorning finishes + +echo "CHART aclk_test.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1" +sleep 5 +echo "DIMENSION aclk1 '' percentage-of-absolute 1 1" +sleep 5 +echo "BEGIN aclk_test.newcol 1000000" +echo "SET aclk1 = 3" +echo "END" +sleep 5 +echo "DIMENSION aclk2 '' percentage-of-absolute 1 1" +sleep 5 +echo "BEGIN aclk_test.newcol 1000000" +echo "SET aclk1 = 3" +echo "SET aclk2 = 3" +echo "END" +sleep 5 +echo "CHART aclk_test2.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1" +echo "DIMENSION aclk1 '' percentage-of-absolute 1 1" + +sleep 5 +exit 0 # Signal that we are done diff --git a/aclk/legacy/tests/install-fake-charts.d.sh.in b/aclk/legacy/tests/install-fake-charts.d.sh.in new file mode 100644 index 000000000..ac002a2bd --- /dev/null +++ b/aclk/legacy/tests/install-fake-charts.d.sh.in @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +TARGET="@pluginsdir_POST@" +BASE="$(cd "$(dirname "$0")" && pwd)" + +cp "$BASE/fake-charts.d.plugin" "$TARGET/charts.d.plugin" diff --git a/aclk/legacy/tests/launch-paho.sh b/aclk/legacy/tests/launch-paho.sh new file mode 100755 index 000000000..1c2cb5f2c --- /dev/null +++ b/aclk/legacy/tests/launch-paho.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +docker build -f paho.Dockerfile . --build-arg "HOST_HOSTNAME=$(ping -c1 "$(hostname).local" | head -n1 | grep -o '[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*')" -t paho-client +docker run -it paho-client diff --git a/aclk/legacy/tests/paho-inspection.py b/aclk/legacy/tests/paho-inspection.py new file mode 100644 index 000000000..20ab523d4 --- /dev/null +++ b/aclk/legacy/tests/paho-inspection.py @@ -0,0 +1,59 @@ +import ssl +import paho.mqtt.client as mqtt +import json +import time +import sys + +def on_connect(mqttc, obj, flags, rc): + if rc==0: + print("Successful connection", flush=True) + else : + print(f"Connection error rc={rc}", flush=True) + mqttc.subscribe("/agent/#",0) + +def on_disconnect(mqttc, obj, flags, rc): + print("disconnected rc: "+str(rc), flush=True) + +def on_message(mqttc, obj, msg): + print(f"{msg.topic} {len(msg.payload)}-bytes qos={msg.qos}", flush=True) + try: + print(f"Trying decode of {msg.payload[:60]}",flush=True) + api_msg = json.loads(msg.payload) + except Exception as e: + print(e,flush=True) + return + ts = api_msg["timestamp"] + mtype = api_msg["type"] + print(f"Message {mtype} time={ts} size {len(api_msg)}", flush=True) + now = time.time() + print(f"Current {now} -> Delay {now-ts}", flush=True) + if mtype=="disconnect": + print(f"Message dump: {api_msg}", flush=True) + +def on_publish(mqttc, obj, mid): + print("mid: "+str(mid), flush=True) + +def on_subscribe(mqttc, obj, mid, granted_qos): + print("Subscribed: "+str(mid)+" "+str(granted_qos), flush=True) + +def on_log(mqttc, obj, level, string): + print(string) + +print(f"Starting paho-inspection on {sys.argv[1]}", flush=True) +mqttc = mqtt.Client(transport='websockets',client_id="paho") +#mqttc.tls_set(certfile="server.crt", keyfile="server.key", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +#mqttc.tls_set(ca_certs="server.crt", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +mqttc.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +mqttc.tls_insecure_set(True) +mqttc.on_message = on_message +mqttc.on_connect = on_connect +mqttc.on_disconnect = on_disconnect +mqttc.on_publish = on_publish +mqttc.on_subscribe = on_subscribe +mqttc.username_pw_set("paho","paho") +mqttc.connect(sys.argv[1], 8443, 60) + +#mqttc.publish("/agent/mine","Test1") +#mqttc.subscribe("$SYS/#", 0) +print("Connected succesfully, monitoring /agent/#", flush=True) +mqttc.loop_forever() diff --git a/aclk/legacy/tests/paho.Dockerfile b/aclk/legacy/tests/paho.Dockerfile new file mode 100644 index 000000000..d67cc4cb0 --- /dev/null +++ b/aclk/legacy/tests/paho.Dockerfile @@ -0,0 +1,14 @@ +FROM archlinux/base:latest + +RUN pacman -Syyu --noconfirm +RUN pacman --noconfirm --needed -S python-pip + +RUN pip install paho-mqtt + +RUN mkdir -p /opt/paho +COPY paho-inspection.py /opt/paho/ + +WORKDIR /opt/paho +ARG HOST_HOSTNAME +RUN echo $HOST_HOSTNAME >host +CMD ["/bin/bash", "-c", "/usr/sbin/python paho-inspection.py $(cat host)"] |