From 2e85f9325a797977eea9dfea0a925775ddd211d9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Feb 2021 12:49:00 +0100 Subject: Merging upstream version 1.29.0. Signed-off-by: Daniel Baumann --- aclk/README.md | 151 +++ aclk/legacy/Makefile.am | 19 + aclk/legacy/aclk_common.c | 236 ++++ aclk/legacy/aclk_common.h | 70 + aclk/legacy/aclk_lws_https_client.c | 246 ++++ aclk/legacy/aclk_lws_https_client.h | 18 + aclk/legacy/aclk_lws_wss_client.c | 613 +++++++++ aclk/legacy/aclk_lws_wss_client.h | 92 ++ aclk/legacy/aclk_query.c | 789 ++++++++++++ aclk/legacy/aclk_query.h | 40 + aclk/legacy/aclk_rrdhost_state.h | 42 + aclk/legacy/aclk_rx_msgs.c | 365 ++++++ aclk/legacy/aclk_rx_msgs.h | 13 + aclk/legacy/aclk_stats.c | 298 +++++ aclk/legacy/aclk_stats.h | 91 ++ aclk/legacy/agent_cloud_link.c | 1683 +++++++++++++++++++++++++ aclk/legacy/agent_cloud_link.h | 93 ++ aclk/legacy/mqtt.c | 366 ++++++ aclk/legacy/mqtt.h | 25 + aclk/legacy/tests/fake-charts.d.plugin | 24 + aclk/legacy/tests/install-fake-charts.d.sh.in | 6 + aclk/legacy/tests/launch-paho.sh | 4 + aclk/legacy/tests/paho-inspection.py | 59 + aclk/legacy/tests/paho.Dockerfile | 14 + 24 files changed, 5357 insertions(+) create mode 100644 aclk/README.md create mode 100644 aclk/legacy/Makefile.am create mode 100644 aclk/legacy/aclk_common.c create mode 100644 aclk/legacy/aclk_common.h create mode 100644 aclk/legacy/aclk_lws_https_client.c create mode 100644 aclk/legacy/aclk_lws_https_client.h create mode 100644 aclk/legacy/aclk_lws_wss_client.c create mode 100644 aclk/legacy/aclk_lws_wss_client.h create mode 100644 aclk/legacy/aclk_query.c create mode 100644 aclk/legacy/aclk_query.h create mode 100644 aclk/legacy/aclk_rrdhost_state.h create mode 100644 aclk/legacy/aclk_rx_msgs.c create mode 100644 aclk/legacy/aclk_rx_msgs.h create mode 100644 aclk/legacy/aclk_stats.c create mode 100644 aclk/legacy/aclk_stats.h create mode 100644 aclk/legacy/agent_cloud_link.c create mode 100644 aclk/legacy/agent_cloud_link.h create mode 100644 aclk/legacy/mqtt.c create mode 100644 aclk/legacy/mqtt.h create mode 100644 aclk/legacy/tests/fake-charts.d.plugin create mode 100644 aclk/legacy/tests/install-fake-charts.d.sh.in create mode 100755 aclk/legacy/tests/launch-paho.sh create mode 100644 aclk/legacy/tests/paho-inspection.py create mode 100644 aclk/legacy/tests/paho.Dockerfile (limited to 'aclk') diff --git a/aclk/README.md b/aclk/README.md new file mode 100644 index 000000000..e279f0182 --- /dev/null +++ b/aclk/README.md @@ -0,0 +1,151 @@ + + +# Agent-cloud link (ACLK) + +The Agent-Cloud link (ACLK) is the mechanism responsible for securely connecting a Netdata Agent to your web browser +through Netdata Cloud. The ACLK establishes an outgoing secure WebSocket (WSS) connection to Netdata Cloud on port +`443`. The ACLK is encrypted, safe, and _is only established if you claim your node_. + +The Cloud App lives at app.netdata.cloud which currently resolves to 35.196.244.138. However, this IP or range of +IPs can change without notice. Watch this page for updates. + +For a guide to claiming a node using the ACLK, plus additional troubleshooting and reference information, read our [get +started with Cloud](https://learn.netdata.cloud/docs/cloud/get-started) guide or the full [claiming +documentation](/claim/README.md). + +## Data privacy + +Privacy is very important to us. We firmly believe that your data belongs to you. This is why **we don't store any metric data in Netdata Cloud**. + +All the data that the user sees in the web browser when using Netdata Cloud, are actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard. They pass through our systems, but they are not stored. + +We do however store a limited number of *metadata* to be able to offer the stunning visualizations and advanced functionality of Netdata Cloud. + +### Metadata + +The information we store in Netdata Cloud is the following (using the publicly available demo server `frankfurt.my-netdata.io` as an example): +- The email address you used to sign up/or sign in +- For each node claimed to your Spaces in Netdata Cloud: + - Hostname (as it appears in Netdata Cloud) + - Information shown in `/api/v1/info`. For example: [https://frankfurt.my-netdata.io/api/v1/info](https://frankfurt.my-netdata.io/api/v1/info). + - The chart metadata shown in `/api/v1/charts`. For example: [https://frankfurt.my-netdata.io/api/v1/info](https://frankfurt.my-netdata.io/api/v1/info). + - Alarm configurations shown in `/api/v1/alarms?all`. For example: [https://frankfurt.my-netdata.io/api/v1/alarms?all](https://frankfurt.my-netdata.io/api/v1/alarms?all). + - Active alarms shown in `/api/v1/alarms`. For example: [https://frankfurt.my-netdata.io/api/v1/alarms](https://frankfurt.my-netdata.io/api/v1/alarms). + +How we use them: +- The data are stored in our production database on Google Cloud and some of it is also used in BigQuery, our data lake, for analytics purposes. These analytics are crucial for our product development process. +- Email is used to identify users in regards to product use and to enrich our tools with product use, such as our CRM. +- This data is only be available to Netdata and never to a 3rd party. + +## Enable and configure the ACLK + +The ACLK is enabled by default, with its settings automatically configured and stored in the Agent's memory. No file is +created at `/var/lib/netdata/cloud.d/cloud.conf` until you either claim a node or create it yourself. The default +configuration uses two settings: + +```conf +[global] + enabled = yes + cloud base url = https://app.netdata.cloud +``` + +If your Agent needs to use a proxy to access the internet, you must [set up a proxy for +claiming](/claim/README.md#claim-through-a-proxy). + +You can configure following keys in the `netdata.conf` section `[cloud]`: +``` +[cloud] + statistics = yes + query thread count = 2 +``` + +- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. +- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). + +## Disable the ACLK + +You have two options if you prefer to disable the ACLK and not use Netdata Cloud. + +### Disable at installation + +You can pass the `--disable-cloud` parameter to the Agent installation when using a kickstart script +([kickstart.sh](/packaging/installer/methods/kickstart.md) or +[kickstart-static64.sh](/packaging/installer/methods/kickstart-64.md)), or a [manual installation from +Git](/packaging/installer/methods/manual.md). + +When you pass this parameter, the installer does not download or compile any extra libraries. Once running, the Agent +kills the thread responsible for the ACLK and claiming behavior, and behaves as though the ACLK, and thus Netdata Cloud, +does not exist. + +### Disable at runtime + +You can change a runtime setting in your `cloud.conf` file to disable the ACLK. This setting only stops the Agent from +attempting any connection via the ACLK, but does not prevent the installer from downloading and compiling the ACLK's +dependencies. + +The file typically exists at `/var/lib/netdata/cloud.d/cloud.conf`, but can change if you set a prefix during +installation. To disable the ACLK, open that file and change the `enabled` setting to `no`: + +```conf +[global] + enabled = no +``` + +If the file at `/var/lib/netdata/cloud.d/cloud.conf` doesn't exist, you need to create it. + +Copy and paste the first two lines from below, which will change your prompt to `cat`. + +```bash +cd /var/lib/netdata/cloud.d +cat > cloud.conf << EOF +``` + +Copy and paste in lines 3-6, and after the final `EOF`, hit **Enter**. The final line must contain only `EOF`. Hit **Enter** again to return to your normal prompt with the newly-created file. + +To get your normal prompt back, the final line +must contain only `EOF`. + +```bash +[global] + enabled = no + cloud base url = https://app.netdata.cloud +EOF +``` + +You also need to change the file's permissions. Use `grep "run as user" /etc/netdata/netdata.conf` to figure out which +user your Agent runs as (typically `netdata`), and replace `netdata:netdata` as shown below if necessary: + +```bash +sudo chmod 0770 cloud.conf +sudo chown netdata:netdata cloud.conf +``` + +Restart your Agent to disable the ACLK. + +### Re-enable the ACLK + +If you first disable the ACLK and any Cloud functionality and then decide you would like to use Cloud, you must either +[reinstall Netdata](/packaging/installer/REINSTALL.md) with Cloud enabled or change the runtime setting in your +`cloud.conf` file. + +If you passed `--disable-cloud` to `netdata-installer.sh` during installation, you must +[reinstall](/packaging/installer/REINSTALL.md) your Agent. Use the same method as before, but pass `--require-cloud` to +the installer. When installation finishes you can [claim your node](/claim/README.md#how-to-claim-a-node). + +If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf` file, edit the file again and change +`enabled` to `yes`: + +```conf +[global] + enabled = yes +``` + +Restart your Agent and [claim your node](/claim/README.md#how-to-claim-a-node). + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Faclk%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) + diff --git a/aclk/legacy/Makefile.am b/aclk/legacy/Makefile.am new file mode 100644 index 000000000..1cd876b40 --- /dev/null +++ b/aclk/legacy/Makefile.am @@ -0,0 +1,19 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +CLEANFILES = \ + tests/install-fake-charts.d.sh \ + $(NULL) + +include $(top_srcdir)/build/subst.inc +SUFFIXES = .in + +#sbin_SCRIPTS = \ +# tests/install-fake-charts.d.sh \ +# $(NULL) + +dist_noinst_SCRIPTS = tests/install-fake-charts.d.sh +dist_noinst_DATA = tests/install-fake-charts.d.sh.in + diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c new file mode 100644 index 000000000..7c8421a93 --- /dev/null +++ b/aclk/legacy/aclk_common.c @@ -0,0 +1,236 @@ +#include "aclk_common.h" + +#include "../../daemon/common.h" + +#ifdef ENABLE_ACLK +#include +#endif + +netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; + +int aclk_disable_runtime = 0; +int aclk_kill_link = 0; + +struct aclk_shared_state aclk_shared_state = { + .version_neg = 0, + .version_neg_wait_till = 0 +}; + +struct { + ACLK_PROXY_TYPE type; + const char *url_str; +} supported_proxy_types[] = { + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL }, +}; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type) +{ + switch (*type) { + case PROXY_DISABLED: + return "disabled"; + case PROXY_TYPE_HTTP: + return "HTTP"; + case PROXY_TYPE_SOCKS5: + return "SOCKS"; + default: + return "Unknown"; + } +} + +static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string) +{ + int i = 0; + while (supported_proxy_types[i].url_str) { + if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str))) + return supported_proxy_types[i].type; + i++; + } + return PROXY_TYPE_UNKNOWN; +} + +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string) +{ + if (!string) + return PROXY_TYPE_UNKNOWN; + + while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove) + string++; + + if (!*string) + return PROXY_TYPE_UNKNOWN; + + return aclk_find_proxy(string); +} + +// helper function to censor user&password +// for logging purposes +void safe_log_proxy_censor(char *proxy) +{ + size_t length = strlen(proxy); + char *auth = proxy + length - 1; + char *cur; + + while ((auth >= proxy) && (*auth != '@')) + auth--; + + //if not found or @ is first char do nothing + if (auth <= proxy) + return; + + cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR); + if (!cur) + cur = proxy; + else + cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + while (cur < auth) { + *cur = 'X'; + cur++; + } +} + +static inline void safe_log_proxy_error(char *str, const char *proxy) +{ + char *log = strdupz(proxy); + safe_log_proxy_censor(log); + error("%s Provided Value:\"%s\"", str, log); + freez(log); +} + +static inline int check_socks_enviroment(const char **proxy) +{ + char *tmp = getenv("socks_proxy"); + + if (!tmp) + return 1; + + if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) { + *proxy = tmp; + return 0; + } + + safe_log_proxy_error( + "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", + tmp); + return 1; +} + +static inline int check_http_enviroment(const char **proxy) +{ + char *tmp = getenv("http_proxy"); + + if (!tmp) + return 1; + + if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) { + *proxy = tmp; + return 0; + } + + safe_log_proxy_error( + "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".", + tmp); + return 1; +} + +const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type) +{ + const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV); + *type = PROXY_DISABLED; + + if (strcmp(proxy, "none") == 0) + return proxy; + + if (strcmp(proxy, ACLK_PROXY_ENV) == 0) { + if (check_socks_enviroment(&proxy) == 0) { +#ifdef LWS_WITH_SOCKS5 + *type = PROXY_TYPE_SOCKS5; + return proxy; +#else + safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy " + "but Libwebsockets used doesn't have SOCKS5 support built in. " + "Ignoring and checking for other options.", + proxy); +#endif + } + if (check_http_enviroment(&proxy) == 0) + *type = PROXY_TYPE_HTTP; + return proxy; + } + + *type = aclk_verify_proxy(proxy); +#ifndef LWS_WITH_SOCKS5 + if (*type == PROXY_TYPE_SOCKS5) { + safe_log_proxy_error( + "Config var \"" ACLK_PROXY_CONFIG_VAR + "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.", + proxy); + } +#endif + if (*type == PROXY_TYPE_UNKNOWN) { + *type = PROXY_DISABLED; + safe_log_proxy_error( + "Config var \"" ACLK_PROXY_CONFIG_VAR + "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", + proxy); + } + + return proxy; +} + +// helper function to read settings only once (static) +// as claiming, challenge/response and ACLK +// read the same thing, no need to parse again +const char *aclk_get_proxy(ACLK_PROXY_TYPE *type) +{ + static const char *proxy = NULL; + static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET; + + if (proxy_type == PROXY_NOT_SET) + proxy = aclk_lws_wss_get_proxy_setting(&proxy_type); + + *type = proxy_type; + return proxy; +} + +int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port) +{ + int pos = 0; + if (!strncmp("https://", url, 8)) { + pos = 8; + } else if (!strncmp("http://", url, 7)) { + error("Cannot connect ACLK over %s -> unencrypted link is not supported", url); + return 1; + } + int host_end = pos; + while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':') + host_end++; + if (url[host_end] == 0) { + *aclk_hostname = strdupz(url + pos); + *aclk_port = 443; + info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); + return 0; + } + if (url[host_end] == ':') { + *aclk_hostname = callocz(host_end - pos + 1, 1); + strncpy(*aclk_hostname, url + pos, host_end - pos); + int port_end = host_end + 1; + while (url[port_end] >= '0' && url[port_end] <= '9') + port_end++; + if (port_end - host_end > 6) { + error("Port specified in %s is invalid", url); + return 0; + } + *aclk_port = atoi(&url[host_end+1]); + } + if (url[host_end] == '/') { + *aclk_port = 443; + *aclk_hostname = callocz(1, host_end - pos + 1); + strncpy(*aclk_hostname, url+pos, host_end - pos); + } + info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); + return 0; +} diff --git a/aclk/legacy/aclk_common.h b/aclk/legacy/aclk_common.h new file mode 100644 index 000000000..2dc0aa553 --- /dev/null +++ b/aclk/legacy/aclk_common.h @@ -0,0 +1,70 @@ +#ifndef ACLK_COMMON_H +#define ACLK_COMMON_H + +#include "aclk_rrdhost_state.h" +#include "../../daemon/common.h" + +extern netdata_mutex_t aclk_shared_state_mutex; +#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) + +// minimum and maximum supported version of ACLK +// in this version of agent +#define ACLK_VERSION_MIN 2 +#define ACLK_VERSION_MAX 3 + +// Version negotiation messages have they own versioning +// this is also used for LWT message as we set that up +// before version negotiation +#define ACLK_VERSION_NEG_VERSION 1 + +// Maximum time to wait for version negotiation before aborting +// and defaulting to oldest supported version +#define VERSION_NEG_TIMEOUT 3 + +#if ACLK_VERSION_MIN > ACLK_VERSION_MAX +#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN" +#endif + +// Define ACLK Feature Version Boundaries Here +#define ACLK_V_COMPRESSION 2 +#define ACLK_V_CHILDRENSTATE 3 + +#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING) +#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update) + +extern struct aclk_shared_state { + // optimization to avoid looping trough hosts + // every time Query Thread wakes up + RRDHOST *next_popcorn_host; + + // read only while ACLK connected + // protect by lock otherwise + int version_neg; + usec_t version_neg_wait_till; +} aclk_shared_state; + +typedef enum aclk_proxy_type { + PROXY_TYPE_UNKNOWN = 0, + PROXY_TYPE_SOCKS5, + PROXY_TYPE_HTTP, + PROXY_DISABLED, + PROXY_NOT_SET, +} ACLK_PROXY_TYPE; + +extern int aclk_kill_link; // Tells the agent to tear down the link +extern int aclk_disable_runtime; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type); + +#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://" +#define ACLK_PROXY_ENV "env" +#define ACLK_PROXY_CONFIG_VAR "proxy" + +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string); +const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type); +void safe_log_proxy_censor(char *proxy); +int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port); +const char *aclk_get_proxy(ACLK_PROXY_TYPE *type); + +#endif //ACLK_COMMON_H diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c new file mode 100644 index 000000000..c1856ed2c --- /dev/null +++ b/aclk/legacy/aclk_lws_https_client.c @@ -0,0 +1,246 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#define ACLK_LWS_HTTPS_CLIENT_INTERNAL +#include "aclk_lws_https_client.h" + +#include "aclk_common.h" + +#include "aclk_lws_wss_client.h" + +#define SMALL_BUFFER 16 + +struct simple_hcc_data { + char *data; + size_t data_size; + size_t written; + char lws_work_buffer[1024 + LWS_PRE]; + char *payload; + int response_code; + int done; +}; + +static int simple_https_client_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + UNUSED(user); + int n; + char *ptr; + char buffer[SMALL_BUFFER]; + struct simple_hcc_data *perconn_data = lws_get_opaque_user_data(wsi); + + switch (reason) { + case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ: + debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ"); + if (perconn_data->data_size - 1 - perconn_data->written < len) + return 1; + memcpy(&perconn_data->data[perconn_data->written], in, len); + perconn_data->written += len; + return 0; + case LWS_CALLBACK_RECEIVE_CLIENT_HTTP: + debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP"); + if(!perconn_data) { + error("Missing Per Connect Data"); + return -1; + } + n = sizeof(perconn_data->lws_work_buffer) - LWS_PRE; + ptr = perconn_data->lws_work_buffer + LWS_PRE; + if (lws_http_client_read(wsi, &ptr, &n) < 0) + return -1; + perconn_data->data[perconn_data->written] = '\0'; + return 0; + case LWS_CALLBACK_WSI_DESTROY: + debug(D_ACLK, "LWS_CALLBACK_WSI_DESTROY"); + if(perconn_data) + perconn_data->done = 1; + return 0; + case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: + debug(D_ACLK, "LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP"); + if(perconn_data) + perconn_data->response_code = lws_http_client_http_response(wsi); + return 0; + case LWS_CALLBACK_CLOSED_CLIENT_HTTP: + debug(D_ACLK, "LWS_CALLBACK_CLOSED_CLIENT_HTTP"); + return 0; + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + debug(D_ACLK, "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS"); + return 0; + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER"); + if(perconn_data && perconn_data->payload) { + unsigned char **p = (unsigned char **)in, *end = (*p) + len; + snprintfz(buffer, SMALL_BUFFER, "%zu", strlen(perconn_data->payload)); + if (lws_add_http_header_by_token(wsi, + WSI_TOKEN_HTTP_CONTENT_LENGTH, + (unsigned char *)buffer, strlen(buffer), p, end)) + return -1; + if (lws_add_http_header_by_token(wsi, + WSI_TOKEN_HTTP_CONTENT_TYPE, + (unsigned char *)ACLK_CONTENT_TYPE_JSON, + strlen(ACLK_CONTENT_TYPE_JSON), p, end)) + return -1; + lws_client_http_body_pending(wsi, 1); + lws_callback_on_writable(wsi); + } + return 0; + case LWS_CALLBACK_CLIENT_HTTP_WRITEABLE: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_WRITEABLE"); + if(perconn_data && perconn_data->payload) { + n = strlen(perconn_data->payload); + if(perconn_data->data_size < (size_t)LWS_PRE + n + 1) { + error("Buffer given is not big enough"); + return 1; + } + + memcpy(&perconn_data->data[LWS_PRE], perconn_data->payload, n); + if(n != lws_write(wsi, (unsigned char*)&perconn_data->data[LWS_PRE], n, LWS_WRITE_HTTP)) { + error("lws_write error"); + perconn_data->data[0] = 0; + return 1; + } + lws_client_http_body_pending(wsi, 0); + // clean for subsequent reply read + perconn_data->data[0] = 0; + } + return 0; + case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL"); + return 0; + case LWS_CALLBACK_WSI_CREATE: + debug(D_ACLK, "LWS_CALLBACK_WSI_CREATE"); + return 0; + case LWS_CALLBACK_PROTOCOL_INIT: + debug(D_ACLK, "LWS_CALLBACK_PROTOCOL_INIT"); + return 0; + case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL"); + return 0; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + debug(D_ACLK, "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED"); + return 0; + case LWS_CALLBACK_GET_THREAD_ID: + debug(D_ACLK, "LWS_CALLBACK_GET_THREAD_ID"); + return 0; + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + debug(D_ACLK, "LWS_CALLBACK_EVENT_WAIT_CANCELLED"); + return 0; + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + debug(D_ACLK, "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION"); + return 0; + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + debug(D_ACLK, "LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH"); + return 0; + default: + debug(D_ACLK, "Unknown callback %d", (int)reason); + return 0; + } +} + +static const struct lws_protocols protocols[] = { + { + "http", + simple_https_client_callback, + 0, + 0, + 0, + 0, + 0 + }, + { NULL, NULL, 0, 0, 0, 0, 0 } +}; + +static void simple_hcc_log_divert(int level, const char *line) +{ + UNUSED(level); + error("Libwebsockets: %s", line); +} + +int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload) +{ + info("%s %s", __func__, method); + + struct lws_context_creation_info info; + struct lws_client_connect_info i; + struct lws_context *context; + + struct simple_hcc_data *data = callocz(1, sizeof(struct simple_hcc_data)); + data->data = b; + data->data[0] = 0; + data->data_size = b_size; + data->payload = payload; + + int n = 0; + time_t timestamp; + + struct lws_vhost *vhost; + + memset(&info, 0, sizeof info); + + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + + + context = lws_create_context(&info); + if (!context) { + error("Error creating LWS context"); + freez(data); + return 1; + } + + lws_set_log_level(LLL_ERR | LLL_WARN, simple_hcc_log_divert); + + lws_service(context, 0); + + memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */ + i.context = context; + +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED + i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE; + info("Disabling SSL certificate checks"); +#else + i.ssl_connection = LCCSCF_USE_SSL; +#endif +#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0 +#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM. + i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; +#endif + + i.port = port; + i.address = host; + i.path = url; + + i.host = i.address; + i.origin = i.address; + i.method = method; + i.opaque_user_data = data; + i.alpn = "http/1.1"; + + i.protocol = protocols[0].name; + + vhost = lws_get_vhost_by_name(context, "default"); + if(!vhost) + fatal("Could not find the default LWS vhost."); + + //set up proxy + aclk_wss_set_proxy(vhost); + + lws_client_connect_via_info(&i); + + // libwebsockets handle connection timeouts already + // this adds additional safety in case of bug in LWS + timestamp = now_monotonic_sec(); + while( n >= 0 && !data->done && !netdata_exit) { + n = lws_service(context, 0); + if( now_monotonic_sec() - timestamp > SEND_HTTPS_REQUEST_TIMEOUT ) { + data->data[0] = 0; + data->done = 1; + error("Servicing LWS took too long."); + } + } + + lws_context_destroy(context); + + n = data->response_code; + + freez(data); + return (n < 200 || n >= 300); +} diff --git a/aclk/legacy/aclk_lws_https_client.h b/aclk/legacy/aclk_lws_https_client.h new file mode 100644 index 000000000..811809dd1 --- /dev/null +++ b/aclk/legacy/aclk_lws_https_client.h @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_LWS_HTTPS_CLIENT_H +#define NETDATA_LWS_HTTPS_CLIENT_H + +#include "../../daemon/common.h" +#include "libnetdata/libnetdata.h" + +#define DATAMAXLEN 1024*16 + +#ifdef ACLK_LWS_HTTPS_CLIENT_INTERNAL +#define ACLK_CONTENT_TYPE_JSON "application/json" +#define SEND_HTTPS_REQUEST_TIMEOUT 30 +#endif + +int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload); + +#endif /* NETDATA_LWS_HTTPS_CLIENT_H */ diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c new file mode 100644 index 000000000..2e6fd4ec8 --- /dev/null +++ b/aclk/legacy/aclk_lws_wss_client.c @@ -0,0 +1,613 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_lws_wss_client.h" + +#include "libnetdata/libnetdata.h" +#include "../../daemon/common.h" +#include "aclk_common.h" +#include "aclk_stats.h" + +extern int aclk_shutting_down; + +static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + +struct aclk_lws_wss_perconnect_data { + int todo; +}; + +static struct aclk_lws_wss_engine_instance *engine_instance = NULL; + +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len) +{ + if (write_len != NULL && write_len_bytes != NULL) + { + *write_len = 0; + *write_len_bytes = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + + struct lws_wss_packet_buffer *write_b; + size_t w,wb; + for(w=0, wb=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) + { + w++; + wb += write_b->data_size - write_b->written; + } + *write_len = w; + *write_len_bytes = wb; + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + } + } + else if (write_len != NULL) + { + *write_len = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + + struct lws_wss_packet_buffer *write_b; + size_t w; + for(w=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) + w++; + *write_len = w; + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + } + } + if (read_len != NULL) + { + *read_len = 0; + if (engine_instance != NULL) + { + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + } + } +} + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size) +{ + struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer)); + if (data) { + new->data = mallocz(LWS_PRE + size); + memcpy(new->data + LWS_PRE, data, size); + new->data_size = size; + new->written = 0; + } + return new; +} + +static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item) +{ + struct lws_wss_packet_buffer *tail = *list; + if (!*list) { + *list = item; + return; + } + while (tail->next) { + tail = tail->next; + } + tail->next = item; +} + +static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *ret = *list; + if (ret != NULL) + *list = ret->next; + + return ret; +} + +static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item) +{ + freez(item->data); + freez(item); +} + +static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer) +{ + size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL); + if (elems > 0) + lws_ring_consume(ringbuffer, NULL, NULL, elems); +} + +static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list) +{ + struct lws_wss_packet_buffer *i; + while ((i = lws_wss_packet_buffer_pop(list)) != NULL) { + lws_wss_packet_buffer_free(i); + } + *list = NULL; +} + +static inline void aclk_lws_wss_clear_io_buffers() +{ + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + _aclk_lws_wss_read_buffer_clear(engine_instance->read_ringbuffer); + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + _aclk_lws_wss_write_buffer_clear(&engine_instance->write_buffer_head); + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); +} + +static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback, + sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 }, + { NULL, NULL, 0, 0, 0, 0, 0 } }; + +static void aclk_lws_wss_log_divert(int level, const char *line) +{ + switch (level) { + case LLL_ERR: + error("Libwebsockets Error: %s", line); + break; + case LLL_WARN: + debug(D_ACLK, "Libwebsockets Warn: %s", line); + break; + default: + error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line); + } +} + +static int aclk_lws_wss_client_init( char *target_hostname, int target_port) +{ + static int lws_logging_initialized = 0; + + if (unlikely(!lws_logging_initialized)) { + lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert); + lws_logging_initialized = 1; + } + + if (!target_hostname) + return 1; + + engine_instance = callocz(1, sizeof(struct aclk_lws_wss_engine_instance)); + + engine_instance->host = target_hostname; + engine_instance->port = target_port; + + + aclk_lws_mutex_init(&engine_instance->write_buf_mutex); + aclk_lws_mutex_init(&engine_instance->read_buf_mutex); + + engine_instance->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL); + if (!engine_instance->read_ringbuffer) + goto failure_cleanup; + + return 0; + +failure_cleanup: + freez(engine_instance); + return 1; +} + +void aclk_lws_wss_destroy_context() +{ + if (!engine_instance) + return; + if (!engine_instance->lws_context) + return; + lws_context_destroy(engine_instance->lws_context); + engine_instance->lws_context = NULL; +} + + +void aclk_lws_wss_client_destroy() +{ + if (engine_instance == NULL) + return; + + aclk_lws_wss_destroy_context(); + engine_instance->lws_wsi = NULL; + + aclk_lws_wss_clear_io_buffers(); + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + pthread_mutex_destroy(&engine_instance->write_buf_mutex); + pthread_mutex_destroy(&engine_instance->read_buf_mutex); +#endif +} + +#ifdef LWS_WITH_SOCKS5 +static int aclk_wss_set_socks(struct lws_vhost *vhost, const char *socks) +{ + char *proxy = strstr(socks, ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + if (!proxy) + return -1; + + proxy += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + if (!*proxy) + return -1; + + return lws_set_socks(vhost, proxy); +} +#endif + +void aclk_wss_set_proxy(struct lws_vhost *vhost) +{ + const char *proxy; + ACLK_PROXY_TYPE proxy_type; + char *log; + + proxy = aclk_get_proxy(&proxy_type); + +#ifdef LWS_WITH_SOCKS5 + lws_set_socks(vhost, ":"); +#endif + lws_set_proxy(vhost, ":"); + + if (proxy_type == PROXY_TYPE_UNKNOWN) { + error("Unknown proxy type"); + return; + } + + if (proxy_type == PROXY_TYPE_SOCKS5 || proxy_type == PROXY_TYPE_HTTP) { + log = strdupz(proxy); + safe_log_proxy_censor(log); + info("Connecting using %s proxy:\"%s\"", aclk_proxy_type_to_s(&proxy_type), log); + freez(log); + } + if (proxy_type == PROXY_TYPE_SOCKS5) { +#ifdef LWS_WITH_SOCKS5 + if (aclk_wss_set_socks(vhost, proxy)) + error("LWS failed to accept socks proxy."); + return; +#else + fatal("We have no SOCKS5 support but we made it here. Programming error!"); +#endif + } + if (proxy_type == PROXY_TYPE_HTTP) { + if (lws_set_proxy(vhost, proxy)) + error("LWS failed to accept http proxy."); + return; + } + if (proxy_type != PROXY_DISABLED) + error("Unknown proxy type"); +} + +// Return code indicates if connection attempt has started async. +int aclk_lws_wss_connect(char *host, int port) +{ + struct lws_client_connect_info i; + struct lws_vhost *vhost; + int n; + + if (!engine_instance) { + if (aclk_lws_wss_client_init(host, port)) + return 1; // Propagate failure + } + + if (!engine_instance->lws_context) + { + // First time through (on this connection), create the context + struct lws_context_creation_info info; + memset(&info, 0, sizeof(struct lws_context_creation_info)); + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; + info.protocols = protocols; + engine_instance->lws_context = lws_create_context(&info); + if (!engine_instance->lws_context) + { + error("Failed to create lws_context, ACLK will not function"); + return 1; + } + return 0; + // PROTOCOL_INIT callback will call again. + } + + for (n = 0; n < ACLK_LWS_CALLBACK_HISTORY; n++) + engine_instance->lws_callback_history[n] = 0; + + if (engine_instance->lws_wsi) { + error("Already Connected. Only one connection supported at a time."); + return 0; + } + + memset(&i, 0, sizeof(i)); + i.context = engine_instance->lws_context; + i.port = engine_instance->port; + i.address = engine_instance->host; + i.path = "/mqtt"; + i.host = engine_instance->host; + i.protocol = "mqtt"; + + // from LWS docu: + // If option LWS_SERVER_OPTION_EXPLICIT_VHOSTS is given, no vhost is + // created; you're expected to create your own vhosts afterwards using + // lws_create_vhost(). Otherwise a vhost named "default" is also created + // using the information in the vhost-related members, for compatibility. + vhost = lws_get_vhost_by_name(engine_instance->lws_context, "default"); + if(!vhost) + fatal("Could not find the default LWS vhost."); + + aclk_wss_set_proxy(vhost); + +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED + i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE; + info("Disabling SSL certificate checks"); +#else + i.ssl_connection = LCCSCF_USE_SSL; +#endif +#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0 +#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM. + i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; +#endif + lws_client_connect_via_info(&i); + return 0; +} + +static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, size_t len) +{ + if (lws_ring_insert(buffer, data, len) != len) { + error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding."); + return 0; + } + return 1; +} + +static const char *aclk_lws_callback_name(enum lws_callback_reasons reason) +{ + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + return "LWS_CALLBACK_CLIENT_WRITEABLE"; + case LWS_CALLBACK_CLIENT_RECEIVE: + return "LWS_CALLBACK_CLIENT_RECEIVE"; + case LWS_CALLBACK_PROTOCOL_INIT: + return "LWS_CALLBACK_PROTOCOL_INIT"; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED"; + case LWS_CALLBACK_USER: + return "LWS_CALLBACK_USER"; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR"; + case LWS_CALLBACK_CLIENT_CLOSED: + return "LWS_CALLBACK_CLIENT_CLOSED"; + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE"; + case LWS_CALLBACK_WSI_DESTROY: + return "LWS_CALLBACK_WSI_DESTROY"; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + return "LWS_CALLBACK_CLIENT_ESTABLISHED"; + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION"; + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + return "LWS_CALLBACK_EVENT_WAIT_CANCELLED"; + default: + // Not using an internal buffer here for thread-safety with unknown calling context. + error("Unknown LWS callback %u", reason); + return "unknown"; + } +} + +void aclk_lws_wss_fail_report() +{ + int i; + int anything_to_send = 0; + BUFFER *buf; + + if (netdata_anonymous_statistics_enabled <= 0) + return; + + // guess - most of the callback will be 1-99 + ',' + \0 + buf = buffer_create((ACLK_LWS_CALLBACK_HISTORY * 2) + 10); + + for (i = 0; i < ACLK_LWS_CALLBACK_HISTORY; i++) + if (engine_instance->lws_callback_history[i]) { + buffer_sprintf(buf, "%s%d", (i ? "," : ""), engine_instance->lws_callback_history[i]); + anything_to_send = 1; + } + + if (anything_to_send) + send_statistics("ACLK_CONN_FAIL", "FAIL", buffer_tostring(buf)); + + buffer_free(buf); +} + +static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + UNUSED(user); + struct lws_wss_packet_buffer *data; + int retval = 0; + static int lws_shutting_down = 0; + int i; + + for (i = ACLK_LWS_CALLBACK_HISTORY - 1; i > 0; i--) + engine_instance->lws_callback_history[i] = engine_instance->lws_callback_history[i - 1]; + engine_instance->lws_callback_history[0] = (int)reason; + + if (unlikely(aclk_shutting_down && !lws_shutting_down)) { + lws_shutting_down = 1; + retval = -1; + engine_instance->upstream_reconnect_request = 0; + } + + // Callback servicing is forced when we are closed from above. + if (engine_instance->upstream_reconnect_request) { + error("Closing lws connectino due to libmosquitto error."); + char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; + lws_close_reason( + wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error, + strlen(upstream_connection_error)); + retval = -1; + engine_instance->upstream_reconnect_request = 0; + } + + // Don't log to info - volume is proportional to message flow on ACLK. + switch (reason) { + case LWS_CALLBACK_CLIENT_WRITEABLE: + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + data = engine_instance->write_buffer_head; + if (likely(data)) { + size_t bytes_left = data->data_size - data->written; + if ( bytes_left > FRAGMENT_SIZE) + bytes_left = FRAGMENT_SIZE; + int n = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY); + if (n>=0) { + data->written += n; + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.write_q_consumed += n; + ACLK_STATS_UNLOCK; + } + } + //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc); + if (data->written == data->data_size) + { + lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head); + lws_wss_packet_buffer_free(data); + } + if (engine_instance->write_buffer_head) + lws_callback_on_writable(engine_instance->lws_wsi); + } + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + return retval; + + case LWS_CALLBACK_CLIENT_RECEIVE: + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + if (!received_data_to_ringbuff(engine_instance->read_ringbuffer, in, len)) + retval = 1; + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.read_q_added += len; + ACLK_STATS_UNLOCK; + } + + // to future myself -> do not call this while read lock is active as it will eventually + // want to acquire same lock later in aclk_lws_wss_client_read() function + aclk_lws_connection_data_received(); + return retval; + + case LWS_CALLBACK_WSI_CREATE: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_GET_THREAD_ID: // ? + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + // Expected and safe to ignore. + debug(D_ACLK, "Ignoring expected callback from LWS: %s", aclk_lws_callback_name(reason)); + return retval; + + default: + // Pass to next switch, this case removes compiler warnings. + break; + } + // Log to info - volume is proportional to connection attempts. + info("Processing callback %s", aclk_lws_callback_name(reason)); + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + aclk_lws_wss_connect(engine_instance->host, engine_instance->port); // Makes the outgoing connection + break; + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + if (engine_instance->lws_wsi != NULL && engine_instance->lws_wsi != wsi) + error("Multiple connections on same WSI? %p vs %p", engine_instance->lws_wsi, wsi); + engine_instance->lws_wsi = wsi; + break; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + error( + "Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", engine_instance->host, + engine_instance->port, (in ? (char *)in : "not given")); + // Fall-through + case LWS_CALLBACK_CLIENT_CLOSED: + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + engine_instance->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback + aclk_lws_connection_closed(); + return -1; // the callback response is ignored, hope the above remains true + case LWS_CALLBACK_WSI_DESTROY: + aclk_lws_wss_clear_io_buffers(); + if (!engine_instance->websocket_connection_up) + aclk_lws_wss_fail_report(); + engine_instance->lws_wsi = NULL; + engine_instance->websocket_connection_up = 0; + aclk_lws_connection_closed(); + break; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + engine_instance->websocket_connection_up = 1; + aclk_lws_connection_established(engine_instance->host, engine_instance->port); + break; + + default: + error("Unexpected callback from libwebsockets %s", aclk_lws_callback_name(reason)); + break; + } + return retval; //0-OK, other connection should be closed! +} + +int aclk_lws_wss_client_write(void *buf, size_t count) +{ + if (engine_instance && engine_instance->lws_wsi && engine_instance->websocket_connection_up) { + aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); + lws_wss_packet_buffer_append(&engine_instance->write_buffer_head, lws_wss_packet_buffer_new(buf, count)); + aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.write_q_added += count; + ACLK_STATS_UNLOCK; + } + + lws_callback_on_writable(engine_instance->lws_wsi); + return count; + } + return 0; +} + +int aclk_lws_wss_client_read(void *buf, size_t count) +{ + size_t data_to_be_read = count; + + aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); + size_t readable_byte_count = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); + if (unlikely(readable_byte_count == 0)) { + errno = EAGAIN; + data_to_be_read = -1; + goto abort; + } + + if (readable_byte_count < data_to_be_read) + data_to_be_read = readable_byte_count; + + data_to_be_read = lws_ring_consume(engine_instance->read_ringbuffer, NULL, buf, data_to_be_read); + if (data_to_be_read == readable_byte_count) + engine_instance->data_to_read = 0; + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.read_q_consumed += data_to_be_read; + ACLK_STATS_UNLOCK; + } + +abort: + aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); + return data_to_be_read; +} + +void aclk_lws_wss_service_loop() +{ + if (engine_instance) + { + /*if (engine_instance->lws_wsi) { + lws_cancel_service(engine_instance->lws_context); + lws_callback_on_writable(engine_instance->lws_wsi); + }*/ + lws_service(engine_instance->lws_context, 0); + } +} + +// in case the MQTT connection disconnect while lws transport is still operational +// we should drop connection and reconnect +// this function should be called when that happens to notify lws of that situation +void aclk_lws_wss_mqtt_layer_disconect_notif() +{ + if (!engine_instance) + return; + if (engine_instance->lws_wsi && engine_instance->websocket_connection_up) { + engine_instance->upstream_reconnect_request = 1; + lws_callback_on_writable( + engine_instance->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written. + } +} diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h new file mode 100644 index 000000000..584a3cf4f --- /dev/null +++ b/aclk/legacy/aclk_lws_wss_client.h @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_LWS_WSS_CLIENT_H +#define ACLK_LWS_WSS_CLIENT_H + +#include + +#include "libnetdata/libnetdata.h" + +// This is as define because ideally the ACLK at high level +// can do mosqitto writes and reads only from one thread +// which is cleaner implementation IMHO +// in such case this mutexes are not necessarry and life +// is simpler +#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1 + +#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES (128 * 1024) + +#define ACLK_LWS_CALLBACK_HISTORY 10 + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED +#define aclk_lws_mutex_init(x) netdata_mutex_init(x) +#define aclk_lws_mutex_lock(x) netdata_mutex_lock(x) +#define aclk_lws_mutex_unlock(x) netdata_mutex_unlock(x) +#else +#define aclk_lws_mutex_init(x) +#define aclk_lws_mutex_lock(x) +#define aclk_lws_mutex_unlock(x) +#endif + +struct aclk_lws_wss_engine_callbacks { + void (*connection_established_callback)(); + void (*data_rcvd_callback)(); + void (*data_writable_callback)(); + void (*connection_closed)(); +}; + +struct lws_wss_packet_buffer { + unsigned char *data; + size_t data_size, written; + struct lws_wss_packet_buffer *next; +}; + +struct aclk_lws_wss_engine_instance { + //target host/port for connection + char *host; + int port; + + //internal data + struct lws_context *lws_context; + struct lws *lws_wsi; + +#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED + netdata_mutex_t write_buf_mutex; + netdata_mutex_t read_buf_mutex; +#endif + + struct lws_wss_packet_buffer *write_buffer_head; + struct lws_ring *read_ringbuffer; + + //flags to be readed by engine user + int websocket_connection_up; + + // currently this is by default disabled + + int data_to_read; + int upstream_reconnect_request; + + int lws_callback_history[ACLK_LWS_CALLBACK_HISTORY]; +}; + +void aclk_lws_wss_client_destroy(); +void aclk_lws_wss_destroy_context(); + +int aclk_lws_wss_connect(char *host, int port); + +int aclk_lws_wss_client_write(void *buf, size_t count); +int aclk_lws_wss_client_read(void *buf, size_t count); +void aclk_lws_wss_service_loop(); + +void aclk_lws_wss_mqtt_layer_disconect_notif(); + +// Notifications inside the layer above +void aclk_lws_connection_established(); +void aclk_lws_connection_data_received(); +void aclk_lws_connection_closed(); +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); + +void aclk_wss_set_proxy(struct lws_vhost *vhost); + +#define FRAGMENT_SIZE 4096 +#endif diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c new file mode 100644 index 000000000..7ab534f16 --- /dev/null +++ b/aclk/legacy/aclk_query.c @@ -0,0 +1,789 @@ +#include "aclk_common.h" +#include "aclk_query.h" +#include "aclk_stats.h" +#include "aclk_rx_msgs.h" + +#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) + +volatile int aclk_connected = 0; + +#ifndef __GNUC__ +#pragma region ACLK_QUEUE +#endif + +static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex) +#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex) + +struct aclk_query { + usec_t created; + usec_t created_boot_time; + time_t run_after; // Delay run until after this time + ACLK_CMD cmd; // What command is this + char *topic; // Topic to respond to + char *data; // Internal data (NULL if request from the cloud) + char *msg_id; // msg_id generated by the cloud (NULL if internal) + char *query; // The actual query + u_char deleted; // Mark deleted for garbage collect + struct aclk_query *next; +}; + +struct aclk_query_queue { + struct aclk_query *aclk_query_head; + struct aclk_query *aclk_query_tail; + unsigned int count; +} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; + + +unsigned int aclk_query_size() +{ + int r; + ACLK_QUEUE_LOCK; + r = aclk_queue.count; + ACLK_QUEUE_UNLOCK; + return r; +} + +/* + * Free a query structure when done + */ +static void aclk_query_free(struct aclk_query *this_query) +{ + if (unlikely(!this_query)) + return; + + freez(this_query->topic); + if (likely(this_query->query)) + freez(this_query->query); + if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) { + struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data; + freez(del->data); + freez(del); + } + if (likely(this_query->msg_id)) + freez(this_query->msg_id); + freez(this_query); +} + +/* + * Get the next query to process - NULL if nothing there + * The caller needs to free memory by calling aclk_query_free() + * + * topic + * query + * The structure itself + * + */ +static struct aclk_query *aclk_queue_pop() +{ + struct aclk_query *this_query; + + ACLK_QUEUE_LOCK; + + if (likely(!aclk_queue.aclk_query_head)) { + ACLK_QUEUE_UNLOCK; + return NULL; + } + + this_query = aclk_queue.aclk_query_head; + + // Get rid of the deleted entries + while (this_query && this_query->deleted) { + aclk_queue.count--; + + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + aclk_query_free(this_query); + + this_query = aclk_queue.aclk_query_head; + } + + if (likely(!this_query)) { + ACLK_QUEUE_UNLOCK; + return NULL; + } + + if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { + info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); + ACLK_QUEUE_UNLOCK; + return NULL; + } + + aclk_queue.count--; + aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; + + if (likely(!aclk_queue.aclk_query_head)) { + aclk_queue.aclk_query_tail = NULL; + } + + ACLK_QUEUE_UNLOCK; + return this_query; +} + +// Returns the entry after which we need to create a new entry to run at the specified time +// If NULL is returned we need to add to HEAD +// Need to have a QUERY lock before calling this + +static struct aclk_query *aclk_query_find_position(time_t time_to_run) +{ + struct aclk_query *tmp_query, *last_query; + + // Quick check if we will add to the end + if (likely(aclk_queue.aclk_query_tail)) { + if (aclk_queue.aclk_query_tail->run_after <= time_to_run) + return aclk_queue.aclk_query_tail; + } + + last_query = NULL; + tmp_query = aclk_queue.aclk_query_head; + + while (tmp_query) { + if (tmp_query->run_after > time_to_run) + return last_query; + last_query = tmp_query; + tmp_query = tmp_query->next; + } + return last_query; +} + +// Need to have a QUERY lock before calling this +static struct aclk_query * +aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) +{ + struct aclk_query *tmp_query, *prev_query; + UNUSED(cmd); + + tmp_query = aclk_queue.aclk_query_head; + prev_query = NULL; + while (tmp_query) { + if (likely(!tmp_query->deleted)) { + if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { + if ((!data || data == tmp_query->data) && + (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { + if (likely(last_query)) + *last_query = prev_query; + return tmp_query; + } + } + } + prev_query = tmp_query; + tmp_query = tmp_query->next; + } + return NULL; +} + +/* + * Add a query to execute, the result will be send to the specified topic + */ + +int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) +{ + struct aclk_query *new_query, *tmp_query; + + // Ignore all commands while we wait for the agent to initialize + if (unlikely(!aclk_connected)) + return 1; + + run_after = now_realtime_sec() + run_after; + + ACLK_QUEUE_LOCK; + struct aclk_query *last_query = NULL; + + tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); + if (unlikely(tmp_query)) { + if (tmp_query->run_after == run_after) { + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + if (last_query) + last_query->next = tmp_query->next; + else + aclk_queue.aclk_query_head = tmp_query->next; + + debug(D_ACLK, "Removing double entry"); + aclk_query_free(tmp_query); + aclk_queue.count--; + } + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_queued++; + ACLK_STATS_UNLOCK; + } + + new_query = callocz(1, sizeof(struct aclk_query)); + new_query->cmd = aclk_cmd; + if (internal) { + new_query->topic = strdupz(topic); + if (likely(query)) + new_query->query = strdupz(query); + } else { + new_query->topic = topic; + new_query->query = query; + new_query->msg_id = msg_id; + } + + new_query->data = data; + new_query->next = NULL; + new_query->created = now_realtime_usec(); + new_query->created_boot_time = now_boottime_usec(); + new_query->run_after = run_after; + + debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : ""); + + tmp_query = aclk_query_find_position(run_after); + + if (tmp_query) { + new_query->next = tmp_query->next; + tmp_query->next = new_query; + if (tmp_query == aclk_queue.aclk_query_tail) + aclk_queue.aclk_query_tail = new_query; + aclk_queue.count++; + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; + } + + new_query->next = aclk_queue.aclk_query_head; + aclk_queue.aclk_query_head = new_query; + aclk_queue.count++; + + ACLK_QUEUE_UNLOCK; + QUERY_THREAD_WAKEUP; + return 0; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +#ifndef __GNUC__ +#pragma region Helper Functions +#endif + +/* + * Take a buffer, encode it and rewrite it + * + */ + +static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines) +{ + char *tmp_buffer = mallocz(content_size * 2); + char *dst = tmp_buffer; + while (content_size > 0) { + switch (*src) { + case '\n': + if (keep_newlines) + { + *dst++ = '\\'; + *dst++ = 'n'; + } + break; + case '\t': + break; + case 0x01 ... 0x08: + case 0x0b ... 0x1F: + *dst++ = '\\'; + *dst++ = 'u'; + *dst++ = '0'; + *dst++ = '0'; + *dst++ = (*src < 0x0F) ? '0' : '1'; + *dst++ = to_hex(*src); + break; + case '\"': + *dst++ = '\\'; + *dst++ = *src; + break; + default: + *dst++ = *src; + } + src++; + content_size--; + } + *dst = '\0'; + + return tmp_buffer; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +#ifndef __GNUC__ +#pragma region ACLK_QUERY +#endif + +static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) +{ + usec_t t = now_boottime_usec(); + aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); + + w->response.code = web_client_api_request_v1(host, w, url); + t = now_boottime_usec() - t; + + aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t); + + return t; +} + +static int aclk_execute_query(struct aclk_query *this_query) +{ + if (strncmp(this_query->query, "/api/v1/", 8) == 0) { + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); + *mysep = '\0'; + } else + strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); + + mysep = strrchr(this_query->query, '/'); + + // TODO: handle bad response perhaps in a different way. For now it does to the payload + aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + now_realtime_timeval(&w->tv_ready); + w->response.data->date = w->tv_ready.tv_sec; + web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); + char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); + + buffer_sprintf( + local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}", + w->response.code, encoded_response, encoded_header); + + buffer_sprintf(local_buffer, "\n}"); + + debug(D_ACLK, "Response:%s", encoded_header); + + aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); + + buffer_free(w->response.data); + buffer_free(w->response.header); + buffer_free(w->response.header_output); + freez(w); + buffer_free(local_buffer); + freez(encoded_response); + freez(encoded_header); + return 0; + } + return 1; +} + +static int aclk_execute_query_v2(struct aclk_query *this_query) +{ + int retval = 0; + usec_t t; + BUFFER *local_buffer = NULL; + struct aclk_cloud_req_v2 *cloud_req = (struct aclk_cloud_req_v2 *)this_query->data; + +#ifdef NETDATA_WITH_ZLIB + int z_ret; + BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *start, *end; +#endif + + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(this_query->query, '?'); + if (mysep) { + url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); + *mysep = '\0'; + } else + url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1); + + mysep = strrchr(this_query->query, '/'); + + // execute the query + t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + +#ifdef NETDATA_WITH_ZLIB + // check if gzip encoding can and should be used + if ((start = strstr(cloud_req->data, WEB_HDR_ACCEPT_ENC))) { + start += strlen(WEB_HDR_ACCEPT_ENC); + end = strstr(start, "\x0D\x0A"); + start = strstr(start, "gzip"); + + if (start && start < end) { + w->response.zstream.zalloc = Z_NULL; + w->response.zstream.zfree = Z_NULL; + w->response.zstream.opaque = Z_NULL; + if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) { + w->response.zinitialized = 1; + w->response.zoutput = 1; + } else + error("Failed to initialize zlib. Proceeding without compression."); + } + } + + if (w->response.data->len && w->response.zinitialized) { + w->response.zstream.next_in = (Bytef *)w->response.data->buffer; + w->response.zstream.avail_in = w->response.data->len; + do { + w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE; + w->response.zstream.next_out = w->response.zbuffer; + z_ret = deflate(&w->response.zstream, Z_FINISH); + if(z_ret < 0) { + if(w->response.zstream.msg) + error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); + else + error("Unknown error during zlib compression."); + retval = 1; + goto cleanup; + } + int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; + buffer_need_bytes(z_buffer, bytes_to_cpy); + memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy); + z_buffer->len += bytes_to_cpy; + } while(z_ret != Z_STREAM_END); + // so that web_client_build_http_header + // puts correct content lenght into header + buffer_free(w->response.data); + w->response.data = z_buffer; + z_buffer = NULL; + } +#endif + + now_realtime_timeval(&w->tv_ready); + w->response.data->date = w->tv_ready.tv_sec; + web_client_build_http_header(w); + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code); + buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A"); + buffer_strcat(local_buffer, w->response.header_output->buffer); + + if (w->response.data->len) { +#ifdef NETDATA_WITH_ZLIB + if (w->response.zinitialized) { + buffer_need_bytes(local_buffer, w->response.data->len); + memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); + local_buffer->len += w->response.data->len; + } else { +#endif + buffer_strcat(local_buffer, w->response.data->buffer); +#ifdef NETDATA_WITH_ZLIB + } +#endif + } + + aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id); + +cleanup: +#ifdef NETDATA_WITH_ZLIB + if(w->response.zinitialized) + deflateEnd(&w->response.zstream); + buffer_free(z_buffer); +#endif + buffer_free(w->response.data); + buffer_free(w->response.header); + buffer_free(w->response.header_output); + freez(w); + buffer_free(local_buffer); + return retval; +} + +#define ACLK_HOST_PTR_COMPULSORY(x) \ + if (unlikely(!host)) { \ + errno = 0; \ + error(x " needs host pointer"); \ + break; \ + } + +/* + * This function will fetch the next pending command and process it + * + */ +static int aclk_process_query(struct aclk_query_thread *t_info) +{ + struct aclk_query *this_query; + static long int query_count = 0; + ACLK_METADATA_STATE meta_state; + RRDHOST *host; + + if (!aclk_connected) + return 0; + + this_query = aclk_queue_pop(); + if (likely(!this_query)) { + return 0; + } + + if (unlikely(this_query->deleted)) { + debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query); + aclk_query_free(this_query); + return 1; + } + query_count++; + + host = (RRDHOST*)this_query->data; + + debug( + D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic, + this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS); + + switch (this_query->cmd) { + case ACLK_CMD_ONCONNECT: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT"); +#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE + if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { + error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE); + break; + } +#else +#warning "This check became unnecessary. Remove" +#endif + + debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"", + host->hostname, + host->machine_guid); + + rrdhost_aclk_state_lock(host); + meta_state = host->aclk_state.metadata; + host->aclk_state.metadata = ACLK_METADATA_SENT; + rrdhost_aclk_state_unlock(host); + aclk_send_metadata(meta_state, host); + break; + + case ACLK_CMD_CHART: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART"); + + debug(D_ACLK, "EXECUTING a chart update command"); + aclk_send_single_chart(host, this_query->query); + break; + + case ACLK_CMD_CHARTDEL: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL"); + + debug(D_ACLK, "EXECUTING a chart delete command"); + //TODO: This send the info metadata for now + aclk_send_info_metadata(ACLK_METADATA_SENT, host); + break; + + case ACLK_CMD_ALARM: + debug(D_ACLK, "EXECUTING an alarm update command"); + aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); + break; + + case ACLK_CMD_CLOUD: + debug(D_ACLK, "EXECUTING a cloud command"); + aclk_execute_query(this_query); + break; + case ACLK_CMD_CLOUD_QUERY_2: + debug(D_ACLK, "EXECUTING Cloud Query v2"); + aclk_execute_query_v2(this_query); + break; + + case ACLK_CMD_CHILD_CONNECT: + case ACLK_CMD_CHILD_DISCONNECT: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT"); + + debug( + D_ACLK, "Execution Child %s command", + this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect"); + aclk_send_info_child_connection(host, this_query->cmd); + break; + + default: + errno = 0; + error("Unknown ACLK Query Command"); + break; + } + debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_dispatched++; + aclk_queries_per_thread[t_info->idx]++; + ACLK_STATS_UNLOCK; + } + + aclk_query_free(this_query); + + return 1; +} + +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) +{ + if (query_threads && query_threads->thread_list) { + for (int i = 0; i < query_threads->count; i++) { + netdata_thread_join(query_threads->thread_list[i].thread, NULL); + } + freez(query_threads->thread_list); + } + + struct aclk_query *this_query; + + do { + this_query = aclk_queue_pop(); + aclk_query_free(this_query); + } while (this_query); +} + +#define TASK_LEN_MAX 16 +void aclk_query_threads_start(struct aclk_query_threads *query_threads) +{ + info("Starting %d query threads.", query_threads->count); + + char thread_name[TASK_LEN_MAX]; + query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); + for (int i = 0; i < query_threads->count; i++) { + query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics + + if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0)) + error("snprintf encoding error"); + netdata_thread_create( + &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, + &query_threads->thread_list[i]); + } +} + +/** + * Checks and updates popcorning state of rrdhost + * returns actual/updated popcorning state + */ + +ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host) +{ + rrdhost_aclk_state_lock(host); + ACLK_POPCORNING_STATE ret = host->aclk_state.state; + if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ + rrdhost_aclk_state_unlock(host); + return ret; + } + + if (!host->aclk_state.t_last_popcorn_update){ + rrdhost_aclk_state_unlock(host); + return ret; + } + + time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update; + + if (t_diff >= ACLK_STABLE_TIMEOUT) { + host->aclk_state.state = ACLK_HOST_STABLE; + host->aclk_state.t_last_popcorn_update = 0; + rrdhost_aclk_state_unlock(host); + info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff); + return ACLK_HOST_STABLE; + } + + rrdhost_aclk_state_unlock(host); + return ret; +} + +/** + * Main query processing thread + * + * On startup wait for the agent collectors to initialize + * Expect at least a time of ACLK_STABLE_TIMEOUT seconds + * of no new collectors coming in in order to mark the agent + * as stable (set agent_state = AGENT_STABLE) + */ +void *aclk_query_main_thread(void *ptr) +{ + struct aclk_query_thread *info = ptr; + + while (!netdata_exit) { + if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) { +#ifdef ACLK_DEBUG + _dump_collector_list(); +#endif + break; + } + sleep_usec(USEC_PER_SEC * 1); + } + + while (!netdata_exit) { + if(aclk_disable_runtime) { + sleep(1); + continue; + } + ACLK_SHARED_STATE_LOCK; + if (unlikely(!aclk_shared_state.version_neg)) { + if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { + ACLK_SHARED_STATE_UNLOCK; + info("Waiting for ACLK Version Negotiation message from Cloud"); + sleep(1); + continue; + } + errno = 0; + error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." + " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); + aclk_shared_state.version_neg = ACLK_VERSION_MIN; + aclk_set_rx_handlers(aclk_shared_state.version_neg); + } + ACLK_SHARED_STATE_UNLOCK; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) { + if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + rrdhost_aclk_state_unlock(localhost); + errno = 0; + error("ACLK failed to queue on_connect command"); + sleep(1); + continue; + } + localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED; + } + rrdhost_aclk_state_unlock(localhost); + + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { + aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); + aclk_shared_state.next_popcorn_host = NULL; + aclk_update_next_child_to_popcorn(); + } + ACLK_SHARED_STATE_UNLOCK; + + while (aclk_process_query(info)) { + // Process all commands + }; + + QUERY_THREAD_LOCK; + + // TODO: Need to check if there are queries awaiting already + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + + QUERY_THREAD_UNLOCK; + } + + return NULL; +} + +#ifndef __GNUC__ +#pragma endregion +#endif diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h new file mode 100644 index 000000000..53eef1392 --- /dev/null +++ b/aclk/legacy/aclk_query.h @@ -0,0 +1,40 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_QUERY_H +#define NETDATA_ACLK_QUERY_H + +#include "libnetdata/libnetdata.h" +#include "web/server/web_client.h" + +#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable + +extern pthread_cond_t query_cond_wait; +extern pthread_mutex_t query_lock_wait; +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) +#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait) + +extern volatile int aclk_connected; + +struct aclk_query_thread { + netdata_thread_t thread; + int idx; +}; + +struct aclk_query_threads { + struct aclk_query_thread *thread_list; + int count; +}; + +struct aclk_cloud_req_v2 { + char *data; + RRDHOST *host; +}; + +void *aclk_query_main_thread(void *ptr); +int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); + +void aclk_query_threads_start(struct aclk_query_threads *query_threads); +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); +unsigned int aclk_query_size(); + +#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/aclk_rrdhost_state.h b/aclk/legacy/aclk_rrdhost_state.h new file mode 100644 index 000000000..7ab3a502e --- /dev/null +++ b/aclk/legacy/aclk_rrdhost_state.h @@ -0,0 +1,42 @@ +#ifndef ACLK_RRDHOST_STATE_H +#define ACLK_RRDHOST_STATE_H + +#include "../../libnetdata/libnetdata.h" + +typedef enum aclk_cmd { + ACLK_CMD_CLOUD, + ACLK_CMD_ONCONNECT, + ACLK_CMD_INFO, + ACLK_CMD_CHART, + ACLK_CMD_CHARTDEL, + ACLK_CMD_ALARM, + ACLK_CMD_CLOUD_QUERY_2, + ACLK_CMD_CHILD_CONNECT, + ACLK_CMD_CHILD_DISCONNECT +} ACLK_CMD; + +typedef enum aclk_metadata_state { + ACLK_METADATA_REQUIRED, + ACLK_METADATA_CMD_QUEUED, + ACLK_METADATA_SENT +} ACLK_METADATA_STATE; + +typedef enum aclk_agent_state { + ACLK_HOST_INITIALIZING, + ACLK_HOST_STABLE +} ACLK_POPCORNING_STATE; + +typedef struct aclk_rrdhost_state { + char *claimed_id; // Claimed ID if host has one otherwise NULL + +#ifdef ENABLE_ACLK + // per child popcorning + ACLK_POPCORNING_STATE state; + ACLK_METADATA_STATE metadata; + + time_t timestamp_created; + time_t t_last_popcorn_update; +#endif /* ENABLE_ACLK */ +} aclk_rrdhost_state; + +#endif /* ACLK_RRDHOST_STATE_H */ diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c new file mode 100644 index 000000000..99fa9d987 --- /dev/null +++ b/aclk/legacy/aclk_rx_msgs.c @@ -0,0 +1,365 @@ + +#include "aclk_rx_msgs.h" + +#include "aclk_common.h" +#include "aclk_stats.h" +#include "aclk_query.h" + +#ifndef UUID_STR_LEN +#define UUID_STR_LEN 37 +#endif + +static inline int aclk_extract_v2_data(char *payload, char **data) +{ + char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR); + if(!ptr) + return 1; + ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR); + *data = strdupz(ptr); + return 0; +} + +#define ACLK_GET_REQ "GET " +#define ACLK_CHILD_REQ "/host/" +#define ACLK_CLOUD_REQ_V2_PREFIX "/api/v1/" +#define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref)) +static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req) +{ + const char *start, *end, *ptr; + char uuid_str[UUID_STR_LEN]; + uuid_t uuid; + + errno = 0; + + if(STRNCMP_CONSTANT_PREFIX(cloud_req->data, ACLK_GET_REQ)) { + error("Only accepting GET HTTP requests from CLOUD"); + return 1; + } + start = ptr = cloud_req->data + strlen(ACLK_GET_REQ); + + if(!STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CHILD_REQ)) { + ptr += strlen(ACLK_CHILD_REQ); + if(strlen(ptr) < UUID_STR_LEN) { + error("the child id in URL too short \"%s\"", start); + return 1; + } + + strncpyz(uuid_str, ptr, UUID_STR_LEN - 1); + + for(int i = 0; i < UUID_STR_LEN && uuid_str[i]; i++) + uuid_str[i] = tolower(uuid_str[i]); + + if(ptr[0] && uuid_parse(uuid_str, uuid)) { + error("Got Child query (/host/XXX/...) host id \"%s\" doesn't look like valid GUID", uuid_str); + return 1; + } + ptr += UUID_STR_LEN - 1; + + cloud_req->host = rrdhost_find_by_guid(uuid_str, 0); + if(!cloud_req->host) { + error("Cannot find host with GUID \"%s\"", uuid_str); + return 1; + } + } + + if(STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CLOUD_REQ_V2_PREFIX)) { + error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); + return 1; + } + + if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) { + errno = 0; + error("Doesn't look like HTTP GET request."); + return 1; + } + + req->payload = mallocz((end - start) + 1); + strncpyz(req->payload, start, end - start); + + return 0; +} + +#define HTTP_CHECK_AGENT_INITIALIZED() rrdhost_aclk_state_lock(localhost);\ + if (unlikely(localhost->aclk_state.state == ACLK_HOST_INITIALIZING)) {\ + debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ + rrdhost_aclk_state_unlock(localhost);\ + return 1;\ + }\ + rrdhost_aclk_state_unlock(localhost); + +/* + * Parse the incoming payload and queue a command if valid + */ +static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + UNUSED(raw_payload); + HTTP_CHECK_AGENT_INITIALIZED(); + + errno = 0; + if (unlikely(cloud_to_agent->version != 1)) { + error( + "Received \"http\" message from Cloud with version %d, but ACLK version %d is used", + cloud_to_agent->version, + aclk_shared_state.version_neg); + return 1; + } + + if (unlikely(!cloud_to_agent->payload)) { + error("payload missing"); + return 1; + } + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("callback_topic missing"); + return 1; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("msg_id missing"); + return 1; + } + + if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) + debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); + + return 0; +} + +static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + HTTP_CHECK_AGENT_INITIALIZED(); + + struct aclk_cloud_req_v2 *cloud_req; + char *data; + + errno = 0; + if (cloud_to_agent->version < ACLK_V_COMPRESSION) { + error( + "This handler cannot reply to request with version older than %d, received %d.", + ACLK_V_COMPRESSION, + cloud_to_agent->version); + return 1; + } + + if (unlikely(aclk_extract_v2_data(raw_payload, &data))) { + error("Error extracting payload expected after the JSON dictionary."); + return 1; + } + + cloud_req = mallocz(sizeof(struct aclk_cloud_req_v2)); + cloud_req->data = data; + cloud_req->host = localhost; + + if (unlikely(aclk_v2_payload_get_query(cloud_req, cloud_to_agent))) { + error("Could not extract payload from query"); + goto cleanup; + } + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("Missing callback_topic"); + goto cleanup; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("Missing msg_id"); + goto cleanup; + } + + // aclk_queue_query takes ownership of data pointer + if (unlikely(aclk_queue_query( + cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, + ACLK_CMD_CLOUD_QUERY_2))) { + error("ACLK failed to queue incoming \"http\" v2 message"); + goto cleanup; + } + + return 0; +cleanup: + freez(cloud_req->data); + freez(cloud_req); + return 1; +} + +// This handles `version` message from cloud used to negotiate +// protocol version we will use +static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + UNUSED(raw_payload); + int version = -1; + errno = 0; + + if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { + error( + "Unsuported version of \"version\" message from cloud. Expected %d, Got %d", + ACLK_VERSION_NEG_VERSION, + cloud_to_agent->version); + return 1; + } + if (unlikely(!cloud_to_agent->min_version)) { + error("Min version missing or 0"); + return 1; + } + if (unlikely(!cloud_to_agent->max_version)) { + error("Max version missing or 0"); + return 1; + } + if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { + error( + "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, + cloud_to_agent->min_version); + return 1; + } + + if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { + error( + "Agent too old for this cloud. Minimum version required by cloud %d." + " Maximum version supported by this agent %d.", + cloud_to_agent->min_version, ACLK_VERSION_MAX); + aclk_kill_link = 1; + aclk_disable_runtime = 1; + return 1; + } + if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { + error( + "Cloud version is too old for this agent. Maximum version supported by cloud %d." + " Minimum (oldest) version supported by this agent %d.", + cloud_to_agent->max_version, ACLK_VERSION_MIN); + aclk_kill_link = 1; + return 1; + } + + version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); + + ACLK_SHARED_STATE_LOCK; + if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { + errno = 0; + error("The \"version\" message came too late ignoring."); + goto err_cleanup; + } + if (unlikely(aclk_shared_state.version_neg)) { + errno = 0; + error("Version has already been set to %d", aclk_shared_state.version_neg); + goto err_cleanup; + } + aclk_shared_state.version_neg = version; + ACLK_SHARED_STATE_UNLOCK; + + info("Choosing version %d of ACLK", version); + + aclk_set_rx_handlers(version); + + return 0; + +err_cleanup: + ACLK_SHARED_STATE_UNLOCK; + return 1; +} + +typedef struct aclk_incoming_msg_type{ + char *name; + int(*fnc)(struct aclk_request *, char *); +}aclk_incoming_msg_type; + +aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = { + { .name = "http", .fnc = aclk_handle_cloud_request_v1 }, + { .name = "version", .fnc = aclk_handle_version_response }, + { .name = NULL, .fnc = NULL } +}; + +aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { + { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, + { .name = "version", .fnc = aclk_handle_version_response }, + { .name = NULL, .fnc = NULL } +}; + +struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1; + +void aclk_set_rx_handlers(int version) +{ + if(version >= ACLK_V_COMPRESSION) { + aclk_incoming_msg_types = aclk_incoming_msg_types_compression; + return; + } + + aclk_incoming_msg_types = aclk_incoming_msg_types_v1; +} + +int aclk_handle_cloud_message(char *payload) +{ + struct aclk_request cloud_to_agent; + memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_recvd++; + ACLK_STATS_UNLOCK; + } + + if (unlikely(!payload)) { + errno = 0; + error("ACLK incoming message is empty"); + goto err_cleanup_nojson; + } + + debug(D_ACLK, "ACLK incoming message (%s)", payload); + + int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + + if (unlikely(rc != JSON_OK)) { + errno = 0; + error("Malformed json request (%s)", payload); + goto err_cleanup; + } + + if (!cloud_to_agent.type_id) { + errno = 0; + error("Cloud message is missing compulsory key \"type\""); + goto err_cleanup; + } + + if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { + error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); + goto err_cleanup; + } + + for (int i = 0; aclk_incoming_msg_types[i].name; i++) { + if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { + if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { + // in case of success handler is supposed to clean up after itself + // or as in the case of aclk_handle_cloud_request take + // ownership of the pointers (done to avoid copying) + // see what `aclk_queue_query` parameter `internal` does + + // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! + // msg handlers (namely aclk_handle_version_responce) + // can freely change what aclk_incoming_msg_types points to + // so either exit or restart this for loop + freez(cloud_to_agent.type_id); + return 0; + } + goto err_cleanup; + } + } + + errno = 0; + error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id); + +err_cleanup: + if (cloud_to_agent.payload) + freez(cloud_to_agent.payload); + if (cloud_to_agent.type_id) + freez(cloud_to_agent.type_id); + if (cloud_to_agent.msg_id) + freez(cloud_to_agent.msg_id); + if (cloud_to_agent.callback_topic) + freez(cloud_to_agent.callback_topic); + +err_cleanup_nojson: + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; + } + + return 1; +} diff --git a/aclk/legacy/aclk_rx_msgs.h b/aclk/legacy/aclk_rx_msgs.h new file mode 100644 index 000000000..3095e41a7 --- /dev/null +++ b/aclk/legacy/aclk_rx_msgs.h @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_RX_MSGS_H +#define NETDATA_ACLK_RX_MSGS_H + +#include "../../daemon/common.h" +#include "libnetdata/libnetdata.h" + +int aclk_handle_cloud_message(char *payload); +void aclk_set_rx_handlers(int version); + + +#endif /* NETDATA_ACLK_RX_MSGS_H */ diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c new file mode 100644 index 000000000..2a57cd6f0 --- /dev/null +++ b/aclk/legacy/aclk_stats.c @@ -0,0 +1,298 @@ +#include "aclk_stats.h" + +netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; + +int aclk_stats_enabled; + +int query_thread_count; + +// data ACLK stats need per query thread +struct aclk_qt_data { + RRDDIM *dim; +} *aclk_qt_data = NULL; + +uint32_t *aclk_queries_per_thread = NULL; +uint32_t *aclk_queries_per_thread_sample = NULL; + +struct aclk_metrics aclk_metrics = { + .online = 0, +}; + +struct aclk_metrics_per_sample aclk_metrics_per_sample; + +struct aclk_mat_metrics aclk_mat_metrics = { +#ifdef NETDATA_INTERNAL_CHECKS + .latency = { .name = "aclk_latency_mqtt", + .prio = 200002, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "ms", + .title = "ACLK Message Publish Latency" }, +#endif + + .cloud_q_db_query_time = { .name = "aclk_db_query_time", + .prio = 200006, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "us", + .title = "Time it took to process cloud requested DB queries" }, + + .cloud_q_recvd_to_processed = { .name = "aclk_cloud_q_recvd_to_processed", + .prio = 200007, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "us", + .title = "Time from receiving the Cloud Query until it was picked up " + "by query thread (just before passing to the database)." } +}; + +void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement) +{ + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + if (metric->max < measurement) + metric->max = measurement; + + metric->total += measurement; + metric->count++; + ACLK_STATS_UNLOCK; + } +} + +static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent) +{ + static RRDSET *st_aclkstats = NULL; + static RRDDIM *rd_online_status = NULL; + + if (unlikely(!st_aclkstats)) { + st_aclkstats = rrdset_create_localhost( + "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status", + "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st_aclkstats); + + rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online); + + rrdset_done(st_aclkstats); +} + +static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st_query_thread = NULL; + static RRDDIM *rd_queued = NULL; + static RRDDIM *rd_dispatched = NULL; + + if (unlikely(!st_query_thread)) { + st_query_thread = rrdset_create_localhost( + "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s", + "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA); + + rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st_query_thread); + + rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued); + rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched); + + rrdset_done(st_query_thread); +} + +static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_wq_add = NULL; + static RRDDIM *rd_wq_consumed = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "kB/s", + "netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA); + + rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_wq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_wq_add, per_sample->write_q_added); + rrddim_set_by_pointer(st, rd_wq_consumed, per_sample->write_q_consumed); + + rrdset_done(st); +} + +static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_add = NULL; + static RRDDIM *rd_rq_consumed = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "kB/s", + "netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA); + + rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_rq_add, per_sample->read_q_added); + rrddim_set_by_pointer(st, rd_rq_consumed, per_sample->read_q_consumed); + + rrdset_done(st); +} + +static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_rcvd = NULL; + static RRDDIM *rd_rq_err = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s", + "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err); + rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err); + + rrdset_done(st); +} + +#define MAX_DIM_NAME 16 +static void aclk_stats_query_threads(uint32_t *queries_per_thread) +{ + static RRDSET *st = NULL; + + char dim_name[MAX_DIM_NAME]; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", + "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < query_thread_count; i++) { + if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) + error("snprintf encoding error"); + aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + } else + rrdset_next(st); + + for (int i = 0; i < query_thread_count; i++) { + rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); + } + + rrdset_done(st); +} + +static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct aclk_metric_mat_data *data) +{ + if(unlikely(!metric->st)) { + metric->st = rrdset_create_localhost( + "netdata", metric->name, NULL, "aclk", NULL, metric->title, metric->unit, "netdata", "stats", metric->prio, + localhost->rrd_update_every, RRDSET_TYPE_LINE); + + metric->rd_avg = rrddim_add(metric->st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + metric->rd_max = rrddim_add(metric->st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + metric->rd_total = rrddim_add(metric->st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(metric->st); + + if(data->count) + rrddim_set_by_pointer(metric->st, metric->rd_avg, roundf((float)data->total / data->count)); + else + rrddim_set_by_pointer(metric->st, metric->rd_avg, 0); + rrddim_set_by_pointer(metric->st, metric->rd_max, data->max); + rrddim_set_by_pointer(metric->st, metric->rd_total, data->total); + + rrdset_done(metric->st); +} + +void aclk_stats_thread_cleanup() +{ + freez(aclk_qt_data); + freez(aclk_queries_per_thread); + freez(aclk_queries_per_thread_sample); +} + +void *aclk_stats_main_thread(void *ptr) +{ + struct aclk_stats_thread *args = ptr; + + query_thread_count = args->query_thread_count; + aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); + aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); + aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + + heartbeat_t hb; + heartbeat_init(&hb); + usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; + + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + + struct aclk_metrics_per_sample per_sample; + struct aclk_metrics permanent; + + while (!netdata_exit) { + netdata_thread_testcancel(); + // ------------------------------------------------------------------------ + // Wait for the next iteration point. + + heartbeat_next(&hb, step_ut); + if (netdata_exit) break; + + ACLK_STATS_LOCK; + // to not hold lock longer than necessary, especially not to hold it + // during database rrd* operations + memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); + memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + + memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count); + memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count); + ACLK_STATS_UNLOCK; + + aclk_stats_collect(&per_sample, &permanent); + aclk_stats_query_queue(&per_sample); + + aclk_stats_write_q(&per_sample); + aclk_stats_read_q(&per_sample); + + aclk_stats_cloud_req(&per_sample); + aclk_stats_query_threads(aclk_queries_per_thread_sample); + +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency); +#endif + aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_db_query_time, &per_sample.cloud_q_db_query_time); + aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_recvd_to_processed, &per_sample.cloud_q_recvd_to_processed); + } + + return 0; +} + +void aclk_stats_upd_online(int online) { + if(!aclk_stats_enabled) + return; + + ACLK_STATS_LOCK; + aclk_metrics.online = online; + + if(!online) + aclk_metrics_per_sample.offline_during_sample = 1; + ACLK_STATS_UNLOCK; +} diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h new file mode 100644 index 000000000..7e74fdf88 --- /dev/null +++ b/aclk/legacy/aclk_stats.h @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_STATS_H +#define NETDATA_ACLK_STATS_H + +#include "../../daemon/common.h" +#include "libnetdata/libnetdata.h" +#include "aclk_common.h" + +#define ACLK_STATS_THREAD_NAME "ACLK_Stats" + +extern netdata_mutex_t aclk_stats_mutex; + +#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex) +#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex) + +extern int aclk_stats_enabled; + +struct aclk_stats_thread { + netdata_thread_t *thread; + int query_thread_count; +}; + +// preserve between samples +struct aclk_metrics { + volatile uint8_t online; +}; + +//mat = max average total +struct aclk_metric_mat_data { + volatile uint32_t total; + volatile uint32_t count; + volatile uint32_t max; +}; + +//mat = max average total +struct aclk_metric_mat { + char *name; + char *title; + RRDSET *st; + RRDDIM *rd_avg; + RRDDIM *rd_max; + RRDDIM *rd_total; + long prio; + char *unit; +}; + +extern struct aclk_mat_metrics { +#ifdef NETDATA_INTERNAL_CHECKS + struct aclk_metric_mat latency; +#endif + struct aclk_metric_mat cloud_q_db_query_time; + struct aclk_metric_mat cloud_q_recvd_to_processed; +} aclk_mat_metrics; + +void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement); + +// reset to 0 on every sample +extern struct aclk_metrics_per_sample { + /* in the unlikely event of ACLK disconnecting + and reconnecting under 1 sampling rate + we want to make sure we record the disconnection + despite it being then seemingly longer in graph */ + volatile uint8_t offline_during_sample; + + volatile uint32_t queries_queued; + volatile uint32_t queries_dispatched; + + volatile uint32_t write_q_added; + volatile uint32_t write_q_consumed; + + volatile uint32_t read_q_added; + volatile uint32_t read_q_consumed; + + volatile uint32_t cloud_req_recvd; + volatile uint32_t cloud_req_err; + +#ifdef NETDATA_INTERNAL_CHECKS + struct aclk_metric_mat_data latency; +#endif + struct aclk_metric_mat_data cloud_q_db_query_time; + struct aclk_metric_mat_data cloud_q_recvd_to_processed; +} aclk_metrics_per_sample; + +extern uint32_t *aclk_queries_per_thread; + +void *aclk_stats_main_thread(void *ptr); +void aclk_stats_thread_cleanup(); +void aclk_stats_upd_online(int online); + +#endif /* NETDATA_ACLK_STATS_H */ diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c new file mode 100644 index 000000000..e51a01308 --- /dev/null +++ b/aclk/legacy/agent_cloud_link.c @@ -0,0 +1,1683 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" +#include "agent_cloud_link.h" +#include "aclk_lws_https_client.h" +#include "aclk_query.h" +#include "aclk_common.h" +#include "aclk_stats.h" + +#ifdef ENABLE_ACLK +#include +#endif + +int aclk_shutting_down = 0; + +// Other global state +static int aclk_subscribed = 0; +static int aclk_disable_single_updates = 0; +static char *aclk_username = NULL; +static char *aclk_password = NULL; + +static char *global_base_topic = NULL; +static int aclk_connecting = 0; +int aclk_force_reconnect = 0; // Indication from lower layers +usec_t aclk_session_us = 0; // Used by the mqtt layer +time_t aclk_session_sec = 0; // Used by the mqtt layer + +static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; + +#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) +#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) + +#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) +#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) + +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); +void aclk_lws_wss_destroy_context(); +/* + * Maintain a list of collectors and chart count + * If all the charts of a collector are deleted + * then a new metadata dataset must be send to the cloud + * + */ +struct _collector { + time_t created; + uint32_t count; //chart count + uint32_t hostname_hash; + uint32_t plugin_hash; + uint32_t module_hash; + char *hostname; + char *plugin_name; + char *module_name; + struct _collector *next; +}; + +struct _collector *collector_list = NULL; + +char *create_uuid() +{ + uuid_t uuid; + char *uuid_str = mallocz(36 + 1); + + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + + return uuid_str; +} + +int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, "msg-id")) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "type")) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "callback-topic")) { + data->callback_topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "payload")) { + if (likely(e->data.string)) { + size_t len = strlen(e->data.string); + data->payload = mallocz(len+1); + if (!url_decode_r(data->payload, e->data.string, len + 1)) + strcpy(data->payload, e->data.string); + } + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, "version")) { + data->version = e->data.number; + break; + } + if (!strcmp(e->name, "min-version")) { + data->min_version = e->data.number; + break; + } + if (!strcmp(e->name, "max-version")) { + data->max_version = e->data.number; + break; + } + + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + + +static RSA *aclk_private_key = NULL; +static int create_private_key() +{ + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + aclk_private_key = NULL; + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); + + long bytes_read; + char *private_key = read_by_filename(filename, &bytes_read); + if (!private_key) { + error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); + return 1; + } + debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); + + BIO *key_bio = BIO_new_mem_buf(private_key, -1); + if (key_bio==NULL) { + error("Claimed agent cannot establish ACLK - failed to create BIO for key"); + goto biofailed; + } + + aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); + BIO_free(key_bio); + if (aclk_private_key!=NULL) + { + freez(private_key); + return 0; + } + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); + +biofailed: + freez(private_key); + return 1; +} + +/* + * After a connection failure -- delay in milliseconds + * When a connection is established, the delay function + * should be called with + * + * mode 0 to reset the delay + * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * + */ +unsigned long int aclk_reconnect_delay(int mode) +{ + static int fail = -1; + unsigned long int delay; + + if (!mode || fail == -1) { + srandom(time(NULL)); + fail = mode - 1; + return 0; + } + + delay = (1 << fail); + + if (delay >= ACLK_MAX_BACKOFF_DELAY) { + delay = ACLK_MAX_BACKOFF_DELAY * 1000; + } else { + fail++; + delay = (delay * 1000) + (random() % 1000); + } + + return delay; +} + +// This will give the base topic that the agent will publish messages. +// subtopics will be sent under the base topic e.g. base_topic/subtopic +// This is called during the connection, we delete any previous topic +// in-case the user has changed the agent id and reclaimed. + +char *create_publish_base_topic() +{ + char *agent_id = is_agent_claimed(); + if (unlikely(!agent_id)) + return NULL; + + ACLK_LOCK; + + if (global_base_topic) + freez(global_base_topic); + char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; + + snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id); + tmp = strchr(tmp_topic, '\n'); + if (unlikely(tmp)) + *tmp = '\0'; + global_base_topic = strdupz(tmp_topic); + + ACLK_UNLOCK; + freez(agent_id); + return global_base_topic; +} + +/* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ + +char *get_topic(char *sub_topic, char *final_topic, int max_size) +{ + int rc; + + if (likely(sub_topic && sub_topic[0] == '/')) + return sub_topic; + + if (unlikely(!global_base_topic)) + return sub_topic; + + rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); + if (unlikely(rc >= max_size)) + debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic); + + return final_topic; +} + +#ifndef __GNUC__ +#pragma region ACLK Internal Collector Tracking +#endif + +/* + * Free a collector structure + */ + +static void _free_collector(struct _collector *collector) +{ + if (likely(collector->plugin_name)) + freez(collector->plugin_name); + + if (likely(collector->module_name)) + freez(collector->module_name); + + if (likely(collector->hostname)) + freez(collector->hostname); + + freez(collector); +} + +/* + * This will report the collector list + * + */ +#ifdef ACLK_DEBUG +static void _dump_collector_list() +{ + struct _collector *tmp_collector; + + COLLECTOR_LOCK; + + info("DUMPING ALL COLLECTORS"); + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + info("DUMPING ALL COLLECTORS -- nothing found"); + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + + while (tmp_collector) { + info( + "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, + tmp_collector->plugin_name ? tmp_collector->plugin_name : "", + tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); + + tmp_collector = tmp_collector->next; + } + info("DUMPING ALL COLLECTORS DONE"); + COLLECTOR_UNLOCK; +} +#endif + +/* + * This will cleanup the collector list + * + */ +static void _reset_collector_list() +{ + struct _collector *tmp_collector, *next_collector; + + COLLECTOR_LOCK; + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + collector_list->count = 0; + collector_list->next = NULL; + + // We broke the link; we can unlock + COLLECTOR_UNLOCK; + + while (tmp_collector) { + next_collector = tmp_collector->next; + _free_collector(tmp_collector); + tmp_collector = next_collector; + } +} + +/* + * Find a collector (if it exists) + * Must lock before calling this + * If last_collector is not null, it will return the previous collector in the linked + * list (used in collector delete) + */ +static struct _collector *_find_collector( + const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) +{ + struct _collector *tmp_collector, *prev_collector; + uint32_t plugin_hash; + uint32_t module_hash; + uint32_t hostname_hash; + + if (unlikely(!collector_list)) { + collector_list = callocz(1, sizeof(struct _collector)); + return NULL; + } + + if (unlikely(!collector_list->next)) + return NULL; + + plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; + module_hash = module_name ? simple_hash(module_name) : 1; + hostname_hash = simple_hash(hostname); + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + prev_collector = collector_list; + while (tmp_collector) { + if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && + hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && + (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && + (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { + if (unlikely(last_collector)) + *last_collector = prev_collector; + + return tmp_collector; + } + + prev_collector = tmp_collector; + tmp_collector = tmp_collector->next; + } + + return tmp_collector; +} + +/* + * Called to delete a collector + * It will reduce the count (chart_count) and will remove it + * from the linked list if the count reaches zero + * The structure will be returned to the caller to free + * the resources + * + */ +static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector, *prev_collector = NULL; + + tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); + + if (likely(tmp_collector)) { + --tmp_collector->count; + if (unlikely(!tmp_collector->count)) + prev_collector->next = tmp_collector->next; + } + return tmp_collector; +} + +/* + * Add a new collector (plugin / module) to the list + * If it already exists just update the chart count + * + * Lock before calling + */ +static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + + tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); + + if (unlikely(!tmp_collector)) { + tmp_collector = callocz(1, sizeof(struct _collector)); + tmp_collector->hostname_hash = simple_hash(hostname); + tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; + tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; + + tmp_collector->hostname = strdupz(hostname); + tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; + tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; + + tmp_collector->next = collector_list->next; + collector_list->next = tmp_collector; + } + tmp_collector->count++; + debug( + D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", + module_name ? module_name : "*", tmp_collector->count); + return tmp_collector; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +/* Avoids the need to scan trough all RRDHOSTS + * every time any Query Thread Wakes Up + * (every time we need to check child popcorn expiry) + * call with ACLK_SHARED_STATE_LOCK held + */ +void aclk_update_next_child_to_popcorn(void) +{ + RRDHOST *host; + int any = 0; + + rrd_rdlock(); + rrdhost_foreach_read(host) { + if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) + continue; + + rrdhost_aclk_state_lock(host); + if (!ACLK_IS_HOST_POPCORNING(host)) { + rrdhost_aclk_state_unlock(host); + continue; + } + + any = 1; + + if (unlikely(!aclk_shared_state.next_popcorn_host)) { + aclk_shared_state.next_popcorn_host = host; + rrdhost_aclk_state_unlock(host); + continue; + } + + if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) + aclk_shared_state.next_popcorn_host = host; + + rrdhost_aclk_state_unlock(host); + } + if(!any) + aclk_shared_state.next_popcorn_host = NULL; + + rrd_unlock(); +} + +/* If popcorning bump timer. + * If popcorning or initializing (host not stable) return 1 + * Otherwise return 0 + */ +static int aclk_popcorn_check_bump(RRDHOST *host) +{ + time_t now = now_monotonic_sec(); + int updated = 0, ret; + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + + ret = ACLK_IS_HOST_INITIALIZING(host); + if (unlikely(ACLK_IS_HOST_POPCORNING(host))) { + if(now != host->aclk_state.t_last_popcorn_update) { + updated = 1; + info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); + } + host->aclk_state.t_last_popcorn_update = now; + rrdhost_aclk_state_unlock(host); + + if (host != localhost && updated) + aclk_update_next_child_to_popcorn(); + + ACLK_SHARED_STATE_UNLOCK; + return ret; + } + + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return ret; +} + +inline static int aclk_host_initializing(RRDHOST *host) +{ + rrdhost_aclk_state_lock(host); + int ret = ACLK_IS_HOST_INITIALIZING(host); + rrdhost_aclk_state_unlock(host); + return ret; +} + +static void aclk_start_host_popcorning(RRDHOST *host) +{ + usec_t now = now_monotonic_sec(); + info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) { + errno = 0; + error("Localhost is allowed to do popcorning only once after startup!"); + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return; + } + + host->aclk_state.state = ACLK_HOST_INITIALIZING; + host->aclk_state.metadata = ACLK_METADATA_REQUIRED; + host->aclk_state.t_last_popcorn_update = now; + rrdhost_aclk_state_unlock(host); + if (host != localhost) + aclk_update_next_child_to_popcorn(); + ACLK_SHARED_STATE_UNLOCK; +} + +static void aclk_stop_host_popcorning(RRDHOST *host) +{ + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + if (!ACLK_IS_HOST_POPCORNING(host)) { + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return; + } + + info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid); + host->aclk_state.t_last_popcorn_update = 0; + host->aclk_state.metadata = ACLK_METADATA_REQUIRED; + rrdhost_aclk_state_unlock(host); + + if(host == aclk_shared_state.next_popcorn_host) { + aclk_shared_state.next_popcorn_host = NULL; + aclk_update_next_child_to_popcorn(); + } + ACLK_SHARED_STATE_UNLOCK; +} + +/* + * Add a new collector to the list + * If it exists, update the chart count + */ +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + if (unlikely(!netdata_ready)) { + return; + } + + COLLECTOR_LOCK; + + tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); + + if (unlikely(tmp_collector->count != 1)) { + COLLECTOR_UNLOCK; + return; + } + + COLLECTOR_UNLOCK; + + if(aclk_popcorn_check_bump(host)) + return; + + if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); +} + +/* + * Delete a collector from the list + * If the chart count reaches zero the collector will be removed + * from the list by calling del_collector. + * + * This function will release the memory used and schedule + * a cloud update + */ +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + if (unlikely(!netdata_ready)) { + return; + } + + COLLECTOR_LOCK; + + tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); + + if (unlikely(!tmp_collector || tmp_collector->count)) { + COLLECTOR_UNLOCK; + return; + } + + debug( + D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", + tmp_collector->count); + + COLLECTOR_UNLOCK; + + _free_collector(tmp_collector); + + if (aclk_popcorn_check_bump(host)) + return; + + if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); +} + +static void aclk_graceful_disconnect() +{ + size_t write_q, write_q_bytes, read_q; + time_t event_loop_timeout; + + // Send a graceful disconnect message + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}"); + aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); + buffer_free(b); + + event_loop_timeout = now_realtime_sec() + 5; + write_q = 1; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + + aclk_shutting_down = 1; + _link_shutdown(); + aclk_lws_wss_mqtt_layer_disconect_notif(); + + write_q = 1; + event_loop_timeout = now_realtime_sec() + 5; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + aclk_shutting_down = 0; +} + +#ifndef __GNUC__ +#pragma region Incoming Msg Parsing +#endif + +struct dictionary_singleton { + char *key; + char *result; +}; + +int json_extract_singleton(JSON_ENTRY *e) +{ + struct dictionary_singleton *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, data->key)) { + data->result = strdupz(e->data.string); + break; + } + break; + case JSON_NUMBER: + case JSON_BOOLEAN: + case JSON_NULL: + break; + } + return 0; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + + +#ifndef __GNUC__ +#pragma region Challenge Response +#endif + +// Base-64 decoder. +// Note: This is non-validating, invalid input will be decoded without an error. +// Challenges are packed into json strings so we don't skip newlines. +// Size errors (i.e. invalid input size or insufficient output space) are caught. +size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size) +{ + static char lookup[256]; + static int first_time=1; + if (first_time) + { + first_time = 0; + for(int i=0; i<256; i++) + lookup[i] = -1; + for(int i='A'; i<='Z'; i++) + lookup[i] = i-'A'; + for(int i='a'; i<='z'; i++) + lookup[i] = i-'a' + 26; + for(int i='0'; i<='9'; i++) + lookup[i] = i-'0' + 52; + lookup['+'] = 62; + lookup['/'] = 63; + } + if ((input_size & 3) != 0) + { + error("Can't decode base-64 input length %zu", input_size); + return 0; + } + size_t unpadded_size = (input_size/4) * 3; + if ( unpadded_size > output_size ) + { + error("Output buffer size %zu is too small to decode %zu into", output_size, input_size); + return 0; + } + // Don't check padding within full quantums + for (size_t i = 0 ; i < input_size-4 ; i+=4 ) + { + uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]]; + output[0] = value >> 16; + output[1] = value >> 8; + output[2] = value; + //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); + output += 3; + input += 4; + } + // Handle padding only in last quantum + if (input[2] == '=') { + uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]]; + output[0] = value >> 4; + //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]); + return unpadded_size-2; + } + else if (input[3] == '=') { + uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]]; + output[0] = value >> 10; + output[1] = value >> 2; + //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]); + return unpadded_size-1; + } + else + { + uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3]; + output[0] = value >> 16; + output[1] = value >> 8; + output[2] = value; + //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); + return unpadded_size; + } +} + +size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size) +{ + uint32_t value; + static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + if ((input_size/3+1)*4 >= output_size) + { + error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size); + return 0; + } + size_t count = 0; + while (input_size>3) + { + value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff; + output[0] = lookup[value >> 18]; + output[1] = lookup[(value >> 12) & 0x3f]; + output[2] = lookup[(value >> 6) & 0x3f]; + output[3] = lookup[value & 0x3f]; + //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); + output += 4; + input += 3; + input_size -= 3; + count += 4; + } + switch (input_size) + { + case 2: + value = (input[0] << 10) + (input[1] << 2); + output[0] = lookup[(value >> 12) & 0x3f]; + output[1] = lookup[(value >> 6) & 0x3f]; + output[2] = lookup[value & 0x3f]; + output[3] = '='; + //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]); + count += 4; + break; + case 1: + value = input[0] << 4; + output[0] = lookup[(value >> 6) & 0x3f]; + output[1] = lookup[value & 0x3f]; + output[2] = '='; + output[3] = '='; + //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); + count += 4; + break; + case 0: + break; + } + return count; +} + + + +int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted) +{ + int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING); + if (result == -1) { + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Decryption of the challenge failed: %s", err); + } + return result; +} + +void aclk_get_challenge(char *aclk_hostname, int port) +{ + char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + debug(D_ACLK, "Performing challenge-response sequence"); + if (aclk_password != NULL) + { + freez(aclk_password); + aclk_password = NULL; + } + // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge + // TODO - target host? + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + { + error("Agent was not claimed - cannot perform challenge/response"); + goto CLEANUP; + } + char url[1024]; + sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id); + info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url); + if(aclk_send_https_request("GET", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL)) + { + error("Challenge failed: %s", data_buffer); + goto CLEANUP; + } + struct dictionary_singleton challenge = { .key = "challenge", .result = NULL }; + + debug(D_ACLK, "Challenge response from cloud: %s", data_buffer); + if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK) + { + freez(challenge.result); + error("Could not parse the json response with the challenge: %s", data_buffer); + goto CLEANUP; + } + if (challenge.result == NULL ) { + error("Could not retrieve challenge from auth response: %s", data_buffer); + goto CLEANUP; + } + + + size_t challenge_len = strlen(challenge.result); + unsigned char decoded[512]; + size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded)); + + unsigned char plaintext[4096]={}; + int decrypted_length = private_decrypt(decoded, decoded_len, plaintext); + freez(challenge.result); + char encoded[512]; + size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded)); + encoded[encoded_len] = 0; + debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded); + + char response_json[4096]={}; + sprintf(response_json, "{\"response\":\"%s\"}", encoded); + debug(D_ACLK, "Password phase: %s",response_json); + // TODO - host + sprintf(url, "/api/v1/auth/node/%s/password", agent_id); + if(aclk_send_https_request("POST", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json)) + { + error("Challenge-response failed: %s", data_buffer); + goto CLEANUP; + } + + debug(D_ACLK, "Password response from cloud: %s", data_buffer); + + struct dictionary_singleton password = { .key = "password", .result = NULL }; + if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK) + { + freez(password.result); + error("Could not parse the json response with the password: %s", data_buffer); + goto CLEANUP; + } + + if (password.result == NULL ) { + error("Could not retrieve password from auth response"); + goto CLEANUP; + } + if (aclk_password != NULL ) + freez(aclk_password); + aclk_password = password.result; + if (aclk_username != NULL) + freez(aclk_username); + aclk_username = agent_id; + agent_id = NULL; + +CLEANUP: + if (agent_id != NULL) + freez(agent_id); + freez(data_buffer); + return; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +static void aclk_try_to_connect(char *hostname, int port) +{ + int rc; + +// this is usefull for developers working on ACLK +// allows connecting agent to any MQTT broker +// for debugging, development and testing purposes +#ifndef ACLK_DISABLE_CHALLENGE + if (!aclk_private_key) { + error("Cannot try to establish the agent cloud link - no private key available!"); + return; + } +#endif + + info("Attempting to establish the agent cloud link"); +#ifdef ACLK_DISABLE_CHALLENGE + error("Agent built with ACLK_DISABLE_CHALLENGE. This is for testing " + "and development purposes only. Warranty void. Won't be able " + "to connect to Netdata Cloud."); + if (aclk_password == NULL) + aclk_password = strdupz("anon"); +#else + aclk_get_challenge(hostname, port); + if (aclk_password == NULL) + return; +#endif + + aclk_connecting = 1; + create_publish_base_topic(); + + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = 0; + ACLK_SHARED_STATE_UNLOCK; + + rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password); + if (unlikely(rc)) { + error("Failed to initialize the agent cloud link library"); + } +} + +// Sends "hello" message to negotiate ACLK version with cloud +static inline void aclk_hello_msg() +{ + BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + + char *msg_id = create_uuid(); + + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; + ACLK_SHARED_STATE_UNLOCK; + + //Hello message is versioned separatelly from the rest of the protocol + aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); + buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX); + aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id); + freez(msg_id); + buffer_free(buf); +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + struct aclk_query_threads query_threads; + struct aclk_stats_thread *stats_thread = NULL; + time_t last_periodic_query_wakeup = 0; + + query_threads.thread_list = NULL; + + // This thread is unusual in that it cannot be cancelled by cancel_main_threads() + // as it must notify the far end that it shutdown gracefully and avoid the LWT. + netdata_thread_disable_cancelability(); + +#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK) + info("Killing ACLK thread -> cloud functionality has been disabled"); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +#endif + +#ifndef LWS_WITH_SOCKS5 + ACLK_PROXY_TYPE proxy_type; + aclk_get_proxy(&proxy_type); + if(proxy_type == PROXY_TYPE_SOCKS5) { + error("Disabling ACLK due to requested SOCKS5 proxy."); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } +#endif + + info("Waiting for netdata to be ready"); + while (!netdata_ready) { + sleep_usec(USEC_PER_MS * 300); + } + + info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_setting) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) { + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } + } + + query_threads.count = MIN(processors/2, 6); + query_threads.count = MAX(query_threads.count, 2); + query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); + if(query_threads.count < 1) { + error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count); + query_threads.count = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); + } + + //start localhost popcorning + aclk_start_host_popcorning(localhost); + + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); + if (aclk_stats_enabled) { + stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); + stats_thread->thread = mallocz(sizeof(netdata_thread_t)); + stats_thread->query_thread_count = query_threads.count; + netdata_thread_create( + stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread); + } + + char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering. + int port_num = 0; + info("Waiting for netdata to be claimed"); + while(1) { + char *agent_id = is_agent_claimed(); + while (likely(!agent_id)) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + goto exited; + agent_id = is_agent_claimed(); + } + freez(agent_id); + // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. + // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error("Do not move the cloud base url out of post_conf_load!!"); + goto exited; + } + if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &port_num)) + error("Agent is claimed but the configuration is invalid, please fix"); + else if (!create_private_key() && !_mqtt_lib_init()) + break; + + for (int i=0; i<60; i++) { + if (netdata_exit) + goto exited; + + sleep_usec(USEC_PER_SEC * 1); + } + } + + usec_t reconnect_expiry = 0; // In usecs + + while (!netdata_exit) { + static int first_init = 0; + /* size_t write_q, write_q_bytes, read_q; + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/ + + if (aclk_disable_runtime && !aclk_connected) { + sleep(1); + continue; + } + + if (aclk_kill_link) { // User has reloaded the claiming state + aclk_kill_link = 0; + aclk_graceful_disconnect(); + create_private_key(); + continue; + } + + if (aclk_force_reconnect) { + aclk_lws_wss_destroy_context(); + aclk_force_reconnect = 0; + } + if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) { + if (unlikely(!first_init)) { + aclk_try_to_connect(aclk_hostname, port_num); + first_init = 1; + } else { + if (aclk_connecting == 0) { + if (reconnect_expiry == 0) { + unsigned long int delay = aclk_reconnect_delay(1); + reconnect_expiry = now_realtime_usec() + delay * 1000; + info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0); + } + if (now_realtime_usec() >= reconnect_expiry) { + reconnect_expiry = 0; + aclk_try_to_connect(aclk_hostname, port_num); + } + sleep_usec(USEC_PER_MS * 100); + } + } + if (aclk_connecting) { + _link_event_loop(); + sleep_usec(USEC_PER_MS * 100); + } + continue; + } + + _link_event_loop(); + if (unlikely(!aclk_connected || aclk_force_reconnect)) + continue; + /*static int stress_counter = 0; + if (write_q_bytes==0 && stress_counter ++ >5) + { + aclk_send_stress_test(8000000); + stress_counter = 0; + }*/ + + if (unlikely(!aclk_subscribed)) { + aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1); + aclk_hello_msg(); + } + + if (unlikely(!query_threads.thread_list)) { + aclk_query_threads_start(&query_threads); + } + + time_t now = now_monotonic_sec(); + if(aclk_connected && last_periodic_query_wakeup < now) { + // to make `aclk_queue_query()` param `run_after` work + // also makes per child popcorning work + last_periodic_query_wakeup = now; + QUERY_THREAD_WAKEUP; + } + } // forever +exited: + // Wakeup query thread to cleanup + QUERY_THREAD_WAKEUP_ALL; + + freez(aclk_username); + freez(aclk_password); + freez(aclk_hostname); + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + char *agent_id = is_agent_claimed(); + if (agent_id && aclk_connected) { + freez(agent_id); + // Wakeup thread to cleanup + QUERY_THREAD_WAKEUP; + aclk_graceful_disconnect(); + } + + aclk_query_threads_cleanup(&query_threads); + + _reset_collector_list(); + freez(collector_list); + + if(aclk_stats_enabled) { + netdata_thread_join(*stats_thread->thread, NULL); + aclk_stats_thread_cleanup(); + freez(stats_thread->thread); + freez(stats_thread); + } + + /* + * this must be last -> if all static threads signal + * THREAD_EXITED rrdengine will dealloc the RRDSETs + * and RRDDIMs that are used by still runing stat thread. + * see netdata_cleanup_and_exit() for reference + */ + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +} + +/* + * Send a message to the cloud, using a base topic and sib_topic + * The final topic will be in the form / + * If base_topic is missing then the global_base_topic will be used (if available) + * + */ +int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id) +{ + int rc; + int mid; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + UNUSED(msg_id); + + if (!aclk_connected) + return 0; + + if (unlikely(!message)) + return 0; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + if (unlikely(!final_topic)) { + errno = 0; + error("Unable to build outgoing topic; truncated?"); + return 1; + } + + ACLK_LOCK; + rc = _link_send_message(final_topic, message, len, &mid); + // TODO: link the msg_id with the mid so we can trace it + ACLK_UNLOCK; + + if (unlikely(rc)) { + errno = 0; + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + } + + return rc; +} + +int aclk_send_message(char *sub_topic, char *message, char *msg_id) +{ + return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id); +} + +/* + * Subscribe to a topic in the cloud + * The final subscription will be in the form + * /agent/claim_id/ + */ +int aclk_subscribe(char *sub_topic, int qos) +{ + int rc; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + if (unlikely(!final_topic)) { + errno = 0; + error("Unable to build outgoing topic; truncated?"); + return 1; + } + + if (!aclk_connected) { + error("Cannot subscribe to %s - not connected!", topic); + return 1; + } + + ACLK_LOCK; + rc = _link_subscribe(final_topic, qos); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) { + errno = 0; + error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc)); + } + + return rc; +} + +// This is called from a callback when the link goes up +void aclk_connect() +{ + info("Connection detected (%u queued queries)", aclk_query_size()); + + aclk_stats_upd_online(1); + + aclk_connected = 1; + aclk_reconnect_delay(0); + + QUERY_THREAD_WAKEUP; + return; +} + +// This is called from a callback when the link goes down +void aclk_disconnect() +{ + if (likely(aclk_connected)) + info("Disconnect detected (%u queued queries)", aclk_query_size()); + + aclk_stats_upd_online(0); + + aclk_subscribed = 0; + rrdhost_aclk_state_lock(localhost); + localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED; + rrdhost_aclk_state_unlock(localhost); + aclk_connected = 0; + aclk_connecting = 0; + aclk_force_reconnect = 1; +} + +inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version) +{ + uuid_t uuid; + char uuid_str[36 + 1]; + + if (unlikely(!msg_id)) { + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + msg_id = uuid_str; + } + + if (ts_secs == 0) { + ts_us = now_realtime_usec(); + ts_secs = ts_us / USEC_PER_SEC; + ts_us = ts_us % USEC_PER_SEC; + } + + buffer_sprintf( + dest, + "{\t\"type\": \"%s\",\n" + "\t\"msg-id\": \"%s\",\n" + "\t\"timestamp\": %ld,\n" + "\t\"timestamp-offset-usec\": %llu,\n" + "\t\"connect\": %ld,\n" + "\t\"connect-offset-usec\": %llu,\n" + "\t\"version\": %d", + type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version); + + debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs); +} + + +/* + * This will send alarm information which includes + * configured alarms + * alarm_log + * active alarms + */ +void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); + +void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + debug(D_ACLK, "Metadata alarms start"); + + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + + if (metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + else + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + + buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); + health_alarms2json(localhost, local_buffer, 1); + debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len); + // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : "); + // health_alarm_log2json(localhost, local_buffer, 0); + // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len); + buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : "); + health_active_log_alarms_2json(localhost, local_buffer); + //debug(D_ACLK, "Metadata message %s", local_buffer->buffer); + + + + buffer_sprintf(local_buffer, "\n}\n}"); + aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); +} + +/* + * This will send the agent metadata + * /api/v1/info + * charts + */ +int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + debug(D_ACLK, "Metadata /info start"); + + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + if (metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg); + else + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + buffer_sprintf(local_buffer, "{\n\t \"info\" : "); + web_client_api_request_v1_info_fill_buffer(host, local_buffer); + debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); + + buffer_sprintf(local_buffer, ", \n\t \"charts\" : "); + charts2json(host, local_buffer, 1, 0); + buffer_sprintf(local_buffer, "\n}\n}"); + debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len); + + aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg); + + debug(D_ACLK, "Sending Child Disconnect"); + + char *msg_id = create_uuid(); + + aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg); + + buffer_strcat(local_buffer, ",\"payload\":"); + + buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid); + rrdhost_aclk_state_lock(host); + if(host->aclk_state.claimed_id) + buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id); + else + buffer_strcat(local_buffer, "null}}"); + + rrdhost_aclk_state_unlock(host); + + aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) +{ +#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE + if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + return; +#else +#warning "This check became unnecessary. Remove" +#endif + + if (unlikely(aclk_host_initializing(localhost))) + return; + + switch (cmd) { + case ACLK_CMD_CHILD_CONNECT: + debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); + aclk_start_host_popcorning(host); + aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); + break; + case ACLK_CMD_CHILD_DISCONNECT: + debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); + aclk_stop_host_popcorning(host); + aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); + break; + default: + error("Unknown command for aclk_host_state_update %d.", (int)cmd); + } +} + +void aclk_send_stress_test(size_t size) +{ + char *buffer = mallocz(size); + if (buffer != NULL) + { + for(size_t i=0; icontenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + rrdset2json(st, local_buffer, NULL, NULL, 1); + buffer_sprintf(local_buffer, "\t\n}"); + + aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) +{ +#ifndef ENABLE_ACLK + UNUSED(host); + UNUSED(chart_name); + return 0; +#else + if (unlikely(!netdata_ready)) + return 0; + + if (!netdata_cloud_setting) + return 0; + + if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) + return 0; + + if (aclk_host_initializing(localhost)) + return 0; + + if (unlikely(aclk_disable_single_updates)) + return 0; + + if (aclk_popcorn_check_bump(host)) + return 0; + + if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue chart_update command"); + } + } + + return 0; +#endif +} + +int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +{ + BUFFER *local_buffer = NULL; + + if (unlikely(!netdata_ready)) + return 0; + + if (host != localhost) + return 0; + + if(unlikely(aclk_host_initializing(localhost))) + return 0; + + /* + * Check if individual updates have been disabled + * This will be the case when we do health reload + * and all the alarms will be dropped and recreated. + * At the end of the health reload the complete alarm metadata + * info will be sent + */ + if (unlikely(aclk_disable_single_updates)) + return 0; + + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *msg_id = create_uuid(); + + buffer_flush(local_buffer); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); + health_alarm_entry2json_nolock(local_buffer, ae, host); + netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); + + buffer_sprintf(local_buffer, "\n}"); + + if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue alarm_command on alarm_update"); + } + } + + freez(msg_id); + buffer_free(local_buffer); + + return 0; +} diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h new file mode 100644 index 000000000..e777e0b19 --- /dev/null +++ b/aclk/legacy/agent_cloud_link.h @@ -0,0 +1,93 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_AGENT_CLOUD_LINK_H +#define NETDATA_AGENT_CLOUD_LINK_H + +#include "../../daemon/common.h" +#include "mqtt.h" +#include "aclk_common.h" + +#define ACLK_THREAD_NAME "ACLK_Query" +#define ACLK_CHART_TOPIC "outbound/meta" +#define ACLK_ALARMS_TOPIC "outbound/alarms" +#define ACLK_METADATA_TOPIC "outbound/meta" +#define ACLK_COMMAND_TOPIC "inbound/cmd" +#define ACLK_TOPIC_STRUCTURE "/agent/%s" + +#define ACLK_MAX_BACKOFF_DELAY 1024 // maximum backoff delay in seconds + +#define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg) +#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds +#define ACLK_QOS 1 +#define ACLK_PING_INTERVAL 60 +#define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop + +#define ACLK_MAX_TOPIC 255 + +#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff stragegy fow now +#define ACLK_DEFAULT_PORT 9002 +#define ACLK_DEFAULT_HOST "localhost" + +#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" + +struct aclk_request { + char *type_id; + char *msg_id; + char *callback_topic; + char *payload; + int version; + int min_version; + int max_version; +}; + +typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION; + +void *aclk_main(void *ptr); + +#define NETDATA_ACLK_HOOK \ + { .name = "ACLK_Main", \ + .config_section = NULL, \ + .config_name = NULL, \ + .enabled = 1, \ + .thread = NULL, \ + .init_routine = NULL, \ + .start_routine = aclk_main }, + +extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); +extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id); + +extern char *is_agent_claimed(void); +extern void aclk_lws_wss_mqtt_layer_disconect_notif(); +char *create_uuid(); + +// callbacks for agent cloud link +int aclk_subscribe(char *topic, int qos); +int cloud_to_agent_parse(JSON_ENTRY *e); +void aclk_disconnect(); +void aclk_connect(); + +int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host); +int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host); +void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted); + +int aclk_wait_for_initialization(); +char *create_publish_base_topic(); + +int aclk_send_single_chart(RRDHOST *host, char *chart); +int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); +int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); +void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version); +int aclk_handle_cloud_message(char *payload); +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_alarm_reload(); +unsigned long int aclk_reconnect_delay(int mode); +extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); +void aclk_single_update_enable(); +void aclk_single_update_disable(); + +void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd); +int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd); +void aclk_update_next_child_to_popcorn(void); + +#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c new file mode 100644 index 000000000..6f38a20dc --- /dev/null +++ b/aclk/legacy/mqtt.c @@ -0,0 +1,366 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "../../daemon/common.h" +#include "mqtt.h" +#include "aclk_lws_wss_client.h" +#include "aclk_stats.h" +#include "aclk_rx_msgs.h" + +extern usec_t aclk_session_us; +extern time_t aclk_session_sec; + +inline const char *_link_strerror(int rc) +{ + return mosquitto_strerror(rc); +} + +#ifdef NETDATA_INTERNAL_CHECKS +static struct timeval sendTimes[1024]; +#endif + +static struct mosquitto *mosq = NULL; + + +void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) +{ + UNUSED(mosq); + UNUSED(obj); + + aclk_handle_cloud_message(msg->payload); +} + +void publish_callback(struct mosquitto *mosq, void *obj, int rc) +{ + UNUSED(mosq); + UNUSED(obj); + UNUSED(rc); +#ifdef NETDATA_INTERNAL_CHECKS + struct timeval now, *orig; + now_realtime_timeval(&now); + orig = &sendTimes[ rc & 0x3ff ]; + int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec); + diff /= 1000; + + info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff); + + aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff); +#endif + return; +} + +void connect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + UNUSED(mosq); + UNUSED(obj); + UNUSED(rc); + + info("Connection to cloud estabilished"); + aclk_connect(); + + return; +} + +void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + UNUSED(mosq); + UNUSED(obj); + UNUSED(rc); + + if (netdata_exit) + info("Connection to cloud terminated due to agent shutdown"); + else { + errno = 0; + error("Connection to cloud failed"); + } + aclk_disconnect(); + + aclk_lws_wss_mqtt_layer_disconect_notif(); + + return; +} + +void _show_mqtt_info() +{ + int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; + libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); + + info( + "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, + libmosq_revision); +} + +size_t _mqtt_external_write_hook(void *buf, size_t count) +{ + return aclk_lws_wss_client_write(buf, count); +} + +size_t _mqtt_external_read_hook(void *buf, size_t count) +{ + return aclk_lws_wss_client_read(buf, count); +} + +int _mqtt_lib_init() +{ + int rc; + //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; + /* Commenting out now as it is unused - do not delete, this is needed for the on-prem version. + char *ca_crt; + char *server_crt; + char *server_key; + + // show library info so can have it in the logfile + //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); + ca_crt = config_get(CONFIG_SECTION_CLOUD, "link cert", "*"); + server_crt = config_get(CONFIG_SECTION_CLOUD, "link server cert", "*"); + server_key = config_get(CONFIG_SECTION_CLOUD, "link server key", "*"); + + if (ca_crt[0] == '*') { + freez(ca_crt); + ca_crt = NULL; + } + + if (server_crt[0] == '*') { + freez(server_crt); + server_crt = NULL; + } + + if (server_key[0] == '*') { + freez(server_key); + server_key = NULL; + } + */ + + // info( + // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, + // libmosq_revision); + + rc = mosquitto_lib_init(); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) { + error("Failed to initialize MQTT (libmosquitto library)"); + return 1; + } + return 0; +} + +static int _mqtt_create_connection(char *username, char *password) +{ + if (mosq != NULL) + mosquitto_destroy(mosq); + mosq = mosquitto_new(username, true, NULL); + if (unlikely(!mosq)) { + mosquitto_lib_cleanup(); + error("MQTT new structure -- %s", mosquitto_strerror(errno)); + return MOSQ_ERR_UNKNOWN; + } + + // Record the session start time to allow a nominal LWT timestamp + usec_t now = now_realtime_usec(); + aclk_session_sec = now / USEC_PER_SEC; + aclk_session_us = now % USEC_PER_SEC; + + _link_set_lwt("outbound/meta", 2); + + mosquitto_connect_callback_set(mosq, connect_callback); + mosquitto_disconnect_callback_set(mosq, disconnect_callback); + mosquitto_publish_callback_set(mosq, publish_callback); + + info("Using challenge-response: %s / %s", username, password); + mosquitto_username_pw_set(mosq, username, password); + + int rc = mosquitto_threaded_set(mosq, 1); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc)); + +#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000 + rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc)); + + rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1); + info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc)); +#endif + + return rc; +} + +static int _link_mqtt_connect(char *aclk_hostname, int aclk_port) +{ + int rc; + + rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL); + + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + error( + "Failed to establish link to [%s:%d] MQTT status = %d (%s)", aclk_hostname, aclk_port, rc, + mosquitto_strerror(rc)); + else + info("Establishing MQTT link to [%s:%d]", aclk_hostname, aclk_port); + + return rc; +} + +static inline void _link_mosquitto_write() +{ + int rc; + + if (unlikely(!mosq)) { + return; + } + + rc = mosquitto_loop_misc(mosq); + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc)); + + if (likely(mosquitto_want_write(mosq))) { + rc = mosquitto_loop_write(mosq, 1); + if (rc != MOSQ_ERR_SUCCESS) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc)); + } +} + +void aclk_lws_connection_established(char *hostname, int port) +{ + _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected. + _link_mosquitto_write(); +} + +void aclk_lws_connection_data_received() +{ + int rc = mosquitto_loop_read(mosq, 1); + if (rc != MOSQ_ERR_SUCCESS) + debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc)); +} + +void aclk_lws_connection_closed() +{ + aclk_disconnect(); + +} + + +int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password) +{ + if(aclk_lws_wss_connect(aclk_hostname, aclk_port)) + return MOSQ_ERR_UNKNOWN; + aclk_lws_wss_service_loop(); + + int rc = _mqtt_create_connection(username, password); + if (rc!= MOSQ_ERR_SUCCESS) + return rc; + + mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook); + return rc; +} + +inline int _link_event_loop() +{ + + // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1). + _link_mosquitto_write(); + aclk_lws_wss_service_loop(); + + // this is because if use LWS we don't want + // mqtt to reconnect by itself + return MOSQ_ERR_SUCCESS; +} + +void _link_shutdown() +{ + int rc; + + if (likely(!mosq)) + return; + + rc = mosquitto_disconnect(mosq); + switch (rc) { + case MOSQ_ERR_SUCCESS: + info("MQTT disconnected from broker"); + break; + default: + info("MQTT invalid structure"); + break; + }; +} + + +int _link_set_lwt(char *sub_topic, int qos) +{ + int rc; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + if (unlikely(!final_topic)) { + errno = 0; + error("Unable to build outgoing topic; truncated?"); + return 1; + } + + usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1; + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION); + buffer_strcat(b, ", \"payload\": \"unexpected\" }"); + rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0); + buffer_free(b); + + return rc; +} + +int _link_subscribe(char *topic, int qos) +{ + int rc; + + if (unlikely(!mosq)) + return 1; + + mosquitto_message_callback_set(mosq, mqtt_message_callback); + + rc = mosquitto_subscribe(mosq, NULL, topic, qos); + if (unlikely(rc)) { + errno = 0; + error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc)); + return 1; + } + + _link_mosquitto_write(); + return 0; +} + +/* + * Send a message to the cloud to specific topic + * + */ + +int _link_send_message(char *topic, const void *message, size_t len, int *mid) +{ + int rc; + size_t write_q, write_q_bytes, read_q; + + rc = mosquitto_pub_topic_check(topic); + + if (unlikely(rc != MOSQ_ERR_SUCCESS)) + return rc; + + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + rc = mosquitto_publish(mosq, mid, topic, len, message, ACLK_QOS, 0); + +#ifdef NETDATA_INTERNAL_CHECKS + char msg_head[64]; + memset(msg_head, 0, sizeof(msg_head)); + strncpy(msg_head, (char*)message, 60); + for (size_t i = 0; i < sizeof(msg_head); i++) + if(msg_head[i] == '\n') msg_head[i] = ' '; + info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", (int)len, + *mid, write_q, write_q_bytes, read_q, msg_head); + now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]); +#endif + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc != MOSQ_ERR_SUCCESS)) { + errno = 0; + error("MQTT message failed : %s", mosquitto_strerror(rc)); + } + _link_mosquitto_write(); + return rc; +} diff --git a/aclk/legacy/mqtt.h b/aclk/legacy/mqtt.h new file mode 100644 index 000000000..cc4765d62 --- /dev/null +++ b/aclk/legacy/mqtt.h @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_MQTT_H +#define NETDATA_MQTT_H + +#ifdef ENABLE_ACLK +#include "externaldeps/mosquitto/mosquitto.h" +#endif + +void _show_mqtt_info(); +int _link_event_loop(); +void _link_shutdown(); +int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password); +//int _link_lib_init(); +int _mqtt_lib_init(); +int _link_subscribe(char *topic, int qos); +int _link_send_message(char *topic, const void *message, size_t len, int *mid); +const char *_link_strerror(int rc); +int _link_set_lwt(char *topic, int qos); + + +int aclk_handle_cloud_message(char *); +extern char *get_topic(char *sub_topic, char *final_topic, int max_size); + +#endif //NETDATA_MQTT_H diff --git a/aclk/legacy/tests/fake-charts.d.plugin b/aclk/legacy/tests/fake-charts.d.plugin new file mode 100644 index 000000000..a13c6bab8 --- /dev/null +++ b/aclk/legacy/tests/fake-charts.d.plugin @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +sleep 45 # Wait until popcorning finishes + +echo "CHART aclk_test.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1" +sleep 5 +echo "DIMENSION aclk1 '' percentage-of-absolute 1 1" +sleep 5 +echo "BEGIN aclk_test.newcol 1000000" +echo "SET aclk1 = 3" +echo "END" +sleep 5 +echo "DIMENSION aclk2 '' percentage-of-absolute 1 1" +sleep 5 +echo "BEGIN aclk_test.newcol 1000000" +echo "SET aclk1 = 3" +echo "SET aclk2 = 3" +echo "END" +sleep 5 +echo "CHART aclk_test2.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1" +echo "DIMENSION aclk1 '' percentage-of-absolute 1 1" + +sleep 5 +exit 0 # Signal that we are done diff --git a/aclk/legacy/tests/install-fake-charts.d.sh.in b/aclk/legacy/tests/install-fake-charts.d.sh.in new file mode 100644 index 000000000..ac002a2bd --- /dev/null +++ b/aclk/legacy/tests/install-fake-charts.d.sh.in @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +TARGET="@pluginsdir_POST@" +BASE="$(cd "$(dirname "$0")" && pwd)" + +cp "$BASE/fake-charts.d.plugin" "$TARGET/charts.d.plugin" diff --git a/aclk/legacy/tests/launch-paho.sh b/aclk/legacy/tests/launch-paho.sh new file mode 100755 index 000000000..1c2cb5f2c --- /dev/null +++ b/aclk/legacy/tests/launch-paho.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +docker build -f paho.Dockerfile . --build-arg "HOST_HOSTNAME=$(ping -c1 "$(hostname).local" | head -n1 | grep -o '[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*')" -t paho-client +docker run -it paho-client diff --git a/aclk/legacy/tests/paho-inspection.py b/aclk/legacy/tests/paho-inspection.py new file mode 100644 index 000000000..20ab523d4 --- /dev/null +++ b/aclk/legacy/tests/paho-inspection.py @@ -0,0 +1,59 @@ +import ssl +import paho.mqtt.client as mqtt +import json +import time +import sys + +def on_connect(mqttc, obj, flags, rc): + if rc==0: + print("Successful connection", flush=True) + else : + print(f"Connection error rc={rc}", flush=True) + mqttc.subscribe("/agent/#",0) + +def on_disconnect(mqttc, obj, flags, rc): + print("disconnected rc: "+str(rc), flush=True) + +def on_message(mqttc, obj, msg): + print(f"{msg.topic} {len(msg.payload)}-bytes qos={msg.qos}", flush=True) + try: + print(f"Trying decode of {msg.payload[:60]}",flush=True) + api_msg = json.loads(msg.payload) + except Exception as e: + print(e,flush=True) + return + ts = api_msg["timestamp"] + mtype = api_msg["type"] + print(f"Message {mtype} time={ts} size {len(api_msg)}", flush=True) + now = time.time() + print(f"Current {now} -> Delay {now-ts}", flush=True) + if mtype=="disconnect": + print(f"Message dump: {api_msg}", flush=True) + +def on_publish(mqttc, obj, mid): + print("mid: "+str(mid), flush=True) + +def on_subscribe(mqttc, obj, mid, granted_qos): + print("Subscribed: "+str(mid)+" "+str(granted_qos), flush=True) + +def on_log(mqttc, obj, level, string): + print(string) + +print(f"Starting paho-inspection on {sys.argv[1]}", flush=True) +mqttc = mqtt.Client(transport='websockets',client_id="paho") +#mqttc.tls_set(certfile="server.crt", keyfile="server.key", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +#mqttc.tls_set(ca_certs="server.crt", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +mqttc.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS, ciphers=None) +mqttc.tls_insecure_set(True) +mqttc.on_message = on_message +mqttc.on_connect = on_connect +mqttc.on_disconnect = on_disconnect +mqttc.on_publish = on_publish +mqttc.on_subscribe = on_subscribe +mqttc.username_pw_set("paho","paho") +mqttc.connect(sys.argv[1], 8443, 60) + +#mqttc.publish("/agent/mine","Test1") +#mqttc.subscribe("$SYS/#", 0) +print("Connected succesfully, monitoring /agent/#", flush=True) +mqttc.loop_forever() diff --git a/aclk/legacy/tests/paho.Dockerfile b/aclk/legacy/tests/paho.Dockerfile new file mode 100644 index 000000000..d67cc4cb0 --- /dev/null +++ b/aclk/legacy/tests/paho.Dockerfile @@ -0,0 +1,14 @@ +FROM archlinux/base:latest + +RUN pacman -Syyu --noconfirm +RUN pacman --noconfirm --needed -S python-pip + +RUN pip install paho-mqtt + +RUN mkdir -p /opt/paho +COPY paho-inspection.py /opt/paho/ + +WORKDIR /opt/paho +ARG HOST_HOSTNAME +RUN echo $HOST_HOSTNAME >host +CMD ["/bin/bash", "-c", "/usr/sbin/python paho-inspection.py $(cat host)"] -- cgit v1.2.3