summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
commitbe1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /aclk/aclk.c
parentInitial commit. (diff)
downloadnetdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz
netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c1361
1 files changed, 1361 insertions, 0 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
new file mode 100644
index 00000000..e95d7d6a
--- /dev/null
+++ b/aclk/aclk.c
@@ -0,0 +1,1361 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk.h"
+
+#ifdef ENABLE_ACLK
+#include "aclk_stats.h"
+#include "mqtt_wss_client.h"
+#include "aclk_otp.h"
+#include "aclk_tx_msgs.h"
+#include "aclk_query.h"
+#include "aclk_query_queue.h"
+#include "aclk_util.h"
+#include "aclk_rx_msgs.h"
+#include "https_client.h"
+#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_capas.h"
+
+#include "aclk_proxy.h"
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#endif
+
+#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+
+#endif /* ENABLE_ACLK */
+
+int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+int aclk_rcvd_cloud_msgs = 0;
+int aclk_connection_counter = 0;
+int disconnect_req = 0;
+
+int aclk_connected = 0;
+int aclk_ctx_based = 0;
+int aclk_disable_runtime = 0;
+int aclk_stats_enabled;
+int aclk_kill_link = 0;
+
+usec_t aclk_session_us = 0;
+time_t aclk_session_sec = 0;
+
+time_t last_conn_time_mqtt = 0;
+time_t last_conn_time_appl = 0;
+time_t last_disconnect_time = 0;
+time_t next_connection_attempt = 0;
+float last_backoff_value = 0;
+
+time_t aclk_block_until = 0;
+
+#ifdef ENABLE_ACLK
+mqtt_wss_client mqttwss_client;
+
+netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+
+struct aclk_shared_state aclk_shared_state = {
+ .mqtt_shutdown_msg_id = -1,
+ .mqtt_shutdown_msg_rcvd = 0
+};
+
+#ifdef MQTT_WSS_DEBUG
+#include <openssl/ssl.h>
+#define DEFAULT_SSKEYLOGFILE_NAME "SSLKEYLOGFILE"
+const char *ssl_log_filename = NULL;
+FILE *ssl_log_file = NULL;
+static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line)
+{
+ (void)ssl;
+ if (!ssl_log_file)
+ ssl_log_file = fopen(ssl_log_filename, "a");
+ if (!ssl_log_file) {
+ netdata_log_error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename);
+ return;
+ }
+ fputs(line, ssl_log_file);
+ putc('\n', ssl_log_file);
+ fflush(ssl_log_file);
+}
+#endif
+
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+OSSL_DECODER_CTX *aclk_dctx = NULL;
+EVP_PKEY *aclk_private_key = NULL;
+#else
+static RSA *aclk_private_key = NULL;
+#endif
+static int load_private_key()
+{
+ if (aclk_private_key != NULL) {
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+ EVP_PKEY_free(aclk_private_key);
+ if (aclk_dctx)
+ OSSL_DECODER_CTX_free(aclk_dctx);
+
+ aclk_dctx = NULL;
+#else
+ RSA_free(aclk_private_key);
+#endif
+ }
+ aclk_private_key = NULL;
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
+
+ long bytes_read;
+ char *private_key = read_by_filename(filename, &bytes_read);
+ if (!private_key) {
+ netdata_log_error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
+ return 1;
+ }
+ netdata_log_debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
+
+ BIO *key_bio = BIO_new_mem_buf(private_key, -1);
+ if (key_bio==NULL) {
+ netdata_log_error("Claimed agent cannot establish ACLK - failed to create BIO for key");
+ goto biofailed;
+ }
+
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+ aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL,
+ "RSA",
+ OSSL_KEYMGMT_SELECT_PRIVATE_KEY,
+ NULL, NULL);
+
+ if (!aclk_dctx) {
+ netdata_log_error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
+ goto biofailed;
+ }
+
+ // this is necesseary to avoid RSA key with wrong size
+ if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) {
+ netdata_log_error("Decoding private key (from claiming) failed - invalid format.");
+ goto biofailed;
+ }
+#else
+ aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
+#endif
+ BIO_free(key_bio);
+ if (aclk_private_key!=NULL)
+ {
+ freez(private_key);
+ return 0;
+ }
+ char err[512];
+ ERR_error_string_n(ERR_get_error(), err, sizeof(err));
+ netdata_log_error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
+
+biofailed:
+ freez(private_key);
+ return 1;
+}
+
+static int wait_till_cloud_enabled()
+{
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "Waiting for Cloud to be enabled");
+
+ while (!netdata_cloud_enabled) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (!service_running(SERVICE_ACLK))
+ return 1;
+ }
+ return 0;
+}
+
+/**
+ * Will block until agent is claimed. Returns only if agent claimed
+ * or if agent needs to shutdown.
+ *
+ * @return `0` if agent has been claimed,
+ * `1` if interrupted due to agent shutting down
+ */
+static int wait_till_agent_claimed(void)
+{
+ //TODO prevent malloc and freez
+ char *agent_id = get_agent_claimid();
+ while (likely(!agent_id)) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (!service_running(SERVICE_ACLK))
+ return 1;
+ agent_id = get_agent_claimid();
+ }
+ freez(agent_id);
+ return 0;
+}
+
+/**
+ * Checks everything is ready for connection
+ * agent claimed, cloud url set and private key available
+ *
+ * @param aclk_hostname points to location where string pointer to hostname will be set
+ * @param aclk_port port to int where port will be saved
+ *
+ * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
+ */
+static int wait_till_agent_claim_ready()
+{
+ url_t url;
+ while (service_running(SERVICE_ACLK)) {
+ if (wait_till_agent_claimed())
+ return 1;
+
+ // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
+ // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (cloud_base_url == NULL) {
+ netdata_log_error("Do not move the cloud base url out of post_conf_load!!");
+ return 1;
+ }
+
+ // We just check configuration is valid here
+ // TODO make it without malloc/free
+ memset(&url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &url)) {
+ netdata_log_error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
+ url_t_destroy(&url);
+ sleep(5);
+ continue;
+ }
+ url_t_destroy(&url);
+
+ if (!load_private_key())
+ return 0;
+
+ sleep(5);
+ }
+
+ return 1;
+}
+
+void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
+{
+ switch(log_type) {
+ case MQTT_WSS_LOG_ERROR:
+ case MQTT_WSS_LOG_FATAL:
+ nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str);
+ return;
+
+ case MQTT_WSS_LOG_WARN:
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str);
+ return;
+
+ case MQTT_WSS_LOG_INFO:
+ nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str);
+ return;
+
+ case MQTT_WSS_LOG_DEBUG:
+ return;
+
+ default:
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss");
+ }
+}
+
+static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
+{
+ UNUSED(qos);
+ aclk_rcvd_cloud_msgs++;
+
+ netdata_log_debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
+
+ if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
+ netdata_log_error("Link is shutting down. Ignoring incoming message.");
+ return;
+ }
+
+ const char *msgtype = strrchr(topic, '/');
+ if (unlikely(!msgtype)) {
+ error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
+ return;
+ }
+ msgtype++;
+ if (unlikely(!*msgtype)) {
+ error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
+ return;
+ }
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 512
+ char filename[FN_MAX_LEN];
+ int logfd;
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
+ logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
+ if(logfd < 0)
+ netdata_log_error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+ write(logfd, msg, msglen);
+ close(logfd);
+#endif
+
+ aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic);
+}
+
+static void puback_callback(uint16_t packet_id)
+{
+ if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
+ last_conn_time_appl = now_realtime_sec();
+ aclk_tbeb_reset();
+ }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_puback(packet_id);
+#endif
+
+ if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Shutdown message has been acknowledged by the cloud. Exiting gracefully");
+
+ aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
+ }
+}
+
+static int read_query_thread_count()
+{
+ int threads = MIN(get_netdata_cpus()/2, 6);
+ threads = MAX(threads, 2);
+ threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ if(threads < 1) {
+ netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
+ threads = 1;
+ config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ }
+ return threads;
+}
+
+void aclk_graceful_disconnect(mqtt_wss_client client);
+
+/* Keeps connection alive and handles all network communications.
+ * Returns on error or when netdata is shutting down.
+ * @param client instance of mqtt_wss_client
+ * @returns 0 - Netdata Exits
+ * >0 - Error happened. Reconnect and start over.
+ */
+static int handle_connection(mqtt_wss_client client)
+{
+ time_t last_periodic_query_wakeup = now_monotonic_sec();
+ while (service_running(SERVICE_ACLK)) {
+ // timeout 1000 to check at least once a second
+ // for netdata_exit
+ if (mqtt_wss_service(client, 1000) < 0){
+ error_report("Connection Error or Dropped");
+ return 1;
+ }
+
+ if (disconnect_req || aclk_kill_link) {
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
+ disconnect_req ? "true" : "false",
+ aclk_kill_link ? "true" : "false");
+
+ disconnect_req = 0;
+ aclk_kill_link = 0;
+ aclk_graceful_disconnect(client);
+ aclk_queue_unlock();
+ aclk_shared_state.mqtt_shutdown_msg_id = -1;
+ aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
+ return 1;
+ }
+
+ // mqtt_wss_service will return faster than in one second
+ // if there is enough work to do
+ time_t now = now_monotonic_sec();
+ if (last_periodic_query_wakeup < now) {
+ // wake up at least one Query Thread at least
+ // once per second
+ last_periodic_query_wakeup = now;
+ QUERY_THREAD_WAKEUP;
+ }
+ }
+ return 0;
+}
+
+static inline void mqtt_connected_actions(mqtt_wss_client client)
+{
+ char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
+
+ if (!topic)
+ netdata_log_error("Unable to fetch topic for COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
+
+ topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ if (!topic)
+ netdata_log_error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
+
+ aclk_stats_upd_online(1);
+ aclk_connected = 1;
+ aclk_pubacks_per_conn = 0;
+ aclk_rcvd_cloud_msgs = 0;
+ aclk_connection_counter++;
+
+ aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
+ while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
+ mqtt_wss_set_topic_alias(client, topic);
+
+ aclk_send_agent_connection_update(client, 1);
+}
+
+void aclk_graceful_disconnect(mqtt_wss_client client)
+{
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Preparing to gracefully shutdown ACLK connection");
+
+ aclk_queue_lock();
+ aclk_queue_flush();
+
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
+
+ time_t t = now_monotonic_sec();
+ while (!mqtt_wss_service(client, 100)) {
+ if (now_monotonic_sec() - t >= 2) {
+ netdata_log_error("Wasn't able to gracefully shutdown ACLK in time!");
+ break;
+ }
+ if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "MQTT App Layer `disconnect` message sent successfully");
+ break;
+ }
+ }
+
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "ACLK link is down");
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
+
+ aclk_stats_upd_online(0);
+ last_disconnect_time = now_realtime_sec();
+ aclk_connected = 0;
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Attempting to gracefully shutdown the MQTT/WSS connection");
+
+ mqtt_wss_disconnect(client, 1000);
+}
+
+static unsigned long aclk_reconnect_delay() {
+ unsigned long recon_delay;
+ time_t now;
+
+ if (aclk_disable_runtime) {
+ aclk_tbeb_reset();
+ return 60 * MSEC_PER_SEC;
+ }
+
+ now = now_monotonic_sec();
+ if (aclk_block_until) {
+ if (now < aclk_block_until) {
+ recon_delay = aclk_block_until - now;
+ recon_delay *= MSEC_PER_SEC;
+ aclk_block_until = 0;
+ aclk_tbeb_reset();
+ return recon_delay;
+ }
+ aclk_block_until = 0;
+ }
+
+ if (!aclk_env || !aclk_env->backoff.base)
+ return aclk_tbeb_delay(0, 2, 0, 1024);
+
+ return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
+}
+
+/* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
+ * @return 0 - Go ahead and connect (delay expired)
+ * 1 - netdata_exit
+ */
+#define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
+static int aclk_block_till_recon_allowed() {
+ unsigned long recon_delay = aclk_reconnect_delay();
+
+ next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
+ last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
+
+ // we want to wake up from time to time to check netdata_exit
+ while (recon_delay)
+ {
+ if (!service_running(SERVICE_ACLK))
+ return 1;
+ if (recon_delay > NETDATA_EXIT_POLL_MS) {
+ sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
+ recon_delay -= NETDATA_EXIT_POLL_MS;
+ continue;
+ }
+ sleep_usec(recon_delay * USEC_PER_MS);
+ recon_delay = 0;
+ }
+ return !service_running(SERVICE_ACLK);
+}
+
+#ifndef ACLK_DISABLE_CHALLENGE
+/* Cloud returns transport list ordered with highest
+ * priority first. This function selects highest prio
+ * transport that we can actually use (support)
+ */
+static int aclk_get_transport_idx(aclk_env_t *env) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ // currently we support only MQTT 5
+ // therefore select first transport that matches
+ if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
+ return i;
+ }
+ }
+ return -1;
+}
+#endif
+
+ACLK_STATUS aclk_status = ACLK_STATUS_NONE;
+
+const char *aclk_status_to_string(void) {
+ switch(aclk_status) {
+ case ACLK_STATUS_CONNECTED:
+ return "connected";
+
+ case ACLK_STATUS_NONE:
+ return "none";
+
+ case ACLK_STATUS_DISABLED:
+ return "disabled";
+
+ case ACLK_STATUS_NO_CLOUD_URL:
+ return "no_cloud_url";
+
+ case ACLK_STATUS_INVALID_CLOUD_URL:
+ return "invalid_cloud_url";
+
+ case ACLK_STATUS_NOT_CLAIMED:
+ return "not_claimed";
+
+ case ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE:
+ return "env_endpoint_unreachable";
+
+ case ACLK_STATUS_ENV_RESPONSE_NOT_200:
+ return "env_response_not_200";
+
+ case ACLK_STATUS_ENV_RESPONSE_EMPTY:
+ return "env_response_empty";
+
+ case ACLK_STATUS_ENV_RESPONSE_NOT_JSON:
+ return "env_response_not_json";
+
+ case ACLK_STATUS_ENV_FAILED:
+ return "env_failed";
+
+ case ACLK_STATUS_BLOCKED:
+ return "blocked";
+
+ case ACLK_STATUS_NO_OLD_PROTOCOL:
+ return "no_old_protocol";
+
+ case ACLK_STATUS_NO_PROTOCOL_CAPABILITY:
+ return "no_protocol_capability";
+
+ case ACLK_STATUS_INVALID_ENV_AUTH_URL:
+ return "invalid_env_auth_url";
+
+ case ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX:
+ return "invalid_env_transport_idx";
+
+ case ACLK_STATUS_INVALID_ENV_TRANSPORT_URL:
+ return "invalid_env_transport_url";
+
+ case ACLK_STATUS_INVALID_OTP:
+ return "invalid_otp";
+
+ case ACLK_STATUS_NO_LWT_TOPIC:
+ return "no_lwt_topic";
+
+ default:
+ return "unknown";
+ }
+}
+
+const char *aclk_cloud_base_url = NULL;
+
+/* Attempts to make a connection to MQTT broker over WSS
+ * @param client instance of mqtt_wss_client
+ * @return 0 - Successful Connection,
+ * <0 - Irrecoverable Error -> Kill ACLK,
+ * >0 - netdata_exit
+ */
+#define CLOUD_BASE_URL_READ_RETRY 30
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+#define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
+#else
+#define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
+#endif
+static int aclk_attempt_to_connect(mqtt_wss_client client)
+{
+ int ret;
+
+ url_t base_url;
+
+#ifndef ACLK_DISABLE_CHALLENGE
+ url_t auth_url;
+ url_t mqtt_url;
+#endif
+
+ while (service_running(SERVICE_ACLK)) {
+ aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (aclk_cloud_base_url == NULL) {
+ error_report("Do not move the cloud base url out of post_conf_load!!");
+ aclk_status = ACLK_STATUS_NO_CLOUD_URL;
+ return -1;
+ }
+
+ if (aclk_block_till_recon_allowed()) {
+ aclk_status = ACLK_STATUS_BLOCKED;
+ return 1;
+ }
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Attempting connection now");
+
+ memset(&base_url, 0, sizeof(url_t));
+ if (url_parse(aclk_cloud_base_url, &base_url)) {
+ aclk_status = ACLK_STATUS_INVALID_CLOUD_URL;
+ error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
+ sleep(CLOUD_BASE_URL_READ_RETRY);
+ url_t_destroy(&base_url);
+ continue;
+ }
+
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, (char**)&proxy_conf.username, (char**)&proxy_conf.password, &proxy_conf.type);
+
+ struct mqtt_connect_params mqtt_conn_params = {
+ .clientid = "anon",
+ .username = "anon",
+ .password = "anon",
+ .will_topic = "lwt",
+ .will_msg = NULL,
+ .will_flags = MQTT_WSS_PUB_QOS2,
+ .keep_alive = 60,
+ .drop_on_publish_fail = 1
+ };
+
+#ifndef ACLK_DISABLE_CHALLENGE
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
+ aclk_env = callocz(1, sizeof(aclk_env_t));
+
+ ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
+ url_t_destroy(&base_url);
+ if(ret) switch(ret) {
+ case 1:
+ aclk_status = ACLK_STATUS_NOT_CLAIMED;
+ error_report("Failed to Get ACLK environment (agent is not claimed)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 2:
+ aclk_status = ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE;
+ error_report("Failed to Get ACLK environment (cannot contact ENV endpoint)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 3:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_200;
+ error_report("Failed to Get ACLK environment (ENV response code is not 200)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 4:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_EMPTY;
+ error_report("Failed to Get ACLK environment (ENV response is empty)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 5:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_JSON;
+ error_report("Failed to Get ACLK environment (ENV response is not JSON)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ default:
+ aclk_status = ACLK_STATUS_ENV_FAILED;
+ error_report("Failed to Get ACLK environment (unknown error)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+ }
+
+ if (!service_running(SERVICE_ACLK)) {
+ aclk_status = ACLK_STATUS_DISABLED;
+ return 1;
+ }
+
+ if (aclk_env->encoding != ACLK_ENC_PROTO) {
+ aclk_status = ACLK_STATUS_NO_OLD_PROTOCOL;
+ error_report("This agent can only use the new cloud protocol but cloud requested old one.");
+ continue;
+ }
+
+ if (!aclk_env_has_capa("proto")) {
+ aclk_status = ACLK_STATUS_NO_PROTOCOL_CAPABILITY;
+ error_report("Can't use encoding=proto without at least \"proto\" capability.");
+ continue;
+ }
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "New ACLK protobuf protocol negotiated successfully (/env response).");
+
+ memset(&auth_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
+ aclk_status = ACLK_STATUS_INVALID_ENV_AUTH_URL;
+ error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ url_t_destroy(&auth_url);
+ continue;
+ }
+
+ ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
+ url_t_destroy(&auth_url);
+ if (ret) {
+ aclk_status = ACLK_STATUS_INVALID_OTP;
+ error_report("Error passing Challenge/Response to get OTP");
+ continue;
+ }
+
+ // aclk_get_topic moved here as during OTP we
+ // generate the topic cache
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
+
+ if (!mqtt_conn_params.will_topic) {
+ aclk_status = ACLK_STATUS_NO_LWT_TOPIC;
+ error_report("Couldn't get LWT topic. Will not send LWT.");
+ continue;
+ }
+
+ // Do the MQTT connection
+ ret = aclk_get_transport_idx(aclk_env);
+ if (ret < 0) {
+ aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX;
+ error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ continue;
+ }
+
+ memset(&mqtt_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
+ aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_URL;
+ error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ url_t_destroy(&mqtt_url);
+ continue;
+ }
+#endif
+
+ aclk_session_newarch = now_realtime_usec();
+ aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
+ aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
+
+ mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
+
+#ifdef ACLK_DISABLE_CHALLENGE
+ ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&base_url);
+#else
+ ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&mqtt_url);
+
+ freez((char*)mqtt_conn_params.clientid);
+ freez((char*)mqtt_conn_params.password);
+ freez((char*)mqtt_conn_params.username);
+#endif
+
+ freez((char*)mqtt_conn_params.will_msg);
+ freez((char*)proxy_conf.host);
+ freez((char*)proxy_conf.username);
+ freez((char*)proxy_conf.password);
+
+ if (!ret) {
+ last_conn_time_mqtt = now_realtime_sec();
+ nd_log(NDLS_DAEMON, NDLP_INFO, "ACLK connection successfully established");
+ aclk_status = ACLK_STATUS_CONNECTED;
+ nd_log(NDLS_ACCESS, NDLP_INFO, "ACLK CONNECTED");
+ mqtt_connected_actions(client);
+ return 0;
+ }
+
+ error_report("Connect failed");
+ }
+
+ aclk_status = ACLK_STATUS_DISABLED;
+ return 1;
+}
+
+/**
+ * Main agent cloud link thread
+ *
+ * This thread will simply call the main event loop that handles
+ * pending requests - both inbound and outbound
+ *
+ * @param ptr is a pointer to the netdata_static_thread structure.
+ *
+ * @return It always returns NULL
+ */
+void *aclk_main(void *ptr)
+{
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+
+ struct aclk_stats_thread *stats_thread = NULL;
+
+ struct aclk_query_threads query_threads;
+ query_threads.thread_list = NULL;
+
+ ACLK_PROXY_TYPE proxy_type;
+ aclk_get_proxy(&proxy_type);
+ if (proxy_type == PROXY_TYPE_SOCKS5) {
+ netdata_log_error("SOCKS5 proxy is not supported by ACLK-NG yet.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+ }
+
+ unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
+
+ // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
+ // as it must notify the far end that it shutdown gracefully and avoid the LWT.
+ netdata_thread_disable_cancelability();
+
+#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "Killing ACLK thread -> cloud functionality has been disabled");
+
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+#endif
+ query_threads.count = read_query_thread_count();
+
+ if (wait_till_cloud_enabled())
+ goto exit;
+
+ if (wait_till_agent_claim_ready())
+ goto exit;
+
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ netdata_log_error("Couldn't initialize MQTT_WSS network library");
+ goto exit;
+ }
+
+#ifdef MQTT_WSS_DEBUG
+ size_t default_ssl_log_filename_size = strlen(netdata_configured_log_dir) + strlen(DEFAULT_SSKEYLOGFILE_NAME) + 2;
+ char *default_ssl_log_filename = mallocz(default_ssl_log_filename_size);
+ snprintfz(default_ssl_log_filename, default_ssl_log_filename_size, "%s/%s", netdata_configured_log_dir, DEFAULT_SSKEYLOGFILE_NAME);
+ ssl_log_filename = config_get(CONFIG_SECTION_CLOUD, "aclk ssl keylog file", default_ssl_log_filename);
+ freez(default_ssl_log_filename);
+ if (ssl_log_filename) {
+ error_report("SSLKEYLOGFILE active (path:\"%s\")!", ssl_log_filename);
+ mqtt_wss_set_SSL_CTX_keylog_cb(mqttwss_client, aclk_ssl_keylog_cb);
+ }
+#endif
+
+ // Enable MQTT buffer growth if necessary
+ // e.g. old cloud architecture clients with huge nodes
+ // that send JSON payloads of 10 MB as single messages
+ mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
+
+ aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
+ if (aclk_stats_enabled) {
+ stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
+ stats_thread->thread = mallocz(sizeof(netdata_thread_t));
+ stats_thread->query_thread_count = query_threads.count;
+ stats_thread->client = mqttwss_client;
+ aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
+ netdata_thread_create(
+ stats_thread->thread, "ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
+ stats_thread);
+ }
+
+ // Keep reconnecting and talking until our time has come
+ // and the Grim Reaper (netdata_exit) calls
+ do {
+ if (aclk_attempt_to_connect(mqttwss_client))
+ goto exit_full;
+
+ if (unlikely(!query_threads.thread_list))
+ aclk_query_threads_start(&query_threads, mqttwss_client);
+
+ if (handle_connection(mqttwss_client)) {
+ aclk_stats_upd_online(0);
+ last_disconnect_time = now_realtime_sec();
+ aclk_connected = 0;
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
+ }
+ } while (service_running(SERVICE_ACLK));
+
+ aclk_graceful_disconnect(mqttwss_client);
+
+#ifdef MQTT_WSS_DEBUG
+ if (ssl_log_file)
+ fclose(ssl_log_file);
+#endif
+
+exit_full:
+// Tear Down
+ QUERY_THREAD_WAKEUP_ALL;
+
+ aclk_query_threads_cleanup(&query_threads);
+
+ if (aclk_stats_enabled) {
+ netdata_thread_join(*stats_thread->thread, NULL);
+ aclk_stats_thread_cleanup();
+ freez(stats_thread->thread);
+ freez(stats_thread);
+ }
+ free_topic_cache();
+ mqtt_wss_destroy(mqttwss_client);
+exit:
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+}
+
+void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
+{
+ uuid_t node_id;
+ int ret = 0;
+
+ if (!aclk_connected)
+ return;
+
+ if (host->node_id && !uuid_is_null(*host->node_id)) {
+ uuid_copy(node_id, *host->node_id);
+ }
+ else {
+ ret = get_node_id(&host->host_uuid, &node_id);
+ if (ret > 0) {
+ // this means we were not able to check if node_id already present
+ netdata_log_error("Unable to check for node_id. Ignoring the host state update.");
+ return;
+ }
+ if (ret < 0) {
+ // node_id not found
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = rrdhost_hostname(host),
+ .machine_guid = host->machine_guid};
+ create_query->data.bin_payload.payload =
+ generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+
+ aclk_queue_query(create_query);
+ return;
+ }
+ }
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ node_instance_connection_t node_state_update = {
+ .hops = host->system_info->hops,
+ .live = cmd,
+ .queryable = queryable,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
+
+ node_state_update.capabilities = aclk_get_agent_capas();
+
+ rrdhost_aclk_state_lock(localhost);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ rrdhost_aclk_state_unlock(localhost);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing status update for node=%s, live=%d, hops=%u, queryable=%d",
+ (char*)node_state_update.node_id, cmd, host->system_info->hops, queryable);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
+}
+
+void aclk_send_node_instances()
+{
+ struct node_instance_list *list_head = get_node_list();
+ struct node_instance_list *list = list_head;
+ if (unlikely(!list)) {
+ error_report("Failure to get_node_list from DB!");
+ return;
+ }
+ while (!uuid_is_null(list->host_id)) {
+ if (!uuid_is_null(list->node_id)) {
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ node_instance_connection_t node_state_update = {
+ .live = list->live,
+ .hops = list->hops,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
+
+ char host_id[UUID_STR_LEN];
+ uuid_unparse_lower(list->host_id, host_id);
+
+ RRDHOST *host = rrdhost_find_by_guid(host_id);
+ if (unlikely(!host)) {
+ freez((void*)node_state_update.node_id);
+ freez(query);
+ continue;
+ }
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
+
+ rrdhost_aclk_state_lock(localhost);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ rrdhost_aclk_state_unlock(localhost);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing status update for node=%s, live=%d, hops=%d, queryable=1",
+ (char*)node_state_update.node_id, list->live, list->hops);
+
+ freez((void*)node_state_update.capabilities);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
+ } else {
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ node_instance_creation_t node_instance_creation = {
+ .hops = list->hops,
+ .hostname = list->hostname,
+ };
+ node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+ rrdhost_aclk_state_lock(localhost);
+ node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ rrdhost_aclk_state_unlock(localhost);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing registration for host=%s, hops=%d",
+ (char*)node_instance_creation.machine_guid, list->hops);
+
+ freez((void *)node_instance_creation.machine_guid);
+ aclk_queue_query(create_query);
+ }
+ freez(list->hostname);
+
+ list++;
+ }
+ freez(list_head);
+}
+
+void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+{
+ aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
+}
+
+static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status)) {
+ buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
+ return;
+ }
+ buffer_sprintf(wb,
+ "\n\t\tUpdates: %d"
+ "\n\t\tPending Min Seq ID: %"PRIu64
+ "\n\t\tPending Max Seq ID: %"PRIu64
+ "\n\t\tLast Submitted Seq ID: %"PRIu64,
+ status.alert_updates,
+ status.pending_min_sequence_id,
+ status.pending_max_sequence_id,
+ status.last_submitted_sequence_id
+ );
+}
+#endif /* ENABLE_ACLK */
+
+char *aclk_state(void)
+{
+#ifndef ENABLE_ACLK
+ return strdupz("ACLK Available: No");
+#else
+ BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
+ struct tm *tmptr, tmbuf;
+ char *ret;
+
+ buffer_strcat(wb,
+ "ACLK Available: Yes\n"
+ "ACLK Version: 2\n"
+ "Protocols Supported: Protobuf\n"
+ );
+ buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);
+
+ char *agent_id = get_agent_claimid();
+ if (agent_id == NULL)
+ buffer_strcat(wb, "No\n");
+ else {
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
+ freez(agent_id);
+ }
+
+ buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
+ if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
+ }
+ if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
+ }
+ if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
+ }
+ if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
+ }
+
+ if (aclk_connected) {
+ buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
+
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host));
+
+ buffer_strcat(wb, "\tClaimed ID: ");
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ buffer_strcat(wb, host->aclk_state.claimed_id);
+ else
+ buffer_strcat(wb, "null");
+ rrdhost_aclk_state_unlock(host);
+
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ buffer_strcat(wb, "\n\tNode ID: null\n");
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
+ }
+
+ buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
+
+ if (host != localhost)
+ buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");
+
+ buffer_strcat(wb, "\n\tAlert Streaming Status:");
+ fill_alert_status_for_host(wb, host);
+ }
+ rrd_unlock();
+ }
+
+ ret = strdupz(buffer_tostring(wb));
+ buffer_free(wb);
+ return ret;
+#endif /* ENABLE_ACLK */
+}
+
+#ifdef ENABLE_ACLK
+static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status))
+ return;
+
+ json_object *tmp = json_object_new_int(status.alert_updates);
+ json_object_object_add(obj, "updates", tmp);
+
+ tmp = json_object_new_int(status.pending_min_sequence_id);
+ json_object_object_add(obj, "pending-min-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_max_sequence_id);
+ json_object_object_add(obj, "pending-max-seq-id", tmp);
+
+ tmp = json_object_new_int(status.last_submitted_sequence_id);
+ json_object_object_add(obj, "last-submitted-seq-id", tmp);
+}
+
+static json_object *timestamp_to_json(const time_t *t)
+{
+ struct tm *tmptr, tmbuf;
+ if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ return json_object_new_string(timebuf);
+ }
+ return NULL;
+}
+#endif /* ENABLE_ACLK */
+
+char *aclk_state_json(void)
+{
+#ifndef ENABLE_ACLK
+ return strdupz("{\"aclk-available\":false}");
+#else
+ json_object *tmp, *grp, *msg = json_object_new_object();
+
+ tmp = json_object_new_boolean(1);
+ json_object_object_add(msg, "aclk-available", tmp);
+
+ tmp = json_object_new_int(2);
+ json_object_object_add(msg, "aclk-version", tmp);
+
+ grp = json_object_new_array();
+ tmp = json_object_new_string("Protobuf");
+ json_object_array_add(grp, tmp);
+ json_object_object_add(msg, "protocols-supported", grp);
+
+ char *agent_id = get_agent_claimid();
+ tmp = json_object_new_boolean(agent_id != NULL);
+ json_object_object_add(msg, "agent-claimed", tmp);
+
+ if (agent_id) {
+ tmp = json_object_new_string(agent_id);
+ freez(agent_id);
+ } else
+ tmp = NULL;
+ json_object_object_add(msg, "claimed-id", tmp);
+
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
+ json_object_object_add(msg, "cloud-url", tmp);
+
+ tmp = json_object_new_boolean(aclk_connected);
+ json_object_object_add(msg, "online", tmp);
+
+ tmp = json_object_new_string("Protobuf");
+ json_object_object_add(msg, "used-cloud-protocol", tmp);
+
+ tmp = json_object_new_int(5);
+ json_object_object_add(msg, "mqtt-version", tmp);
+
+ tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
+ json_object_object_add(msg, "received-app-layer-msgs", tmp);
+
+ tmp = json_object_new_int(aclk_pubacks_per_conn);
+ json_object_object_add(msg, "received-mqtt-pubacks", tmp);
+
+ tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
+ json_object_object_add(msg, "reconnect-count", tmp);
+
+ json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
+ json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
+ json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
+ json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);
+ tmp = NULL;
+ if (!aclk_connected && last_backoff_value)
+ tmp = json_object_new_double(last_backoff_value);
+ json_object_object_add(msg, "last-backoff-value", tmp);
+
+ tmp = json_object_new_boolean(aclk_disable_runtime);
+ json_object_object_add(msg, "banned-by-cloud", tmp);
+
+ grp = json_object_new_array();
+
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ json_object *nodeinstance = json_object_new_object();
+
+ tmp = json_object_new_string(rrdhost_hostname(host));
+ json_object_object_add(nodeinstance, "hostname", tmp);
+
+ tmp = json_object_new_string(host->machine_guid);
+ json_object_object_add(nodeinstance, "mguid", tmp);
+
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id) {
+ tmp = json_object_new_string(host->aclk_state.claimed_id);
+ json_object_object_add(nodeinstance, "claimed_id", tmp);
+ } else
+ json_object_object_add(nodeinstance, "claimed_id", NULL);
+ rrdhost_aclk_state_unlock(host);
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ json_object_object_add(nodeinstance, "node-id", NULL);
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ tmp = json_object_new_string(node_id);
+ json_object_object_add(nodeinstance, "node-id", tmp);
+ }
+
+ tmp = json_object_new_int(host->system_info->hops);
+ json_object_object_add(nodeinstance, "streaming-hops", tmp);
+
+ tmp = json_object_new_string(host == localhost ? "self" : "child");
+ json_object_object_add(nodeinstance, "relationship", tmp);
+
+ tmp = json_object_new_boolean((host->receiver || host == localhost));
+ json_object_object_add(nodeinstance, "streaming-online", tmp);
+
+ tmp = json_object_new_object();
+ fill_alert_status_for_host_json(tmp, host);
+ json_object_object_add(nodeinstance, "alert-sync-status", tmp);
+
+ json_object_array_add(grp, nodeinstance);
+ }
+ rrd_unlock();
+ json_object_object_add(msg, "node-instances", grp);
+
+ char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
+ json_object_put(msg);
+ return str;
+#endif /* ENABLE_ACLK */
+}
+
+void add_aclk_host_labels(void) {
+ RRDLABELS *labels = localhost->rrdlabels;
+
+#ifdef ENABLE_ACLK
+ rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+ ACLK_PROXY_TYPE aclk_proxy;
+ char *proxy_str;
+ aclk_get_proxy(&aclk_proxy);
+
+ switch(aclk_proxy) {
+ case PROXY_TYPE_SOCKS5:
+ proxy_str = "SOCKS5";
+ break;
+ case PROXY_TYPE_HTTP:
+ proxy_str = "HTTP";
+ break;
+ default:
+ proxy_str = "none";
+ break;
+ }
+
+ rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+#else
+ rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+#endif
+}
+
+void aclk_queue_node_info(RRDHOST *host, bool immediate)
+{
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (likely(wc))
+ wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec();
+}