summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-03-31 12:59:21 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-03-31 12:59:21 +0000
commitbb8713bbc1c4594366fc735c04910edbf4c61aab (patch)
treed7da56c0b89aa371dd8ad986995dd145fdf6670a /aclk
parentReleasing debian version 1.29.3-4. (diff)
downloadnetdata-bb8713bbc1c4594366fc735c04910edbf4c61aab.tar.xz
netdata-bb8713bbc1c4594366fc735c04910edbf4c61aab.zip
Merging upstream version 1.30.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--aclk/aclk.c821
-rw-r--r--aclk/aclk.h100
-rw-r--r--aclk/aclk_collector_list.c193
-rw-r--r--aclk/aclk_collector_list.h39
-rw-r--r--aclk/aclk_otp.c261
-rw-r--r--aclk/aclk_otp.h10
-rw-r--r--aclk/aclk_query.c295
-rw-r--r--aclk/aclk_query.h32
-rw-r--r--aclk/aclk_query_queue.c128
-rw-r--r--aclk/aclk_query_queue.h71
-rw-r--r--aclk/aclk_rx_msgs.c343
-rw-r--r--aclk/aclk_rx_msgs.h14
-rw-r--r--aclk/aclk_stats.c274
-rw-r--r--aclk/aclk_stats.h64
-rw-r--r--aclk/aclk_tx_msgs.c395
-rw-r--r--aclk/aclk_tx_msgs.h24
-rw-r--r--aclk/aclk_util.c347
-rw-r--r--aclk/aclk_util.h52
-rw-r--r--aclk/https_client.c246
-rw-r--r--aclk/https_client.h11
-rw-r--r--aclk/legacy/aclk_common.c1
-rw-r--r--aclk/legacy/aclk_lws_https_client.c4
-rw-r--r--aclk/legacy/aclk_lws_wss_client.c4
-rw-r--r--aclk/legacy/aclk_query.c60
-rw-r--r--aclk/legacy/aclk_query.h4
-rw-r--r--aclk/legacy/aclk_rx_msgs.c36
-rw-r--r--aclk/legacy/aclk_stats.c125
-rw-r--r--aclk/legacy/aclk_stats.h13
-rw-r--r--aclk/legacy/agent_cloud_link.c3
29 files changed, 3951 insertions, 19 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
new file mode 100644
index 000000000..889fa1e4d
--- /dev/null
+++ b/aclk/aclk.c
@@ -0,0 +1,821 @@
+#include "aclk.h"
+
+#include "aclk_stats.h"
+#include "mqtt_wss_client.h"
+#include "aclk_otp.h"
+#include "aclk_tx_msgs.h"
+#include "aclk_query.h"
+#include "aclk_query_queue.h"
+#include "aclk_util.h"
+#include "aclk_rx_msgs.h"
+#include "aclk_collector_list.h"
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#endif
+
+#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+
+//TODO remove most (as in 99.999999999%) of this crap
+int aclk_connected = 0;
+int aclk_disable_runtime = 0;
+int aclk_disable_single_updates = 0;
+int aclk_kill_link = 0;
+
+int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+
+usec_t aclk_session_us = 0; // Used by the mqtt layer
+time_t aclk_session_sec = 0; // Used by the mqtt layer
+
+mqtt_wss_client mqttwss_client;
+
+netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+
+struct aclk_shared_state aclk_shared_state = {
+ .agent_state = AGENT_INITIALIZING,
+ .last_popcorn_interrupt = 0,
+ .version_neg = 0,
+ .version_neg_wait_till = 0,
+ .mqtt_shutdown_msg_id = -1,
+ .mqtt_shutdown_msg_rcvd = 0
+};
+
+void aclk_single_update_disable()
+{
+ aclk_disable_single_updates = 1;
+}
+
+void aclk_single_update_enable()
+{
+ aclk_disable_single_updates = 0;
+}
+
+//ENDTODO
+
+static RSA *aclk_private_key = NULL;
+static int load_private_key()
+{
+ if (aclk_private_key != NULL)
+ RSA_free(aclk_private_key);
+ aclk_private_key = NULL;
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
+
+ long bytes_read;
+ char *private_key = read_by_filename(filename, &bytes_read);
+ if (!private_key) {
+ error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
+ return 1;
+ }
+ debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
+
+ BIO *key_bio = BIO_new_mem_buf(private_key, -1);
+ if (key_bio==NULL) {
+ error("Claimed agent cannot establish ACLK - failed to create BIO for key");
+ goto biofailed;
+ }
+
+ aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
+ BIO_free(key_bio);
+ if (aclk_private_key!=NULL)
+ {
+ freez(private_key);
+ return 0;
+ }
+ char err[512];
+ ERR_error_string_n(ERR_get_error(), err, sizeof(err));
+ error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
+
+biofailed:
+ freez(private_key);
+ return 1;
+}
+
+static int wait_till_cloud_enabled()
+{
+ info("Waiting for Cloud to be enabled");
+ while (!netdata_cloud_setting) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (netdata_exit)
+ return 1;
+ }
+ return 0;
+}
+
+/**
+ * Will block until agent is claimed. Returns only if agent claimed
+ * or if agent needs to shutdown.
+ *
+ * @return `0` if agent has been claimed,
+ * `1` if interrupted due to agent shutting down
+ */
+static int wait_till_agent_claimed(void)
+{
+ //TODO prevent malloc and freez
+ char *agent_id = is_agent_claimed();
+ while (likely(!agent_id)) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (netdata_exit)
+ return 1;
+ agent_id = is_agent_claimed();
+ }
+ freez(agent_id);
+ return 0;
+}
+
+/**
+ * Checks everything is ready for connection
+ * agent claimed, cloud url set and private key available
+ *
+ * @param aclk_hostname points to location where string pointer to hostname will be set
+ * @param ackl_port port to int where port will be saved
+ *
+ * @return If non 0 returned irrecoverable error happened and ACLK should be terminated
+ */
+static int wait_till_agent_claim_ready()
+{
+ int port;
+ char *hostname = NULL;
+ while (!netdata_exit) {
+ if (wait_till_agent_claimed())
+ return 1;
+
+ // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
+ // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (cloud_base_url == NULL) {
+ error("Do not move the cloud base url out of post_conf_load!!");
+ return 1;
+ }
+
+ // We just check configuration is valid here
+ // TODO make it without malloc/free
+ if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) {
+ error("Agent is claimed but the configuration is invalid, please fix");
+ freez(hostname);
+ hostname = NULL;
+ sleep(5);
+ continue;
+ }
+ freez(hostname);
+ hostname = NULL;
+
+ if (!load_private_key()) {
+ sleep(5);
+ break;
+ }
+ }
+
+ return 0;
+}
+
+void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
+{
+ switch(log_type) {
+ case MQTT_WSS_LOG_ERROR:
+ case MQTT_WSS_LOG_FATAL:
+ case MQTT_WSS_LOG_WARN:
+ error("%s", str);
+ return;
+ case MQTT_WSS_LOG_INFO:
+ info("%s", str);
+ return;
+ case MQTT_WSS_LOG_DEBUG:
+ debug(D_ACLK, "%s", str);
+ return;
+ default:
+ error("Unknown log type from mqtt_wss");
+ }
+}
+
+//TODO prevent big buffer on stack
+#define RX_MSGLEN_MAX 4096
+static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
+{
+ char cmsg[RX_MSGLEN_MAX];
+ size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
+
+ if (msglen > RX_MSGLEN_MAX - 1)
+ error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
+
+ memcpy(cmsg,
+ msg,
+ len);
+ cmsg[len] = 0;
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 512
+ char filename[FN_MAX_LEN];
+ int logfd;
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT());
+ logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
+ if(logfd < 0)
+ error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+ write(logfd, msg, msglen);
+ close(logfd);
+#endif
+
+ debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg);
+
+ if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic))
+ error("Received message on unexpected topic %s", topic);
+
+ if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
+ error("Link is shutting down. Ignoring message.");
+ return;
+ }
+
+ aclk_handle_cloud_message(cmsg);
+}
+
+static void puback_callback(uint16_t packet_id)
+{
+ if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
+ aclk_reconnect_delay(0);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_puback(packet_id);
+#endif
+
+ if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
+ error("Got PUBACK for shutdown message. Can exit gracefully.");
+ aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
+ }
+}
+
+static int read_query_thread_count()
+{
+ int threads = MIN(processors/2, 6);
+ threads = MAX(threads, 2);
+ threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ if(threads < 1) {
+ error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
+ threads = 1;
+ config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ }
+ return threads;
+}
+
+/* Keeps connection alive and handles all network comms.
+ * Returns on error or when netdata is shutting down.
+ * @param client instance of mqtt_wss_client
+ * @returns 0 - Netdata Exits
+ * >0 - Error happened. Reconnect and start over.
+ */
+static int handle_connection(mqtt_wss_client client)
+{
+ time_t last_periodic_query_wakeup = now_monotonic_sec();
+ while (!netdata_exit) {
+ // timeout 1000 to check at least once a second
+ // for netdata_exit
+ if (mqtt_wss_service(client, 1000) < 0){
+ error("Connection Error or Dropped");
+ return 1;
+ }
+
+ // mqtt_wss_service will return faster than in one second
+ // if there is enough work to do
+ time_t now = now_monotonic_sec();
+ if (last_periodic_query_wakeup < now) {
+ // wake up at least one Query Thread at least
+ // once per second
+ last_periodic_query_wakeup = now;
+ QUERY_THREAD_WAKEUP;
+ }
+ }
+ return 0;
+}
+
+inline static int aclk_popcorn_check_bump()
+{
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
+ ACLK_SHARED_STATE_UNLOCK;
+ return 1;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+ return 0;
+}
+
+static inline void queue_connect_payloads(void)
+{
+ aclk_query_t query = aclk_query_new(METADATA_INFO);
+ query->data.metadata_info.host = localhost;
+ query->data.metadata_info.initial_on_connect = 1;
+ aclk_queue_query(query);
+ query = aclk_query_new(METADATA_ALARMS);
+ query->data.metadata_alarms.initial_on_connect = 1;
+ aclk_queue_query(query);
+}
+
+static inline void mqtt_connected_actions(mqtt_wss_client client)
+{
+ // TODO global vars?
+ usec_t now = now_realtime_usec();
+ aclk_session_sec = now / USEC_PER_SEC;
+ aclk_session_us = now % USEC_PER_SEC;
+
+ mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1);
+
+ aclk_stats_upd_online(1);
+ aclk_connected = 1;
+ aclk_pubacks_per_conn = 0;
+ aclk_hello_msg(client);
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
+ error("Sending `connect` payload immediatelly as popcorning was finished already.");
+ queue_connect_payloads();
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+}
+
+/* Waits until agent is ready or needs to exit
+ * @param client instance of mqtt_wss_client
+ * @param query_threads pointer to aclk_query_threads
+ * structure where to store data about started query threads
+ * @return 0 - Popcorning Finished - Agent STABLE,
+ * !0 - netdata_exit
+ */
+static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads)
+{
+ time_t elapsed;
+ int need_wait;
+ while (!netdata_exit) {
+ ACLK_SHARED_STATE_LOCK;
+ if (likely(aclk_shared_state.agent_state != AGENT_INITIALIZING)) {
+ ACLK_SHARED_STATE_UNLOCK;
+ return 0;
+ }
+ elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
+ if (elapsed >= ACLK_STABLE_TIMEOUT) {
+ aclk_shared_state.agent_state = AGENT_STABLE;
+ ACLK_SHARED_STATE_UNLOCK;
+ error("ACLK localhost popocorn finished");
+ if (unlikely(!query_threads->thread_list))
+ aclk_query_threads_start(query_threads, client);
+ queue_connect_payloads();
+ return 0;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+ need_wait = ACLK_STABLE_TIMEOUT - elapsed;
+ error("ACLK localhost popocorn wait %d seconds longer", need_wait);
+ sleep(need_wait);
+ }
+ return 1;
+}
+
+void aclk_graceful_disconnect(mqtt_wss_client client)
+{
+ error("Preparing to Gracefully Shutdown the ACLK");
+ aclk_queue_lock();
+ aclk_queue_flush();
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+ time_t t = now_monotonic_sec();
+ while (!mqtt_wss_service(client, 100)) {
+ if (now_monotonic_sec() - t >= 2) {
+ error("Wasn't able to gracefully shutdown ACLK in time!");
+ break;
+ }
+ if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
+ error("MQTT App Layer `disconnect` message sent successfully");
+ break;
+ }
+ }
+ aclk_stats_upd_online(0);
+ aclk_connected = 0;
+
+ error("Attempting to Gracefully Shutdown MQTT/WSS connection");
+ mqtt_wss_disconnect(client, 1000);
+}
+
+/* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled
+ * @return 0 - Go ahead and connect (delay expired)
+ * 1 - netdata_exit
+ */
+#define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
+static int aclk_block_till_recon_allowed() {
+ // Handle reconnect exponential backoff
+ // fnc aclk_reconnect_delay comes from ACLK Legacy @amoss
+ // but has been modifed slightly (more randomness)
+ unsigned long recon_delay = aclk_reconnect_delay(1);
+ info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
+ // we want to wake up from time to time to check netdata_exit
+ while (recon_delay)
+ {
+ if (netdata_exit)
+ return 1;
+ if (recon_delay > NETDATA_EXIT_POLL_MS) {
+ sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
+ recon_delay -= NETDATA_EXIT_POLL_MS;
+ continue;
+ }
+ sleep_usec(recon_delay * USEC_PER_MS);
+ recon_delay = 0;
+ }
+ return 0;
+}
+
+#define HTTP_PROXY_PREFIX "http://"
+static void set_proxy(struct mqtt_wss_proxy *out)
+{
+ ACLK_PROXY_TYPE pt;
+ const char *ptr = aclk_get_proxy(&pt);
+ char *tmp;
+ char *host;
+ if (pt != PROXY_TYPE_HTTP)
+ return;
+
+ out->port = 0;
+
+ if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
+ ptr += strlen(HTTP_PROXY_PREFIX);
+
+ if ((tmp = strchr(ptr, '@')))
+ ptr = tmp;
+
+ if ((tmp = strchr(ptr, '/'))) {
+ host = mallocz((tmp - ptr) + 1);
+ memcpy(host, ptr, (tmp - ptr));
+ host[tmp - ptr] = 0;
+ } else
+ host = strdupz(ptr);
+
+ if ((tmp = strchr(host, ':'))) {
+ *tmp = 0;
+ tmp++;
+ out->port = atoi(tmp);
+ }
+
+ if (out->port <= 0 || out->port > 65535)
+ out->port = 8080;
+
+ out->host = host;
+
+ out->type = MQTT_WSS_PROXY_HTTP;
+}
+
+/* Attempts to make a connection to MQTT broker over WSS
+ * @param client instance of mqtt_wss_client
+ * @return 0 - Successfull Connection,
+ * <0 - Irrecoverable Error -> Kill ACLK,
+ * >0 - netdata_exit
+ */
+#define CLOUD_BASE_URL_READ_RETRY 30
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+#define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
+#else
+#define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
+#endif
+static int aclk_attempt_to_connect(mqtt_wss_client client)
+{
+ char *aclk_hostname = NULL;
+ int aclk_port;
+
+#ifndef ACLK_DISABLE_CHALLENGE
+ char *mqtt_otp_user = NULL;
+ char *mqtt_otp_pass = NULL;
+#endif
+
+ json_object *lwt;
+
+ while (!netdata_exit) {
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (cloud_base_url == NULL) {
+ error("Do not move the cloud base url out of post_conf_load!!");
+ return -1;
+ }
+
+ if (aclk_block_till_recon_allowed())
+ return 1;
+
+ info("Attempting connection now");
+ if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
+ error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
+ sleep(CLOUD_BASE_URL_READ_RETRY);
+ continue;
+ }
+
+ struct mqtt_wss_proxy proxy_conf;
+ proxy_conf.type = MQTT_WSS_DIRECT;
+ set_proxy(&proxy_conf);
+
+ struct mqtt_connect_params mqtt_conn_params = {
+ .clientid = "anon",
+ .username = "anon",
+ .password = "anon",
+ .will_topic = aclk_get_topic(ACLK_TOPICID_METADATA),
+ .will_msg = NULL,
+ .will_flags = MQTT_WSS_PUB_QOS2,
+ .keep_alive = 60
+ };
+#ifndef ACLK_DISABLE_CHALLENGE
+ aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass);
+ mqtt_conn_params.clientid = mqtt_otp_user;
+ mqtt_conn_params.username = mqtt_otp_user;
+ mqtt_conn_params.password = mqtt_otp_pass;
+#endif
+
+ lwt = aclk_generate_disconnect(NULL);
+ mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
+
+ mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+ if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) {
+ json_object_put(lwt);
+ freez(aclk_hostname);
+ aclk_hostname = NULL;
+ info("MQTTWSS connection succeeded");
+ mqtt_connected_actions(client);
+ return 0;
+ }
+
+ freez(aclk_hostname);
+ aclk_hostname = NULL;
+ json_object_put(lwt);
+ error("Connect failed\n");
+ }
+
+ return 1;
+}
+
+/**
+ * Main agent cloud link thread
+ *
+ * This thread will simply call the main event loop that handles
+ * pending requests - both inbound and outbound
+ *
+ * @param ptr is a pointer to the netdata_static_thread structure.
+ *
+ * @return It always returns NULL
+ */
+void *aclk_main(void *ptr)
+{
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+
+ struct aclk_stats_thread *stats_thread = NULL;
+
+ struct aclk_query_threads query_threads;
+ query_threads.thread_list = NULL;
+
+ ACLK_PROXY_TYPE proxy_type;
+ aclk_get_proxy(&proxy_type);
+ if (proxy_type == PROXY_TYPE_SOCKS5) {
+ error("SOCKS5 proxy is not supported by ACLK-NG yet.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+ }
+
+ // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
+ // as it must notify the far end that it shutdown gracefully and avoid the LWT.
+ netdata_thread_disable_cancelability();
+
+#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
+ info("Killing ACLK thread -> cloud functionality has been disabled");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+#endif
+ aclk_popcorn_check_bump(); // start localhost popcorn timer
+ query_threads.count = read_query_thread_count();
+
+ if (wait_till_cloud_enabled())
+ goto exit;
+
+ if (wait_till_agent_claim_ready())
+ goto exit;
+
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ error("Couldn't initialize MQTT_WSS network library");
+ goto exit;
+ }
+
+ aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
+ if (aclk_stats_enabled) {
+ stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
+ stats_thread->thread = mallocz(sizeof(netdata_thread_t));
+ stats_thread->query_thread_count = query_threads.count;
+ netdata_thread_create(
+ stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
+ stats_thread);
+ }
+
+ // Keep reconnecting and talking until our time has come
+ // and the Grim Reaper (netdata_exit) calls
+ do {
+ if (aclk_attempt_to_connect(mqttwss_client))
+ goto exit_full;
+
+ // warning this assumes the popcorning is relative short (3s)
+ // if that changes call mqtt_wss_service from within
+ // to keep OpenSSL, WSS and MQTT connection alive
+ if (wait_popcorning_finishes(mqttwss_client, &query_threads))
+ goto exit_full;
+
+ if (!handle_connection(mqttwss_client)) {
+ aclk_stats_upd_online(0);
+ aclk_connected = 0;
+ }
+ } while (!netdata_exit);
+
+ aclk_graceful_disconnect(mqttwss_client);
+
+exit_full:
+// Tear Down
+ QUERY_THREAD_WAKEUP_ALL;
+
+ aclk_query_threads_cleanup(&query_threads);
+
+ if (aclk_stats_enabled) {
+ netdata_thread_join(*stats_thread->thread, NULL);
+ aclk_stats_thread_cleanup();
+ freez(stats_thread->thread);
+ freez(stats_thread);
+ }
+ free_topic_cache();
+ mqtt_wss_destroy(mqttwss_client);
+exit:
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+}
+
+// TODO this is taken over as workaround from old ACLK
+// fix this in both old and new ACLK
+extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
+
+void aclk_alarm_reload(void)
+{
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ ACLK_SHARED_STATE_UNLOCK;
+ return;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+
+ aclk_queue_query(aclk_query_new(METADATA_ALARMS));
+}
+
+int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
+{
+ BUFFER *local_buffer;
+ json_object *msg;
+
+ if (host != localhost)
+ return 0;
+
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ ACLK_SHARED_STATE_UNLOCK;
+ return 0;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+
+ local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+
+ netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
+ health_alarm_entry2json_nolock(local_buffer, ae, host);
+ netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
+
+ msg = json_tokener_parse(local_buffer->buffer);
+
+ struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE);
+ query->data.alarm_update = msg;
+ aclk_queue_query(query);
+
+ buffer_free(local_buffer);
+ return 0;
+}
+
+int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
+{
+ struct aclk_query *query;
+
+ if (aclk_popcorn_check_bump())
+ return 0;
+
+ query = aclk_query_new(create ? CHART_NEW : CHART_DEL);
+ if(create) {
+ query->data.chart_add_del.host = host;
+ query->data.chart_add_del.chart_name = strdupz(chart_name);
+ } else {
+ query->data.metadata_info.host = host;
+ query->data.metadata_info.initial_on_connect = 0;
+ }
+
+ aclk_queue_query(query);
+ return 0;
+}
+
+/*
+ * Add a new collector to the list
+ * If it exists, update the chart count
+ */
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+{
+ struct aclk_query *query;
+ struct _collector *tmp_collector;
+ if (unlikely(!netdata_ready)) {
+ return;
+ }
+
+ COLLECTOR_LOCK;
+
+ tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
+
+ if (unlikely(tmp_collector->count != 1)) {
+ COLLECTOR_UNLOCK;
+ return;
+ }
+
+ COLLECTOR_UNLOCK;
+
+ if (aclk_popcorn_check_bump())
+ return;
+
+ if (host != localhost)
+ return;
+
+ query = aclk_query_new(METADATA_INFO);
+ query->data.metadata_info.host = localhost; //TODO
+ query->data.metadata_info.initial_on_connect = 0;
+ aclk_queue_query(query);
+
+ query = aclk_query_new(METADATA_ALARMS);
+ query->data.metadata_alarms.initial_on_connect = 0;
+ aclk_queue_query(query);
+}
+
+/*
+ * Delete a collector from the list
+ * If the chart count reaches zero the collector will be removed
+ * from the list by calling del_collector.
+ *
+ * This function will release the memory used and schedule
+ * a cloud update
+ */
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+{
+ struct aclk_query *query;
+ struct _collector *tmp_collector;
+ if (unlikely(!netdata_ready)) {
+ return;
+ }
+
+ COLLECTOR_LOCK;
+
+ tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
+
+ if (unlikely(!tmp_collector || tmp_collector->count)) {
+ COLLECTOR_UNLOCK;
+ return;
+ }
+
+ debug(
+ D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
+ tmp_collector->count);
+
+ COLLECTOR_UNLOCK;
+
+ _free_collector(tmp_collector);
+
+ if (aclk_popcorn_check_bump())
+ return;
+
+ if (host != localhost)
+ return;
+
+ query = aclk_query_new(METADATA_INFO);
+ query->data.metadata_info.host = localhost; //TODO
+ query->data.metadata_info.initial_on_connect = 0;
+ aclk_queue_query(query);
+
+ query = aclk_query_new(METADATA_ALARMS);
+ query->data.metadata_alarms.initial_on_connect = 0;
+ aclk_queue_query(query);
+}
+
+struct label *add_aclk_host_labels(struct label *label) {
+#ifdef ENABLE_ACLK
+ ACLK_PROXY_TYPE aclk_proxy;
+ char *proxy_str;
+ aclk_get_proxy(&aclk_proxy);
+
+ switch(aclk_proxy) {
+ case PROXY_TYPE_SOCKS5:
+ proxy_str = "SOCKS5";
+ break;
+ case PROXY_TYPE_HTTP:
+ proxy_str = "HTTP";
+ break;
+ default:
+ proxy_str = "none";
+ break;
+ }
+ label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO);
+ return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
+#else
+ return label;
+#endif
+}
diff --git a/aclk/aclk.h b/aclk/aclk.h
new file mode 100644
index 000000000..29626c7f4
--- /dev/null
+++ b/aclk/aclk.h
@@ -0,0 +1,100 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_H
+#define ACLK_H
+
+typedef struct aclk_rrdhost_state {
+ char *claimed_id; // Claimed ID if host has one otherwise NULL
+} aclk_rrdhost_state;
+
+#include "../daemon/common.h"
+#include "aclk_util.h"
+
+// minimum and maximum supported version of ACLK
+// in this version of agent
+#define ACLK_VERSION_MIN 2
+#define ACLK_VERSION_MAX 2
+
+// Version negotiation messages have they own versioning
+// this is also used for LWT message as we set that up
+// before version negotiation
+#define ACLK_VERSION_NEG_VERSION 1
+
+// Maximum time to wait for version negotiation before aborting
+// and defaulting to oldest supported version
+#define VERSION_NEG_TIMEOUT 3
+
+#if ACLK_VERSION_MIN > ACLK_VERSION_MAX
+#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN"
+#endif
+
+// Define ACLK Feature Version Boundaries Here
+#define ACLK_V_COMPRESSION 2
+
+// How many MQTT PUBACKs we need to get to consider connection
+// stable for the purposes of TBEB (truncated binary exponential backoff)
+#define ACLK_PUBACKS_CONN_STABLE 3
+
+// TODO get rid of this shit
+extern int aclk_disable_runtime;
+extern int aclk_disable_single_updates;
+extern int aclk_kill_link;
+extern int aclk_connected;
+
+extern usec_t aclk_session_us;
+extern time_t aclk_session_sec;
+
+void *aclk_main(void *ptr);
+void aclk_single_update_disable();
+void aclk_single_update_enable();
+
+#define NETDATA_ACLK_HOOK \
+ { .name = "ACLK_Main", \
+ .config_section = NULL, \
+ .config_name = NULL, \
+ .enabled = 1, \
+ .thread = NULL, \
+ .init_routine = NULL, \
+ .start_routine = aclk_main },
+
+extern netdata_mutex_t aclk_shared_state_mutex;
+#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+
+typedef enum aclk_agent_state {
+ AGENT_INITIALIZING,
+ AGENT_STABLE
+} ACLK_AGENT_STATE;
+extern struct aclk_shared_state {
+ ACLK_AGENT_STATE agent_state;
+ time_t last_popcorn_interrupt;
+
+ // read only while ACLK connected
+ // protect by lock otherwise
+ int version_neg;
+ usec_t version_neg_wait_till;
+
+ // To wait for `disconnect` message PUBACK
+ // when shuting down
+ // at the same time if > 0 we know link is
+ // shutting down
+ int mqtt_shutdown_msg_id;
+ int mqtt_shutdown_msg_rcvd;
+} aclk_shared_state;
+
+void aclk_alarm_reload(void);
+int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
+
+// TODO this is for bacward compatibility with ACLK legacy
+#define ACLK_CMD_CHART 1
+#define ACLK_CMD_CHARTDEL 0
+/* Informs ACLK about created/deleted chart
+ * @param create 0 - if chart was deleted, other if chart created
+ */
+int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
+
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+
+struct label *add_aclk_host_labels(struct label *label);
+
+#endif /* ACLK_H */
diff --git a/aclk/aclk_collector_list.c b/aclk/aclk_collector_list.c
new file mode 100644
index 000000000..a251a23a8
--- /dev/null
+++ b/aclk/aclk_collector_list.c
@@ -0,0 +1,193 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+// This is copied from Legacy ACLK, Original Autor: amoss
+
+// TODO unmess this
+
+#include "aclk_collector_list.h"
+
+netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
+
+struct _collector *collector_list = NULL;
+
+/*
+ * Free a collector structure
+ */
+void _free_collector(struct _collector *collector)
+{
+ if (likely(collector->plugin_name))
+ freez(collector->plugin_name);
+
+ if (likely(collector->module_name))
+ freez(collector->module_name);
+
+ if (likely(collector->hostname))
+ freez(collector->hostname);
+
+ freez(collector);
+}
+
+/*
+ * This will report the collector list
+ *
+ */
+#ifdef ACLK_DEBUG
+static void _dump_collector_list()
+{
+ struct _collector *tmp_collector;
+
+ COLLECTOR_LOCK;
+
+ info("DUMPING ALL COLLECTORS");
+
+ if (unlikely(!collector_list || !collector_list->next)) {
+ COLLECTOR_UNLOCK;
+ info("DUMPING ALL COLLECTORS -- nothing found");
+ return;
+ }
+
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+
+ while (tmp_collector) {
+ info(
+ "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
+ tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
+ tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
+
+ tmp_collector = tmp_collector->next;
+ }
+ info("DUMPING ALL COLLECTORS DONE");
+ COLLECTOR_UNLOCK;
+}
+#endif
+
+/*
+ * This will cleanup the collector list
+ *
+ */
+void _reset_collector_list()
+{
+ struct _collector *tmp_collector, *next_collector;
+
+ COLLECTOR_LOCK;
+
+ if (unlikely(!collector_list || !collector_list->next)) {
+ COLLECTOR_UNLOCK;
+ return;
+ }
+
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+ collector_list->count = 0;
+ collector_list->next = NULL;
+
+ // We broke the link; we can unlock
+ COLLECTOR_UNLOCK;
+
+ while (tmp_collector) {
+ next_collector = tmp_collector->next;
+ _free_collector(tmp_collector);
+ tmp_collector = next_collector;
+ }
+}
+
+/*
+ * Find a collector (if it exists)
+ * Must lock before calling this
+ * If last_collector is not null, it will return the previous collector in the linked
+ * list (used in collector delete)
+ */
+static struct _collector *_find_collector(
+ const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
+{
+ struct _collector *tmp_collector, *prev_collector;
+ uint32_t plugin_hash;
+ uint32_t module_hash;
+ uint32_t hostname_hash;
+
+ if (unlikely(!collector_list)) {
+ collector_list = callocz(1, sizeof(struct _collector));
+ return NULL;
+ }
+
+ if (unlikely(!collector_list->next))
+ return NULL;
+
+ plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
+ module_hash = module_name ? simple_hash(module_name) : 1;
+ hostname_hash = simple_hash(hostname);
+
+ // Note that the first entry is "dummy"
+ tmp_collector = collector_list->next;
+ prev_collector = collector_list;
+ while (tmp_collector) {
+ if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
+ hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
+ (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
+ (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
+ if (unlikely(last_collector))
+ *last_collector = prev_collector;
+
+ return tmp_collector;
+ }
+
+ prev_collector = tmp_collector;
+ tmp_collector = tmp_collector->next;
+ }
+
+ return tmp_collector;
+}
+
+/*
+ * Called to delete a collector
+ * It will reduce the count (chart_count) and will remove it
+ * from the linked list if the count reaches zero
+ * The structure will be returned to the caller to free
+ * the resources
+ *
+ */
+struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
+{
+ struct _collector *tmp_collector, *prev_collector = NULL;
+
+ tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
+
+ if (likely(tmp_collector)) {
+ --tmp_collector->count;
+ if (unlikely(!tmp_collector->count))
+ prev_collector->next = tmp_collector->next;
+ }
+ return tmp_collector;
+}
+
+/*
+ * Add a new collector (plugin / module) to the list
+ * If it already exists just update the chart count
+ *
+ * Lock before calling
+ */
+struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
+{
+ struct _collector *tmp_collector;
+
+ tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
+
+ if (unlikely(!tmp_collector)) {
+ tmp_collector = callocz(1, sizeof(struct _collector));
+ tmp_collector->hostname_hash = simple_hash(hostname);
+ tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
+ tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
+
+ tmp_collector->hostname = strdupz(hostname);
+ tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
+ tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
+
+ tmp_collector->next = collector_list->next;
+ collector_list->next = tmp_collector;
+ }
+ tmp_collector->count++;
+ debug(
+ D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
+ module_name ? module_name : "*", tmp_collector->count);
+ return tmp_collector;
+}
diff --git a/aclk/aclk_collector_list.h b/aclk/aclk_collector_list.h
new file mode 100644
index 000000000..98d30ba94
--- /dev/null
+++ b/aclk/aclk_collector_list.h
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+// This is copied from Legacy ACLK, Original Autor: amoss
+
+// TODO unmess this
+
+#ifndef ACLK_COLLECTOR_LIST_H
+#define ACLK_COLLECTOR_LIST_H
+
+#include "libnetdata/libnetdata.h"
+
+extern netdata_mutex_t collector_mutex;
+
+#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
+#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
+
+/*
+ * Maintain a list of collectors and chart count
+ * If all the charts of a collector are deleted
+ * then a new metadata dataset must be send to the cloud
+ *
+ */
+struct _collector {
+ time_t created;
+ uint32_t count; //chart count
+ uint32_t hostname_hash;
+ uint32_t plugin_hash;
+ uint32_t module_hash;
+ char *hostname;
+ char *plugin_name;
+ char *module_name;
+ struct _collector *next;
+};
+
+struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
+struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name);
+void _reset_collector_list();
+void _free_collector(struct _collector *collector);
+
+#endif /* ACLK_COLLECTOR_LIST_H */
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
new file mode 100644
index 000000000..fcb9d600c
--- /dev/null
+++ b/aclk/aclk_otp.c
@@ -0,0 +1,261 @@
+
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_otp.h"
+
+#include "https_client.h"
+
+#include "../daemon/common.h"
+
+#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h"
+
+struct dictionary_singleton {
+ char *key;
+ char *result;
+};
+
+static int json_extract_singleton(JSON_ENTRY *e)
+{
+ struct dictionary_singleton *data = e->callback_data;
+
+ switch (e->type) {
+ case JSON_OBJECT:
+ case JSON_ARRAY:
+ break;
+ case JSON_STRING:
+ if (!strcmp(e->name, data->key)) {
+ data->result = strdupz(e->data.string);
+ break;
+ }
+ break;
+ case JSON_NUMBER:
+ case JSON_BOOLEAN:
+ case JSON_NULL:
+ break;
+ }
+ return 0;
+}
+
+// Base-64 decoder.
+// Note: This is non-validating, invalid input will be decoded without an error.
+// Challenges are packed into json strings so we don't skip newlines.
+// Size errors (i.e. invalid input size or insufficient output space) are caught.
+static size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
+{
+ static char lookup[256];
+ static int first_time=1;
+ if (first_time)
+ {
+ first_time = 0;
+ for(int i=0; i<256; i++)
+ lookup[i] = -1;
+ for(int i='A'; i<='Z'; i++)
+ lookup[i] = i-'A';
+ for(int i='a'; i<='z'; i++)
+ lookup[i] = i-'a' + 26;
+ for(int i='0'; i<='9'; i++)
+ lookup[i] = i-'0' + 52;
+ lookup['+'] = 62;
+ lookup['/'] = 63;
+ }
+ if ((input_size & 3) != 0)
+ {
+ error("Can't decode base-64 input length %zu", input_size);
+ return 0;
+ }
+ size_t unpadded_size = (input_size/4) * 3;
+ if ( unpadded_size > output_size )
+ {
+ error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
+ return 0;
+ }
+ // Don't check padding within full quantums
+ for (size_t i = 0 ; i < input_size-4 ; i+=4 )
+ {
+ uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
+ output[0] = value >> 16;
+ output[1] = value >> 8;
+ output[2] = value;
+ //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
+ output += 3;
+ input += 4;
+ }
+ // Handle padding only in last quantum
+ if (input[2] == '=') {
+ uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
+ output[0] = value >> 4;
+ //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
+ return unpadded_size-2;
+ }
+ else if (input[3] == '=') {
+ uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
+ output[0] = value >> 10;
+ output[1] = value >> 2;
+ //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
+ return unpadded_size-1;
+ }
+ else
+ {
+ uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
+ output[0] = value >> 16;
+ output[1] = value >> 8;
+ output[2] = value;
+ //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
+ return unpadded_size;
+ }
+}
+
+static size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
+{
+ uint32_t value;
+ static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz"
+ "0123456789+/";
+ if ((input_size/3+1)*4 >= output_size)
+ {
+ error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
+ return 0;
+ }
+ size_t count = 0;
+ while (input_size>3)
+ {
+ value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
+ output[0] = lookup[value >> 18];
+ output[1] = lookup[(value >> 12) & 0x3f];
+ output[2] = lookup[(value >> 6) & 0x3f];
+ output[3] = lookup[value & 0x3f];
+ //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
+ output += 4;
+ input += 3;
+ input_size -= 3;
+ count += 4;
+ }
+ switch (input_size)
+ {
+ case 2:
+ value = (input[0] << 10) + (input[1] << 2);
+ output[0] = lookup[(value >> 12) & 0x3f];
+ output[1] = lookup[(value >> 6) & 0x3f];
+ output[2] = lookup[value & 0x3f];
+ output[3] = '=';
+ //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]);
+ count += 4;
+ break;
+ case 1:
+ value = input[0] << 4;
+ output[0] = lookup[(value >> 6) & 0x3f];
+ output[1] = lookup[value & 0x3f];
+ output[2] = '=';
+ output[3] = '=';
+ //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
+ count += 4;
+ break;
+ case 0:
+ break;
+ }
+ return count;
+}
+
+static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char *decrypted)
+{
+ int result = RSA_private_decrypt( data_len, enc_data, decrypted, p_key, RSA_PKCS1_OAEP_PADDING);
+ if (result == -1) {
+ char err[512];
+ ERR_error_string_n(ERR_get_error(), err, sizeof(err));
+ error("Decryption of the challenge failed: %s", err);
+ }
+ return result;
+}
+
+// aclk_get_mqtt_otp is slightly modified original code from @amoss
+void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass)
+{
+ char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ debug(D_ACLK, "Performing challenge-response sequence");
+ if (*mqtt_pass != NULL)
+ {
+ freez(*mqtt_pass);
+ *mqtt_pass = NULL;
+ }
+ // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
+ // TODO - target host?
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL)
+ {
+ error("Agent was not claimed - cannot perform challenge/response");
+ goto CLEANUP;
+ }
+ char url[1024];
+ sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
+ info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url);
+ if (https_request(HTTP_REQ_GET, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
+ {
+ error("Challenge failed: %s", data_buffer);
+ goto CLEANUP;
+ }
+ struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
+
+ debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
+ if (json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
+ {
+ freez(challenge.result);
+ error("Could not parse the json response with the challenge: %s", data_buffer);
+ goto CLEANUP;
+ }
+ if (challenge.result == NULL) {
+ error("Could not retrieve challenge from auth response: %s", data_buffer);
+ goto CLEANUP;
+ }
+
+
+ size_t challenge_len = strlen(challenge.result);
+ unsigned char decoded[512];
+ size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
+
+ unsigned char plaintext[4096]={};
+ int decrypted_length = private_decrypt(p_key, decoded, decoded_len, plaintext);
+ freez(challenge.result);
+ char encoded[512];
+ size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
+ encoded[encoded_len] = 0;
+ debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
+
+ char response_json[4096]={};
+ sprintf(response_json, "{\"response\":\"%s\"}", encoded);
+ debug(D_ACLK, "Password phase: %s",response_json);
+ // TODO - host
+ sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
+ if (https_request(HTTP_REQ_POST, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
+ {
+ error("Challenge-response failed: %s", data_buffer);
+ goto CLEANUP;
+ }
+
+ debug(D_ACLK, "Password response from cloud: %s", data_buffer);
+
+ struct dictionary_singleton password = { .key = "password", .result = NULL };
+ if (json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
+ {
+ freez(password.result);
+ error("Could not parse the json response with the password: %s", data_buffer);
+ goto CLEANUP;
+ }
+
+ if (password.result == NULL ) {
+ error("Could not retrieve password from auth response");
+ goto CLEANUP;
+ }
+ if (*mqtt_pass != NULL )
+ freez(*mqtt_pass);
+ *mqtt_pass = password.result;
+ if (*mqtt_usr != NULL)
+ freez(*mqtt_usr);
+ *mqtt_usr = agent_id;
+ agent_id = NULL;
+
+CLEANUP:
+ if (agent_id != NULL)
+ freez(agent_id);
+ freez(data_buffer);
+ return;
+}
diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h
new file mode 100644
index 000000000..31e81c5a1
--- /dev/null
+++ b/aclk/aclk_otp.h
@@ -0,0 +1,10 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_OTP_H
+#define ACLK_OTP_H
+
+#include "../daemon/common.h"
+
+void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass);
+
+#endif /* ACLK_OTP_H */
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
new file mode 100644
index 000000000..71c63f647
--- /dev/null
+++ b/aclk/aclk_query.c
@@ -0,0 +1,295 @@
+#include "aclk_query.h"
+#include "aclk_stats.h"
+#include "aclk_query_queue.h"
+#include "aclk_tx_msgs.h"
+
+#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
+
+#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
+
+pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
+#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+
+typedef struct aclk_query_handler {
+ aclk_query_type_t type;
+ char *name; // for logging purposes
+ int(*fnc)(mqtt_wss_client client, aclk_query_t query);
+} aclk_query_handler;
+
+static int info_metadata(mqtt_wss_client client, aclk_query_t query)
+{
+ aclk_send_info_metadata(client,
+ !query->data.metadata_info.initial_on_connect,
+ query->data.metadata_info.host);
+ return 0;
+}
+
+static int alarms_metadata(mqtt_wss_client client, aclk_query_t query)
+{
+ aclk_send_alarm_metadata(client,
+ !query->data.metadata_info.initial_on_connect);
+ return 0;
+}
+
+static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url)
+{
+ usec_t t;
+
+ t = now_monotonic_high_precision_usec();
+ w->response.code = web_client_api_request_v1(host, w, url);
+ t = now_monotonic_high_precision_usec() - t;
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_q_process_total += t;
+ aclk_metrics_per_sample.cloud_q_process_count++;
+ if (aclk_metrics_per_sample.cloud_q_process_max < t)
+ aclk_metrics_per_sample.cloud_q_process_max = t;
+ ACLK_STATS_UNLOCK;
+ }
+
+ return t;
+}
+
+static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
+{
+ int retval = 0;
+ usec_t t;
+ BUFFER *local_buffer = NULL;
+
+#ifdef NETDATA_WITH_ZLIB
+ int z_ret;
+ BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ char *start, *end;
+#endif
+
+ struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+ w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
+ w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
+ w->acl = 0x1f;
+
+ char *mysep = strchr(query->data.http_api_v2.query, '?');
+ if (mysep) {
+ url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+ *mysep = '\0';
+ } else
+ url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+
+ mysep = strrchr(query->data.http_api_v2.query, '/');
+
+ // execute the query
+ t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop");
+
+#ifdef NETDATA_WITH_ZLIB
+ // check if gzip encoding can and should be used
+ if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
+ start += strlen(WEB_HDR_ACCEPT_ENC);
+ end = strstr(start, "\x0D\x0A");
+ start = strstr(start, "gzip");
+
+ if (start && start < end) {
+ w->response.zstream.zalloc = Z_NULL;
+ w->response.zstream.zfree = Z_NULL;
+ w->response.zstream.opaque = Z_NULL;
+ if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
+ w->response.zinitialized = 1;
+ w->response.zoutput = 1;
+ } else
+ error("Failed to initialize zlib. Proceeding without compression.");
+ }
+ }
+
+ if (w->response.data->len && w->response.zinitialized) {
+ w->response.zstream.next_in = (Bytef *)w->response.data->buffer;
+ w->response.zstream.avail_in = w->response.data->len;
+ do {
+ w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE;
+ w->response.zstream.next_out = w->response.zbuffer;
+ z_ret = deflate(&w->response.zstream, Z_FINISH);
+ if(z_ret < 0) {
+ if(w->response.zstream.msg)
+ error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg);
+ else
+ error("Unknown error during zlib compression.");
+ retval = 1;
+ goto cleanup;
+ }
+ int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
+ buffer_need_bytes(z_buffer, bytes_to_cpy);
+ memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy);
+ z_buffer->len += bytes_to_cpy;
+ } while(z_ret != Z_STREAM_END);
+ // so that web_client_build_http_header
+ // puts correct content lenght into header
+ buffer_free(w->response.data);
+ w->response.data = z_buffer;
+ z_buffer = NULL;
+ }
+#endif
+
+ now_realtime_timeval(&w->tv_ready);
+ w->response.data->date = w->tv_ready.tv_sec;
+ web_client_build_http_header(w);
+ local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ buffer_strcat(local_buffer, w->response.header_output->buffer);
+
+ if (w->response.data->len) {
+#ifdef NETDATA_WITH_ZLIB
+ if (w->response.zinitialized) {
+ buffer_need_bytes(local_buffer, w->response.data->len);
+ memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
+ local_buffer->len += w->response.data->len;
+ } else {
+#endif
+ buffer_strcat(local_buffer, w->response.data->buffer);
+#ifdef NETDATA_WITH_ZLIB
+ }
+#endif
+ }
+
+ aclk_http_msg_v2(client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
+
+cleanup:
+#ifdef NETDATA_WITH_ZLIB
+ if(w->response.zinitialized)
+ deflateEnd(&w->response.zstream);
+ buffer_free(z_buffer);
+#endif
+ buffer_free(w->response.data);
+ buffer_free(w->response.header);
+ buffer_free(w->response.header_output);
+ freez(w);
+ buffer_free(local_buffer);
+ return retval;
+}
+
+static int chart_query(mqtt_wss_client client, aclk_query_t query)
+{
+ aclk_chart_msg(client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name);
+ return 0;
+}
+
+static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query)
+{
+ aclk_alarm_state_msg(client, query->data.alarm_update);
+ // aclk_alarm_state_msg frees the json object including the header it generates
+ query->data.alarm_update = NULL;
+ return 0;
+}
+
+aclk_query_handler aclk_query_handlers[] = {
+ { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
+ { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
+ { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata },
+ { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
+ { .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
+ { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
+ { .type = UNKNOWN, .name = NULL, .fnc = NULL }
+};
+
+
+static void aclk_query_process_msg(struct aclk_query_thread *info, aclk_query_t query)
+{
+ for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
+ if (aclk_query_handlers[i].type == query->type) {
+ debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
+ aclk_query_handlers[i].fnc(info->client, query);
+ aclk_query_free(query);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_dispatched++;
+ aclk_queries_per_thread[info->idx]++;
+ ACLK_STATS_UNLOCK;
+ }
+ return;
+ }
+ }
+ fatal("Unknown query in query queue. %u", query->type);
+}
+
+/* Processes messages from queue. Compete for work with other threads
+ */
+int aclk_query_process_msgs(struct aclk_query_thread *info)
+{
+ aclk_query_t query;
+ while ((query = aclk_queue_pop()))
+ aclk_query_process_msg(info, query);
+
+ return 0;
+}
+
+/**
+ * Main query processing thread
+ */
+void *aclk_query_main_thread(void *ptr)
+{
+ struct aclk_query_thread *info = ptr;
+ while (!netdata_exit) {
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(!aclk_shared_state.version_neg)) {
+ if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
+ ACLK_SHARED_STATE_UNLOCK;
+ info("Waiting for ACLK Version Negotiation message from Cloud");
+ sleep(1);
+ continue;
+ }
+ errno = 0;
+ error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
+ " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
+ aclk_shared_state.version_neg = ACLK_VERSION_MIN;
+// When ACLK v3 is implemented you will need this
+// aclk_set_rx_handlers(aclk_shared_state.version_neg);
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+
+ aclk_query_process_msgs(info);
+
+ QUERY_THREAD_LOCK;
+
+ if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+ sleep_usec(USEC_PER_SEC * 1);
+
+ QUERY_THREAD_UNLOCK;
+ }
+ return NULL;
+}
+
+#define TASK_LEN_MAX 16
+void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client)
+{
+ info("Starting %d query threads.", query_threads->count);
+
+ char thread_name[TASK_LEN_MAX];
+ query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
+ for (int i = 0; i < query_threads->count; i++) {
+ query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
+
+ if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
+ error("snprintf encoding error");
+ netdata_thread_create(
+ &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
+ &query_threads->thread_list[i]);
+
+ query_threads->thread_list[i].client = client;
+ }
+}
+
+void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
+{
+ if (query_threads && query_threads->thread_list) {
+ for (int i = 0; i < query_threads->count; i++) {
+ netdata_thread_join(query_threads->thread_list[i].thread, NULL);
+ }
+ freez(query_threads->thread_list);
+ }
+ aclk_queue_lock();
+ aclk_queue_flush();
+}
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
new file mode 100644
index 000000000..43741fb32
--- /dev/null
+++ b/aclk/aclk_query.h
@@ -0,0 +1,32 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_QUERY_H
+#define NETDATA_ACLK_QUERY_H
+
+#include "libnetdata/libnetdata.h"
+
+#include "mqtt_wss_client.h"
+
+extern pthread_cond_t query_cond_wait;
+extern pthread_mutex_t query_lock_wait;
+#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
+#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
+
+// TODO
+//extern volatile int aclk_connected;
+
+struct aclk_query_thread {
+ netdata_thread_t thread;
+ int idx;
+ mqtt_wss_client client;
+};
+
+struct aclk_query_threads {
+ struct aclk_query_thread *thread_list;
+ int count;
+};
+
+void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client);
+void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
+
+#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
new file mode 100644
index 000000000..c9461b233
--- /dev/null
+++ b/aclk/aclk_query_queue.c
@@ -0,0 +1,128 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_query_queue.h"
+#include "aclk_query.h"
+#include "aclk_stats.h"
+
+static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER;
+#define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex)
+#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex)
+
+static struct aclk_query_queue {
+ aclk_query_t head;
+ aclk_query_t tail;
+ int block_push;
+} aclk_query_queue = {
+ .head = NULL,
+ .tail = NULL,
+ .block_push = 0
+};
+
+static inline int _aclk_queue_query(aclk_query_t query)
+{
+ query->created = now_realtime_usec();
+ ACLK_QUEUE_LOCK;
+ if (aclk_query_queue.block_push) {
+ ACLK_QUEUE_UNLOCK;
+ if(!netdata_exit)
+ error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
+ aclk_query_free(query);
+ return 1;
+ }
+ if (!aclk_query_queue.head) {
+ aclk_query_queue.head = query;
+ aclk_query_queue.tail = query;
+ ACLK_QUEUE_UNLOCK;
+ return 0;
+ }
+ // TODO deduplication
+ aclk_query_queue.tail->next = query;
+ aclk_query_queue.tail = query;
+ ACLK_QUEUE_UNLOCK;
+ return 0;
+
+}
+
+int aclk_queue_query(aclk_query_t query)
+{
+ int ret = _aclk_queue_query(query);
+ if (!ret) {
+ QUERY_THREAD_WAKEUP;
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_queued++;
+ ACLK_STATS_UNLOCK;
+ }
+ }
+ return ret;
+}
+
+aclk_query_t aclk_queue_pop(void)
+{
+ aclk_query_t ret;
+
+ ACLK_QUEUE_LOCK;
+ if (aclk_query_queue.block_push) {
+ ACLK_QUEUE_UNLOCK;
+ if(!netdata_exit)
+ error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
+ return NULL;
+ }
+
+ ret = aclk_query_queue.head;
+ if (!ret) {
+ ACLK_QUEUE_UNLOCK;
+ return ret;
+ }
+
+ aclk_query_queue.head = ret->next;
+ if (unlikely(!aclk_query_queue.head))
+ aclk_query_queue.tail = aclk_query_queue.head;
+ ACLK_QUEUE_UNLOCK;
+
+ ret->next = NULL;
+ return ret;
+}
+
+void aclk_queue_flush(void)
+{
+ aclk_query_t query = aclk_queue_pop();
+ while (query) {
+ aclk_query_free(query);
+ query = aclk_queue_pop();
+ };
+}
+
+aclk_query_t aclk_query_new(aclk_query_type_t type)
+{
+ aclk_query_t query = callocz(1, sizeof(struct aclk_query));
+ query->type = type;
+ return query;
+}
+
+void aclk_query_free(aclk_query_t query)
+{
+ if (query->type == HTTP_API_V2) {
+ freez(query->data.http_api_v2.payload);
+ if (query->data.http_api_v2.query != query->dedup_id)
+ freez(query->data.http_api_v2.query);
+ }
+
+ if (query->type == CHART_NEW)
+ freez(query->data.chart_add_del.chart_name);
+
+ if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update)
+ json_object_put(query->data.alarm_update);
+
+ freez(query->dedup_id);
+ freez(query->callback_topic);
+ freez(query->msg_id);
+ freez(query);
+}
+
+void aclk_queue_lock(void)
+{
+ ACLK_QUEUE_LOCK;
+ aclk_query_queue.block_push = 1;
+ ACLK_QUEUE_UNLOCK;
+}
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
new file mode 100644
index 000000000..c46513567
--- /dev/null
+++ b/aclk/aclk_query_queue.h
@@ -0,0 +1,71 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_QUERY_QUEUE_H
+#define NETDATA_ACLK_QUERY_QUEUE_H
+
+#include "libnetdata/libnetdata.h"
+#include "../daemon/common.h"
+
+typedef enum {
+ UNKNOWN,
+ METADATA_INFO,
+ METADATA_ALARMS,
+ HTTP_API_V2,
+ CHART_NEW,
+ CHART_DEL,
+ ALARM_STATE_UPDATE
+} aclk_query_type_t;
+
+struct aclk_query_metadata {
+ RRDHOST *host;
+ int initial_on_connect;
+};
+
+struct aclk_query_chart_add_del {
+ RRDHOST *host;
+ char* chart_name;
+};
+
+struct aclk_query_http_api_v2 {
+ char *payload;
+ char *query;
+};
+
+typedef struct aclk_query *aclk_query_t;
+struct aclk_query {
+ aclk_query_type_t type;
+
+ // dedup_id is used to deduplicate queries in the list
+ // if type and dedup_id is the same message is deduplicated
+ // set dedup_id to NULL to never deduplicate the message
+ // set dedup_id to constant (e.g. empty string "") to make
+ // message of this type ever exist only once in the list
+ char *dedup_id;
+ char *callback_topic;
+ char *msg_id;
+
+ usec_t created;
+
+ aclk_query_t next;
+
+ // TODO maybe remove?
+ int version;
+ union {
+ struct aclk_query_metadata metadata_info;
+ struct aclk_query_metadata metadata_alarms;
+ struct aclk_query_http_api_v2 http_api_v2;
+ struct aclk_query_chart_add_del chart_add_del;
+ json_object *alarm_update;
+ } data;
+};
+
+aclk_query_t aclk_query_new(aclk_query_type_t type);
+void aclk_query_free(aclk_query_t query);
+
+int aclk_queue_query(aclk_query_t query);
+aclk_query_t aclk_queue_pop(void);
+void aclk_queue_flush(void);
+
+void aclk_queue_lock(void);
+
+#endif /* NETDATA_ACLK_QUERY_QUEUE_H */
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
new file mode 100644
index 000000000..fcb8d9968
--- /dev/null
+++ b/aclk/aclk_rx_msgs.c
@@ -0,0 +1,343 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_rx_msgs.h"
+
+#include "aclk_stats.h"
+#include "aclk_query_queue.h"
+
+#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
+#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/"
+
+struct aclk_request {
+ char *type_id;
+ char *msg_id;
+ char *callback_topic;
+ char *payload;
+ int version;
+ int min_version;
+ int max_version;
+};
+
+int cloud_to_agent_parse(JSON_ENTRY *e)
+{
+ struct aclk_request *data = e->callback_data;
+
+ switch (e->type) {
+ case JSON_OBJECT:
+ case JSON_ARRAY:
+ break;
+ case JSON_STRING:
+ if (!strcmp(e->name, "msg-id")) {
+ data->msg_id = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "type")) {
+ data->type_id = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "callback-topic")) {
+ data->callback_topic = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "payload")) {
+ if (likely(e->data.string)) {
+ size_t len = strlen(e->data.string);
+ data->payload = mallocz(len+1);
+ if (!url_decode_r(data->payload, e->data.string, len + 1))
+ strcpy(data->payload, e->data.string);
+ }
+ break;
+ }
+ break;
+ case JSON_NUMBER:
+ if (!strcmp(e->name, "version")) {
+ data->version = e->data.number;
+ break;
+ }
+ if (!strcmp(e->name, "min-version")) {
+ data->min_version = e->data.number;
+ break;
+ }
+ if (!strcmp(e->name, "max-version")) {
+ data->max_version = e->data.number;
+ break;
+ }
+
+ break;
+
+ case JSON_BOOLEAN:
+ break;
+
+ case JSON_NULL:
+ break;
+ }
+ return 0;
+}
+
+static inline int aclk_extract_v2_data(char *payload, char **data)
+{
+ char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR);
+ if(!ptr)
+ return 1;
+ ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR);
+ *data = strdupz(ptr);
+ return 0;
+}
+
+static inline int aclk_v2_payload_get_query(const char *payload, char **query_url)
+{
+ const char *start, *end;
+
+ if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) {
+ errno = 0;
+ error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
+ return 1;
+ }
+ start = payload + 4;
+
+ if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) {
+ errno = 0;
+ error("Doesn't look like HTTP GET request.");
+ return 1;
+ }
+
+ *query_url = mallocz((end - start) + 1);
+ strncpyz(*query_url, start, end - start);
+
+ return 0;
+}
+
+#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
+ if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\
+ debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
+ ACLK_SHARED_STATE_UNLOCK;\
+ return 1;\
+ }\
+ ACLK_SHARED_STATE_UNLOCK;
+
+static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+ HTTP_CHECK_AGENT_INITIALIZED();
+
+ aclk_query_t query;
+
+ errno = 0;
+ if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
+ error(
+ "This handler cannot reply to request with version older than %d, received %d.",
+ ACLK_V_COMPRESSION,
+ cloud_to_agent->version);
+ return 1;
+ }
+
+ query = aclk_query_new(HTTP_API_V2);
+
+ if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) {
+ error("Error extracting payload expected after the JSON dictionary.");
+ goto error;
+ }
+
+ if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) {
+ error("Could not extract payload from query");
+ goto error;
+ }
+
+ if (unlikely(!cloud_to_agent->callback_topic)) {
+ error("Missing callback_topic");
+ goto error;
+ }
+
+ if (unlikely(!cloud_to_agent->msg_id)) {
+ error("Missing msg_id");
+ goto error;
+ }
+
+ // aclk_queue_query takes ownership of data pointer
+ query->callback_topic = cloud_to_agent->callback_topic;
+ // for clarity and code readability as when we process the request
+ // it would be strange to get URL from `dedup_id`
+ query->data.http_api_v2.query = query->dedup_id;
+ query->msg_id = cloud_to_agent->msg_id;
+ aclk_queue_query(query);
+ return 0;
+
+error:
+ aclk_query_free(query);
+ return 1;
+}
+
+// This handles `version` message from cloud used to negotiate
+// protocol version we will use
+static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+ UNUSED(raw_payload);
+ int version = -1;
+ errno = 0;
+
+ if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
+ error(
+ "Unsuported version of \"version\" message from cloud. Expected %d, Got %d",
+ ACLK_VERSION_NEG_VERSION,
+ cloud_to_agent->version);
+ return 1;
+ }
+ if (unlikely(!cloud_to_agent->min_version)) {
+ error("Min version missing or 0");
+ return 1;
+ }
+ if (unlikely(!cloud_to_agent->max_version)) {
+ error("Max version missing or 0");
+ return 1;
+ }
+ if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
+ error(
+ "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version,
+ cloud_to_agent->min_version);
+ return 1;
+ }
+
+ if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
+ error(
+ "Agent too old for this cloud. Minimum version required by cloud %d."
+ " Maximum version supported by this agent %d.",
+ cloud_to_agent->min_version, ACLK_VERSION_MAX);
+ aclk_kill_link = 1;
+ aclk_disable_runtime = 1;
+ return 1;
+ }
+ if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
+ error(
+ "Cloud version is too old for this agent. Maximum version supported by cloud %d."
+ " Minimum (oldest) version supported by this agent %d.",
+ cloud_to_agent->max_version, ACLK_VERSION_MIN);
+ aclk_kill_link = 1;
+ return 1;
+ }
+
+ version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
+
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
+ errno = 0;
+ error("The \"version\" message came too late ignoring.");
+ goto err_cleanup;
+ }
+ if (unlikely(aclk_shared_state.version_neg)) {
+ errno = 0;
+ error("Version has already been set to %d", aclk_shared_state.version_neg);
+ goto err_cleanup;
+ }
+ aclk_shared_state.version_neg = version;
+ ACLK_SHARED_STATE_UNLOCK;
+
+ info("Choosing version %d of ACLK", version);
+
+ aclk_set_rx_handlers(version);
+
+ return 0;
+
+err_cleanup:
+ ACLK_SHARED_STATE_UNLOCK;
+ return 1;
+}
+
+typedef struct aclk_incoming_msg_type{
+ char *name;
+ int(*fnc)(struct aclk_request *, char *);
+}aclk_incoming_msg_type;
+
+aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
+ { .name = "http", .fnc = aclk_handle_cloud_request_v2 },
+ { .name = "version", .fnc = aclk_handle_version_response },
+ { .name = NULL, .fnc = NULL }
+};
+
+struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
+
+void aclk_set_rx_handlers(int version)
+{
+// ACLK_NG ACLK version support starts at 2
+// TODO ACLK v3
+ UNUSED(version);
+ aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
+}
+
+int aclk_handle_cloud_message(char *payload)
+{
+ struct aclk_request cloud_to_agent;
+ memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_recvd++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ if (unlikely(!payload)) {
+ errno = 0;
+ error("ACLK incoming message is empty");
+ goto err_cleanup_nojson;
+ }
+
+ debug(D_ACLK, "ACLK incoming message (%s)", payload);
+
+ int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+
+ if (unlikely(rc != JSON_OK)) {
+ errno = 0;
+ error("Malformed json request (%s)", payload);
+ goto err_cleanup;
+ }
+
+ if (!cloud_to_agent.type_id) {
+ errno = 0;
+ error("Cloud message is missing compulsory key \"type\"");
+ goto err_cleanup;
+ }
+
+ if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
+ error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
+ goto err_cleanup;
+ }
+
+ for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
+ if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
+ if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
+ // in case of success handler is supposed to clean up after itself
+ // or as in the case of aclk_handle_cloud_request take
+ // ownership of the pointers (done to avoid copying)
+ // see what `aclk_queue_query` parameter `internal` does
+
+ // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
+ // msg handlers (namely aclk_handle_version_responce)
+ // can freely change what aclk_incoming_msg_types points to
+ // so either exit or restart this for loop
+ freez(cloud_to_agent.type_id);
+ return 0;
+ }
+ goto err_cleanup;
+ }
+ }
+
+ errno = 0;
+ error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
+
+err_cleanup:
+ if (cloud_to_agent.payload)
+ freez(cloud_to_agent.payload);
+ if (cloud_to_agent.type_id)
+ freez(cloud_to_agent.type_id);
+ if (cloud_to_agent.msg_id)
+ freez(cloud_to_agent.msg_id);
+ if (cloud_to_agent.callback_topic)
+ freez(cloud_to_agent.callback_topic);
+
+err_cleanup_nojson:
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ return 1;
+}
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
new file mode 100644
index 000000000..c9f0bd37a
--- /dev/null
+++ b/aclk/aclk_rx_msgs.h
@@ -0,0 +1,14 @@
+
+
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_RX_MSGS_H
+#define ACLK_RX_MSGS_H
+
+#include "../daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+int aclk_handle_cloud_message(char *payload);
+void aclk_set_rx_handlers(int version);
+
+#endif /* ACLK_RX_MSGS_H */
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
new file mode 100644
index 000000000..b61ac05f7
--- /dev/null
+++ b/aclk/aclk_stats.c
@@ -0,0 +1,274 @@
+#include "aclk_stats.h"
+
+netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
+
+int aclk_stats_enabled;
+
+int query_thread_count;
+
+// data ACLK stats need per query thread
+struct aclk_qt_data {
+ RRDDIM *dim;
+} *aclk_qt_data = NULL;
+
+uint32_t *aclk_queries_per_thread = NULL;
+uint32_t *aclk_queries_per_thread_sample = NULL;
+
+struct aclk_metrics aclk_metrics = {
+ .online = 0,
+};
+
+struct aclk_metrics_per_sample aclk_metrics_per_sample;
+
+static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent)
+{
+ static RRDSET *st_aclkstats = NULL;
+ static RRDDIM *rd_online_status = NULL;
+
+ if (unlikely(!st_aclkstats)) {
+ st_aclkstats = rrdset_create_localhost(
+ "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status",
+ "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st_aclkstats);
+
+ rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online);
+
+ rrdset_done(st_aclkstats);
+}
+
+static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st_query_thread = NULL;
+ static RRDDIM *rd_queued = NULL;
+ static RRDDIM *rd_dispatched = NULL;
+
+ if (unlikely(!st_query_thread)) {
+ st_query_thread = rrdset_create_localhost(
+ "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s",
+ "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA);
+
+ rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st_query_thread);
+
+ rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued);
+ rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched);
+
+ rrdset_done(st_query_thread);
+}
+
+#ifdef NETDATA_INTERNAL_CHECKS
+static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_avg = NULL;
+ static RRDDIM *rd_max = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms",
+ "netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+ if(per_sample->latency_count)
+ rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count));
+ else
+ rrddim_set_by_pointer(st, rd_avg, 0);
+
+ rrddim_set_by_pointer(st, rd_max, per_sample->latency_max);
+
+ rrdset_done(st);
+}
+#endif
+
+static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_rcvd = NULL;
+ static RRDDIM *rd_rq_err = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s",
+ "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err);
+ rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err);
+
+ rrdset_done(st);
+}
+
+#define MAX_DIM_NAME 16
+static void aclk_stats_query_threads(uint32_t *queries_per_thread)
+{
+ static RRDSET *st = NULL;
+
+ char dim_name[MAX_DIM_NAME];
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
+ "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < query_thread_count; i++) {
+ if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
+ error("snprintf encoding error");
+ aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+ } else
+ rrdset_next(st);
+
+ for (int i = 0; i < query_thread_count; i++) {
+ rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
+ }
+
+ rrdset_done(st);
+}
+
+static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_avg = NULL;
+ static RRDDIM *rd_rq_max = NULL;
+ static RRDDIM *rd_rq_total = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us",
+ "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ if(per_sample->cloud_q_process_count)
+ rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count));
+ else
+ rrddim_set_by_pointer(st, rd_rq_avg, 0);
+ rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max);
+ rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total);
+
+ rrdset_done(st);
+}
+
+void aclk_stats_thread_cleanup()
+{
+ freez(aclk_qt_data);
+ freez(aclk_queries_per_thread);
+ freez(aclk_queries_per_thread_sample);
+}
+
+void *aclk_stats_main_thread(void *ptr)
+{
+ struct aclk_stats_thread *args = ptr;
+
+ query_thread_count = args->query_thread_count;
+ aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
+ aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
+ aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+ usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
+
+ memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
+ struct aclk_metrics_per_sample per_sample;
+ struct aclk_metrics permanent;
+
+ while (!netdata_exit) {
+ netdata_thread_testcancel();
+ // ------------------------------------------------------------------------
+ // Wait for the next iteration point.
+
+ heartbeat_next(&hb, step_ut);
+ if (netdata_exit) break;
+
+ ACLK_STATS_LOCK;
+ // to not hold lock longer than necessary, especially not to hold it
+ // during database rrd* operations
+ memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
+ memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
+ memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
+ memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
+ memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
+ ACLK_STATS_UNLOCK;
+
+ aclk_stats_collect(&per_sample, &permanent);
+ aclk_stats_query_queue(&per_sample);
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_latency(&per_sample);
+#endif
+
+ aclk_stats_cloud_req(&per_sample);
+ aclk_stats_query_threads(aclk_queries_per_thread_sample);
+
+ aclk_stats_query_time(&per_sample);
+ }
+
+ return 0;
+}
+
+void aclk_stats_upd_online(int online) {
+ if(!aclk_stats_enabled)
+ return;
+
+ ACLK_STATS_LOCK;
+ aclk_metrics.online = online;
+
+ if(!online)
+ aclk_metrics_per_sample.offline_during_sample = 1;
+ ACLK_STATS_UNLOCK;
+}
+
+#ifdef NETDATA_INTERNAL_CHECKS
+static usec_t pub_time[UINT16_MAX];
+void aclk_stats_msg_published(uint16_t id)
+{
+ ACLK_STATS_LOCK;
+ pub_time[id] = now_boottime_usec();
+ ACLK_STATS_UNLOCK;
+}
+
+void aclk_stats_msg_puback(uint16_t id)
+{
+ ACLK_STATS_LOCK;
+ usec_t t;
+
+ if (!aclk_stats_enabled) {
+ ACLK_STATS_UNLOCK;
+ return;
+ }
+
+ if (unlikely(!pub_time[id])) {
+ ACLK_STATS_UNLOCK;
+ error("Received PUBACK for unknown message?!");
+ return;
+ }
+
+ t = now_boottime_usec() - pub_time[id];
+ t /= USEC_PER_MS;
+ pub_time[id] = 0;
+ if (aclk_metrics_per_sample.latency_max < t)
+ aclk_metrics_per_sample.latency_max = t;
+
+ aclk_metrics_per_sample.latency_total += t;
+ aclk_metrics_per_sample.latency_count++;
+ ACLK_STATS_UNLOCK;
+}
+#endif /* NETDATA_INTERNAL_CHECKS */
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
new file mode 100644
index 000000000..33d016965
--- /dev/null
+++ b/aclk/aclk_stats.h
@@ -0,0 +1,64 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_STATS_H
+#define NETDATA_ACLK_STATS_H
+
+#include "../daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
+
+extern netdata_mutex_t aclk_stats_mutex;
+
+#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
+#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
+
+extern int aclk_stats_enabled;
+
+struct aclk_stats_thread {
+ netdata_thread_t *thread;
+ int query_thread_count;
+};
+
+// preserve between samples
+struct aclk_metrics {
+ volatile uint8_t online;
+};
+
+// reset to 0 on every sample
+extern struct aclk_metrics_per_sample {
+ /* in the unlikely event of ACLK disconnecting
+ and reconnecting under 1 sampling rate
+ we want to make sure we record the disconnection
+ despite it being then seemingly longer in graph */
+ volatile uint8_t offline_during_sample;
+
+ volatile uint32_t queries_queued;
+ volatile uint32_t queries_dispatched;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ volatile uint32_t latency_max;
+ volatile uint32_t latency_total;
+ volatile uint32_t latency_count;
+#endif
+
+ volatile uint32_t cloud_req_recvd;
+ volatile uint32_t cloud_req_err;
+
+ volatile uint32_t cloud_q_process_total;
+ volatile uint32_t cloud_q_process_count;
+ volatile uint32_t cloud_q_process_max;
+} aclk_metrics_per_sample;
+
+extern uint32_t *aclk_queries_per_thread;
+
+void *aclk_stats_main_thread(void *ptr);
+void aclk_stats_thread_cleanup();
+void aclk_stats_upd_online(int online);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+void aclk_stats_msg_published(uint16_t id);
+void aclk_stats_msg_puback(uint16_t id);
+#endif /* NETDATA_INTERNAL_CHECKS */
+
+#endif /* NETDATA_ACLK_STATS_H */
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
new file mode 100644
index 000000000..158fc4e26
--- /dev/null
+++ b/aclk/aclk_tx_msgs.c
@@ -0,0 +1,395 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_tx_msgs.h"
+#include "../daemon/common.h"
+#include "aclk_util.h"
+#include "aclk_stats.h"
+
+#ifndef __GNUC__
+#pragma region aclk_tx_msgs helper functions
+#endif
+
+static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
+{
+ uint16_t packet_id;
+ const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+
+ mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published(packet_id);
+#endif
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 1024
+ char filename[FN_MAX_LEN];
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
+ json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
+#endif
+}
+
+static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
+{
+ uint16_t packet_id;
+ const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+
+ mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published(packet_id);
+#endif
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 1024
+ char filename[FN_MAX_LEN];
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
+ json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
+#endif
+ return packet_id;
+}
+
+/* UNUSED now but can be used soon MVP1?
+static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
+{
+ if (unlikely(!topic || topic[0] != '/')) {
+ error ("Full topic required!");
+ return;
+ }
+
+ const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+
+ mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published();
+#endif
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 1024
+ char filename[FN_MAX_LEN];
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
+ json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
+#endif
+}
+*/
+
+#define TOPIC_MAX_LEN 512
+#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
+static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
+{
+ uint16_t packet_id;
+ const char *str;
+ char *full_msg;
+ int len;
+
+ if (unlikely(!topic || topic[0] != '/')) {
+ error ("Full topic required!");
+ return;
+ }
+
+ str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+ len = strlen(str);
+
+ full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
+
+ memcpy(full_msg, str, len);
+ memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
+ len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
+ memcpy(&full_msg[len], payload, payload_len);
+ len += payload_len;
+
+/* TODO
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 1024
+ char filename[FN_MAX_LEN];
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
+ json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
+#endif */
+
+ mqtt_wss_publish_pid(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id);
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published(packet_id);
+#endif
+ freez(full_msg);
+}
+
+/*
+ * Creates universal header common for all ACLK messages. User gets ownership of json object created.
+ * Usually this is freed by send function after message has been sent.
+ */
+static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
+{
+ uuid_t uuid;
+ char uuid_str[36 + 1];
+ json_object *tmp;
+ json_object *obj = json_object_new_object();
+
+ tmp = json_object_new_string(type);
+ json_object_object_add(obj, "type", tmp);
+
+ if (unlikely(!msg_id)) {
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_str);
+ msg_id = uuid_str;
+ }
+
+ if (ts_secs == 0) {
+ ts_us = now_realtime_usec();
+ ts_secs = ts_us / USEC_PER_SEC;
+ ts_us = ts_us % USEC_PER_SEC;
+ }
+
+ tmp = json_object_new_string(msg_id);
+ json_object_object_add(obj, "msg-id", tmp);
+
+ tmp = json_object_new_int64(ts_secs);
+ json_object_object_add(obj, "timestamp", tmp);
+
+// TODO handle this somehow on older json-c
+// tmp = json_object_new_uint64(ts_us);
+// probably jso->_to_json_strinf -> custom function
+// jso->o.c_uint64 -> map this with pointer to signed int
+// commit that implements json_object_new_uint64 is 3c3b592
+// between 0.14 and 0.15
+ tmp = json_object_new_int64(ts_us);
+ json_object_object_add(obj, "timestamp-offset-usec", tmp);
+
+ tmp = json_object_new_int64(aclk_session_sec);
+ json_object_object_add(obj, "connect", tmp);
+
+// TODO handle this somehow see above
+// tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
+ tmp = json_object_new_int64(aclk_session_us);
+ json_object_object_add(obj, "connect-offset-usec", tmp);
+
+ tmp = json_object_new_int(version);
+ json_object_object_add(obj, "version", tmp);
+
+ return obj;
+}
+
+static char *create_uuid()
+{
+ uuid_t uuid;
+ char *uuid_str = mallocz(36 + 1);
+
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_str);
+
+ return uuid_str;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+#ifndef __GNUC__
+#pragma region aclk_tx_msgs message generators
+#endif
+
+/*
+ * This will send the /api/v1/info
+ */
+#define BUFFER_INITIAL_SIZE (1024 * 16)
+void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host)
+{
+ BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
+ json_object *msg, *payload, *tmp;
+
+ char *msg_id = create_uuid();
+ buffer_flush(local_buffer);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ // on_connect messages are sent on a health reload, if the on_connect message is real then we
+ // use the session time as the fake timestamp to indicate that it starts the session. If it is
+ // a fake on_connect message then use the real timestamp to indicate it is within the existing
+ // session.
+ if (metadata_submitted)
+ msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg);
+ else
+ msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+
+ payload = json_object_new_object();
+ json_object_object_add(msg, "payload", payload);
+
+ web_client_api_request_v1_info_fill_buffer(host, local_buffer);
+ tmp = json_tokener_parse(local_buffer->buffer);
+ json_object_object_add(payload, "info", tmp);
+
+ buffer_flush(local_buffer);
+
+ charts2json(host, local_buffer, 1, 0);
+ tmp = json_tokener_parse(local_buffer->buffer);
+ json_object_object_add(payload, "charts", tmp);
+
+ aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
+
+ json_object_put(msg);
+ freez(msg_id);
+ buffer_free(local_buffer);
+}
+
+// TODO should include header instead
+void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
+
+void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
+{
+ BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
+ json_object *msg, *payload, *tmp;
+
+ char *msg_id = create_uuid();
+ buffer_flush(local_buffer);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ // on_connect messages are sent on a health reload, if the on_connect message is real then we
+ // use the session time as the fake timestamp to indicate that it starts the session. If it is
+ // a fake on_connect message then use the real timestamp to indicate it is within the existing
+ // session.
+
+ if (metadata_submitted)
+ msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
+ else
+ msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+
+ payload = json_object_new_object();
+ json_object_object_add(msg, "payload", payload);
+
+ health_alarms2json(localhost, local_buffer, 1);
+ tmp = json_tokener_parse(local_buffer->buffer);
+ json_object_object_add(payload, "configured-alarms", tmp);
+
+ buffer_flush(local_buffer);
+
+ health_active_log_alarms_2json(localhost, local_buffer);
+ tmp = json_tokener_parse(local_buffer->buffer);
+ json_object_object_add(payload, "alarms-active", tmp);
+
+ aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS);
+
+ json_object_put(msg);
+ freez(msg_id);
+ buffer_free(local_buffer);
+}
+
+void aclk_hello_msg(mqtt_wss_client client)
+{
+ json_object *tmp, *msg;
+
+ char *msg_id = create_uuid();
+
+ ACLK_SHARED_STATE_LOCK;
+ aclk_shared_state.version_neg = 0;
+ aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
+ ACLK_SHARED_STATE_UNLOCK;
+
+ //Hello message is versioned separatelly from the rest of the protocol
+ msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
+
+ tmp = json_object_new_int(ACLK_VERSION_MIN);
+ json_object_object_add(msg, "min-version", tmp);
+
+ tmp = json_object_new_int(ACLK_VERSION_MAX);
+ json_object_object_add(msg, "max-version", tmp);
+
+#ifdef ACLK_NG
+ tmp = json_object_new_string("Next Generation");
+#else
+ tmp = json_object_new_string("Legacy");
+#endif
+ json_object_object_add(msg, "aclk-implementation", tmp);
+
+ aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
+
+ json_object_put(msg);
+ freez(msg_id);
+}
+
+void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
+{
+ json_object *tmp, *msg;
+
+ msg = create_hdr("http", msg_id, 0, 0, 2);
+
+ tmp = json_object_new_int64(t_exec);
+ json_object_object_add(msg, "t-exec", tmp);
+
+ tmp = json_object_new_int64(created);
+ json_object_object_add(msg, "t-rx", tmp);
+
+ tmp = json_object_new_int(http_code);
+ json_object_object_add(msg, "http-code", tmp);
+
+ aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
+ json_object_put(msg);
+}
+
+void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
+{
+ json_object *msg, *payload;
+ BUFFER *tmp_buffer;
+ RRDSET *st;
+
+ st = rrdset_find(host, chart);
+ if (!st)
+ st = rrdset_find_byname(host, chart);
+ if (!st) {
+ info("FAILED to find chart %s", chart);
+ return;
+ }
+
+ tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE);
+ rrdset2json(st, tmp_buffer, NULL, NULL, 1);
+ payload = json_tokener_parse(tmp_buffer->buffer);
+ if (!payload) {
+ error("Failed to parse JSON from rrdset2json");
+ buffer_free(tmp_buffer);
+ return;
+ }
+
+ msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg);
+ json_object_object_add(msg, "payload", payload);
+
+ aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
+
+ buffer_free(tmp_buffer);
+ json_object_put(msg);
+}
+
+void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
+{
+ // we create header here on purpose (and not send message with it already as `msg` param)
+ // one is version_neg is guaranteed to be done here
+ // other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
+ // send message with timestamps already to Query Queue they would be incorrect at time
+ // when query queue would get to send them)
+ json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg);
+ json_object_object_add(obj, "payload", msg);
+
+ aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
+ json_object_put(obj);
+}
+
+/*
+ * Will generate disconnect message.
+ * @param message if NULL it will generate LWT message (unexpected).
+ * Otherwise string pointed to by this parameter will be used as
+ * reason.
+ */
+json_object *aclk_generate_disconnect(const char *message)
+{
+ json_object *tmp, *msg;
+
+ msg = create_hdr("disconnect", NULL, 0, 0, 2);
+
+ tmp = json_object_new_string(message ? message : "unexpected");
+ json_object_object_add(msg, "payload", tmp);
+
+ return msg;
+}
+
+int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
+{
+ int pid;
+ json_object *msg = aclk_generate_disconnect(message);
+ pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA);
+ json_object_put(msg);
+ return pid;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
new file mode 100644
index 000000000..cb4d44c96
--- /dev/null
+++ b/aclk/aclk_tx_msgs.h
@@ -0,0 +1,24 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_TX_MSGS_H
+#define ACLK_TX_MSGS_H
+
+#include <json-c/json.h>
+#include "libnetdata/libnetdata.h"
+#include "../daemon/common.h"
+#include "mqtt_wss_client.h"
+
+void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
+void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
+
+void aclk_hello_msg(mqtt_wss_client client);
+
+void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
+
+void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart);
+
+void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg);
+
+json_object *aclk_generate_disconnect(const char *message);
+int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message);
+
+#endif
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
new file mode 100644
index 000000000..a5347c466
--- /dev/null
+++ b/aclk/aclk_util.c
@@ -0,0 +1,347 @@
+#include "aclk_util.h"
+
+#include <stdio.h>
+
+#include "../daemon/common.h"
+
+// CentOS 7 has older version that doesn't define this
+// same goes for MacOS
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN 37
+#endif
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+volatile int aclk_conversation_log_counter = 0;
+#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
+netdata_mutex_t aclk_conversation_log_mutex = NETDATA_MUTEX_INITIALIZER;
+int aclk_get_conv_log_next()
+{
+ int ret;
+ netdata_mutex_lock(&aclk_conversation_log_mutex);
+ ret = aclk_conversation_log_counter++;
+ netdata_mutex_unlock(&aclk_conversation_log_mutex);
+ return ret;
+}
+#endif
+#endif
+
+#define ACLK_TOPIC_PREFIX "/agent/"
+
+struct aclk_topic {
+ const char *topic_suffix;
+ char *topic;
+};
+
+// This helps to cache finalized topics (assembled with claim_id)
+// to not have to alloc or create buffer and construct topic every
+// time message is sent as in old ACLK
+static struct aclk_topic aclk_topic_cache[] = {
+ { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_CHART
+ { .topic_suffix = "outbound/alarms", .topic = NULL }, // ACLK_TOPICID_ALARMS
+ { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_METADATA
+ { .topic_suffix = "inbound/cmd", .topic = NULL }, // ACLK_TOPICID_COMMAND
+ { .topic_suffix = NULL, .topic = NULL }
+};
+
+void free_topic_cache(void)
+{
+ struct aclk_topic *tc = aclk_topic_cache;
+ while (tc->topic_suffix) {
+ if (tc->topic) {
+ freez(tc->topic);
+ tc->topic = NULL;
+ }
+ tc++;
+ }
+}
+
+static inline void generate_topic_cache(void)
+{
+ struct aclk_topic *tc = aclk_topic_cache;
+ char *ptr;
+ if (unlikely(!tc->topic)) {
+ rrdhost_aclk_state_lock(localhost);
+ while(tc->topic_suffix) {
+ tc->topic = mallocz(strlen(ACLK_TOPIC_PREFIX) + (UUID_STR_LEN - 1) + 2 /* '/' and \0 */ + strlen(tc->topic_suffix));
+ ptr = tc->topic;
+ strcpy(ptr, ACLK_TOPIC_PREFIX);
+ ptr += strlen(ACLK_TOPIC_PREFIX);
+ strcpy(ptr, localhost->aclk_state.claimed_id);
+ ptr += (UUID_STR_LEN - 1);
+ *ptr++ = '/';
+ strcpy(ptr, tc->topic_suffix);
+ tc++;
+ }
+ rrdhost_aclk_state_unlock(localhost);
+ }
+}
+
+/*
+ * Build a topic based on sub_topic and final_topic
+ * if the sub topic starts with / assume that is an absolute topic
+ *
+ */
+const char *aclk_get_topic(enum aclk_topics topic)
+{
+ generate_topic_cache();
+
+ return aclk_topic_cache[topic].topic;
+}
+
+int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
+{
+ int pos = 0;
+ if (!strncmp("https://", url, 8)) {
+ pos = 8;
+ } else if (!strncmp("http://", url, 7)) {
+ error("Cannot connect ACLK over %s -> unencrypted link is not supported", url);
+ return 1;
+ }
+ int host_end = pos;
+ while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':')
+ host_end++;
+ if (url[host_end] == 0) {
+ *aclk_hostname = strdupz(url + pos);
+ *aclk_port = 443;
+ info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
+ return 0;
+ }
+ if (url[host_end] == ':') {
+ *aclk_hostname = callocz(host_end - pos + 1, 1);
+ strncpy(*aclk_hostname, url + pos, host_end - pos);
+ int port_end = host_end + 1;
+ while (url[port_end] >= '0' && url[port_end] <= '9')
+ port_end++;
+ if (port_end - host_end > 6) {
+ error("Port specified in %s is invalid", url);
+ freez(*aclk_hostname);
+ *aclk_hostname = NULL;
+ return 1;
+ }
+ *aclk_port = atoi(&url[host_end+1]);
+ }
+ if (url[host_end] == '/') {
+ *aclk_port = 443;
+ *aclk_hostname = callocz(1, host_end - pos + 1);
+ strncpy(*aclk_hostname, url+pos, host_end - pos);
+ }
+ info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
+ return 0;
+}
+
+/*
+ * TBEB with randomness
+ *
+ * @param mode 0 - to reset the delay,
+ * 1 - to advance a step and calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
+ * @returns delay in ms
+ *
+ */
+#define ACLK_MAX_BACKOFF_DELAY 1024
+unsigned long int aclk_reconnect_delay(int mode)
+{
+ static int fail = -1;
+ unsigned long int delay;
+
+ if (!mode || fail == -1) {
+ srandom(time(NULL));
+ fail = mode - 1;
+ return 0;
+ }
+
+ delay = (1 << fail);
+
+ if (delay >= ACLK_MAX_BACKOFF_DELAY) {
+ delay = ACLK_MAX_BACKOFF_DELAY * 1000;
+ } else {
+ fail++;
+ delay *= 1000;
+ delay += (random() % (MAX(1000, delay/2)));
+ }
+
+ return delay;
+}
+
+#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
+#define ACLK_PROXY_ENV "env"
+#define ACLK_PROXY_CONFIG_VAR "proxy"
+
+struct {
+ ACLK_PROXY_TYPE type;
+ const char *url_str;
+} supported_proxy_types[] = {
+ { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
+};
+
+const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
+{
+ switch (*type) {
+ case PROXY_DISABLED:
+ return "disabled";
+ case PROXY_TYPE_HTTP:
+ return "HTTP";
+ case PROXY_TYPE_SOCKS5:
+ return "SOCKS";
+ default:
+ return "Unknown";
+ }
+}
+
+static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
+{
+ int i = 0;
+ while (supported_proxy_types[i].url_str) {
+ if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
+ return supported_proxy_types[i].type;
+ i++;
+ }
+ return PROXY_TYPE_UNKNOWN;
+}
+
+ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
+{
+ if (!string)
+ return PROXY_TYPE_UNKNOWN;
+
+ while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove)
+ string++;
+
+ if (!*string)
+ return PROXY_TYPE_UNKNOWN;
+
+ return aclk_find_proxy(string);
+}
+
+// helper function to censor user&password
+// for logging purposes
+void safe_log_proxy_censor(char *proxy)
+{
+ size_t length = strlen(proxy);
+ char *auth = proxy + length - 1;
+ char *cur;
+
+ while ((auth >= proxy) && (*auth != '@'))
+ auth--;
+
+ //if not found or @ is first char do nothing
+ if (auth <= proxy)
+ return;
+
+ cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+ if (!cur)
+ cur = proxy;
+ else
+ cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+
+ while (cur < auth) {
+ *cur = 'X';
+ cur++;
+ }
+}
+
+static inline void safe_log_proxy_error(char *str, const char *proxy)
+{
+ char *log = strdupz(proxy);
+ safe_log_proxy_censor(log);
+ error("%s Provided Value:\"%s\"", str, log);
+ freez(log);
+}
+
+static inline int check_socks_enviroment(const char **proxy)
+{
+ char *tmp = getenv("socks_proxy");
+
+ if (!tmp)
+ return 1;
+
+ if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
+ *proxy = tmp;
+ return 0;
+ }
+
+ safe_log_proxy_error(
+ "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
+ tmp);
+ return 1;
+}
+
+static inline int check_http_enviroment(const char **proxy)
+{
+ char *tmp = getenv("http_proxy");
+
+ if (!tmp)
+ return 1;
+
+ if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
+ *proxy = tmp;
+ return 0;
+ }
+
+ safe_log_proxy_error(
+ "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
+ tmp);
+ return 1;
+}
+
+const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
+{
+ const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
+ *type = PROXY_DISABLED;
+
+ if (strcmp(proxy, "none") == 0)
+ return proxy;
+
+ if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
+ if (check_socks_enviroment(&proxy) == 0) {
+#ifdef LWS_WITH_SOCKS5
+ *type = PROXY_TYPE_SOCKS5;
+ return proxy;
+#else
+ safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
+ "but Libwebsockets used doesn't have SOCKS5 support built in. "
+ "Ignoring and checking for other options.",
+ proxy);
+#endif
+ }
+ if (check_http_enviroment(&proxy) == 0)
+ *type = PROXY_TYPE_HTTP;
+ return proxy;
+ }
+
+ *type = aclk_verify_proxy(proxy);
+#ifndef LWS_WITH_SOCKS5
+ if (*type == PROXY_TYPE_SOCKS5) {
+ safe_log_proxy_error(
+ "Config var \"" ACLK_PROXY_CONFIG_VAR
+ "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
+ proxy);
+ }
+#endif
+ if (*type == PROXY_TYPE_UNKNOWN) {
+ *type = PROXY_DISABLED;
+ safe_log_proxy_error(
+ "Config var \"" ACLK_PROXY_CONFIG_VAR
+ "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
+ proxy);
+ }
+
+ return proxy;
+}
+
+// helper function to read settings only once (static)
+// as claiming, challenge/response and ACLK
+// read the same thing, no need to parse again
+const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
+{
+ static const char *proxy = NULL;
+ static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
+
+ if (proxy_type == PROXY_NOT_SET)
+ proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
+
+ *type = proxy_type;
+ return proxy;
+}
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
new file mode 100644
index 000000000..c72329791
--- /dev/null
+++ b/aclk/aclk_util.h
@@ -0,0 +1,52 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_UTIL_H
+#define ACLK_UTIL_H
+
+#include "libnetdata/libnetdata.h"
+
+// Helper stuff which should not have any further inside ACLK dependency
+// and are supposed not to be needed outside of ACLK
+
+int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
+
+enum aclk_topics {
+ ACLK_TOPICID_CHART = 0,
+ ACLK_TOPICID_ALARMS = 1,
+ ACLK_TOPICID_METADATA = 2,
+ ACLK_TOPICID_COMMAND = 3,
+};
+
+const char *aclk_get_topic(enum aclk_topics topic);
+void free_topic_cache(void);
+// TODO
+// aclk_topics_reload //when claim id changes
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+extern volatile int aclk_conversation_log_counter;
+#if defined(HAVE_C___ATOMIC) && !defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
+#define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST)
+#else
+extern netdata_mutex_t aclk_conversation_log_mutex;
+int aclk_get_conv_log_next();
+#define ACLK_GET_CONV_LOG_NEXT() aclk_get_conv_log_next()
+#endif
+#endif
+
+unsigned long int aclk_reconnect_delay(int mode);
+
+typedef enum aclk_proxy_type {
+ PROXY_TYPE_UNKNOWN = 0,
+ PROXY_TYPE_SOCKS5,
+ PROXY_TYPE_HTTP,
+ PROXY_DISABLED,
+ PROXY_NOT_SET,
+} ACLK_PROXY_TYPE;
+
+const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
+ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
+const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
+void safe_log_proxy_censor(char *proxy);
+int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
+const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
+
+#endif /* ACLK_UTIL_H */
diff --git a/aclk/https_client.c b/aclk/https_client.c
new file mode 100644
index 000000000..1b9546d77
--- /dev/null
+++ b/aclk/https_client.c
@@ -0,0 +1,246 @@
+#include "libnetdata/libnetdata.h"
+
+#include "https_client.h"
+
+#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h"
+
+enum http_parse_state {
+ HTTP_PARSE_INITIAL = 0,
+ HTTP_PARSE_HEADERS,
+ HTTP_PARSE_CONTENT
+};
+
+typedef struct {
+ enum http_parse_state state;
+ int content_length;
+ int http_code;
+} http_parse_ctx;
+
+#define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 }
+
+#define NEED_MORE_DATA 0
+#define PARSE_SUCCESS 1
+#define PARSE_ERROR -1
+#define HTTP_LINE_TERM "\x0D\x0A"
+#define RESP_PROTO "HTTP/1.1 "
+#define HTTP_KEYVAL_SEPARATOR ": "
+#define HTTP_HDR_BUFFER_SIZE 256
+#define PORT_STR_MAX_BYTES 7
+
+static void process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const char *val)
+{
+ // currently we care only about content-length
+ // but in future the way this is written
+ // it can be extended
+ if (!strcmp("content-length", key)) {
+ parse_ctx->content_length = atoi(val);
+ }
+}
+
+static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx)
+{
+ int idx, idx_end;
+ char buf_key[HTTP_HDR_BUFFER_SIZE];
+ char buf_val[HTTP_HDR_BUFFER_SIZE];
+ char *ptr = buf_key;
+ if (!rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx_end)) {
+ error("CRLF expected");
+ return 1;
+ }
+
+ char *separator = rbuf_find_bytes(buf, HTTP_KEYVAL_SEPARATOR, strlen(HTTP_KEYVAL_SEPARATOR), &idx);
+ if (!separator) {
+ error("Missing Key/Value separator");
+ return 1;
+ }
+ if (idx >= HTTP_HDR_BUFFER_SIZE) {
+ error("Key name is too long");
+ return 1;
+ }
+
+ rbuf_pop(buf, buf_key, idx);
+ buf_key[idx] = 0;
+
+ rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR));
+ idx_end -= strlen(HTTP_KEYVAL_SEPARATOR) + idx;
+ if (idx_end >= HTTP_HDR_BUFFER_SIZE) {
+ error("Value of key \"%s\" too long", buf_key);
+ return 1;
+ }
+
+ rbuf_pop(buf, buf_val, idx_end);
+ buf_val[idx_end] = 0;
+
+ rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR));
+
+ for (ptr = buf_key; *ptr; ptr++)
+ *ptr = tolower(*ptr);
+
+ process_http_hdr(parse_ctx, buf_key, buf_val);
+
+ return 0;
+}
+
+static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
+{
+ int idx;
+ char rc[4];
+
+ do {
+ if (parse_ctx->state != HTTP_PARSE_CONTENT && !rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx))
+ return NEED_MORE_DATA;
+ switch (parse_ctx->state) {
+ case HTTP_PARSE_INITIAL:
+ if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) {
+ error("Expected response to start with \"%s\"", RESP_PROTO);
+ return PARSE_ERROR;
+ }
+ rbuf_bump_tail(buf, strlen(RESP_PROTO));
+ if (rbuf_pop(buf, rc, 4) != 4) {
+ error("Expected HTTP status code");
+ return PARSE_ERROR;
+ }
+ if (rc[3] != ' ') {
+ error("Expected space after HTTP return code");
+ return PARSE_ERROR;
+ }
+ rc[3] = 0;
+ parse_ctx->http_code = atoi(rc);
+ if (parse_ctx->http_code < 100 || parse_ctx->http_code >= 600) {
+ error("HTTP code not in range 100 to 599");
+ return PARSE_ERROR;
+ }
+
+ rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx);
+
+ rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM));
+
+ parse_ctx->state = HTTP_PARSE_HEADERS;
+ break;
+ case HTTP_PARSE_HEADERS:
+ if (!idx) {
+ parse_ctx->state = HTTP_PARSE_CONTENT;
+ rbuf_bump_tail(buf, strlen(HTTP_LINE_TERM));
+ break;
+ }
+ if (parse_http_hdr(buf, parse_ctx))
+ return PARSE_ERROR;
+ rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx);
+ rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM));
+ break;
+ case HTTP_PARSE_CONTENT:
+ if (parse_ctx->content_length < 0) {
+ error("content-length missing and http headers ended");
+ return PARSE_ERROR;
+ }
+ if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length)
+ return PARSE_SUCCESS;
+ return NEED_MORE_DATA;
+ }
+ } while(1);
+}
+
+int https_request(http_req_type_t method, char *host, int port, char *url, char *b, size_t b_size, char *payload)
+{
+ struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
+ char sport[PORT_STR_MAX_BYTES];
+ size_t len = 0;
+ int rc = 1;
+ int ret;
+ char *ptr;
+ http_parse_ctx parse_ctx = HTTP_PARSE_CTX_INITIALIZER;
+
+ rbuf_t buffer = rbuf_create(b_size);
+ if (!buffer)
+ return 1;
+
+ snprintf(sport, PORT_STR_MAX_BYTES, "%d", port);
+
+ if (payload != NULL)
+ len = strlen(payload);
+
+ snprintf(
+ b,
+ b_size,
+ "%s %s HTTP/1.1\r\nHost: %s\r\nAccept: application/json\r\nContent-length: %zu\r\nAccept-Language: en-us\r\n"
+ "User-Agent: Netdata/rocks\r\n\r\n",
+ (method == HTTP_REQ_GET ? "GET" : "POST"), url, host, len);
+
+ if (payload != NULL)
+ strncat(b, payload, b_size - len);
+
+ len = strlen(b);
+
+ debug(D_ACLK, "Sending HTTPS req (%zu bytes): '%s'", len, b);
+ int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, sport, &timeout);
+
+ if (unlikely(sock == -1)) {
+ error("Handshake failed");
+ goto exit_buf;
+ }
+
+ SSL_CTX *ctx = security_initialize_openssl_client();
+ if (ctx==NULL) {
+ error("Cannot allocate SSL context");
+ goto exit_sock;
+ }
+ // Certificate chain: not updating the stores - do we need private CA roots?
+ // Calls to SSL_CTX_load_verify_locations would go here.
+ SSL *ssl = SSL_new(ctx);
+ if (ssl==NULL) {
+ error("Cannot allocate SSL");
+ goto exit_CTX;
+ }
+ SSL_set_fd(ssl, sock);
+ ret = SSL_connect(ssl);
+ if (ret != 1) {
+ error("SSL_connect() failed with err=%d", ret);
+ goto exit_SSL;
+ }
+
+ ret = SSL_write(ssl, b, len);
+ if (ret <= 0)
+ {
+ error("SSL_write() failed with err=%d", ret);
+ goto exit_SSL;
+ }
+
+ b[0] = 0;
+
+ do {
+ ptr = rbuf_get_linear_insert_range(buffer, &len);
+ ret = SSL_read(ssl, ptr, len - 1);
+ if (ret)
+ rbuf_bump_head(buffer, ret);
+ if (ret <= 0)
+ {
+ error("No response available - SSL_read()=%d", ret);
+ goto exit_FULL;
+ }
+ } while (!(ret = parse_http_response(buffer, &parse_ctx)));
+
+ if (ret != PARSE_SUCCESS) {
+ error("Error parsing HTTP response");
+ goto exit_FULL;
+ }
+
+ if (parse_ctx.http_code < 200 || parse_ctx.http_code >= 300) {
+ error("HTTP Response not Success (got %d)", parse_ctx.http_code);
+ goto exit_FULL;
+ }
+
+ len = rbuf_pop(buffer, b, b_size);
+ b[MIN(len, b_size-1)] = 0;
+
+ rc = 0;
+exit_FULL:
+exit_SSL:
+ SSL_free(ssl);
+exit_CTX:
+ SSL_CTX_free(ctx);
+exit_sock:
+ close(sock);
+exit_buf:
+ rbuf_free(buffer);
+ return rc;
+}
diff --git a/aclk/https_client.h b/aclk/https_client.h
new file mode 100644
index 000000000..0d2e0dba7
--- /dev/null
+++ b/aclk/https_client.h
@@ -0,0 +1,11 @@
+#ifndef NETDATA_HTTPS_CLIENT_H
+#define NETDATA_HTTPS_CLIENT_H
+
+typedef enum http_req_type {
+ HTTP_REQ_GET,
+ HTTP_REQ_POST
+} http_req_type_t;
+
+int https_request(http_req_type_t method, char *host, int port, char *url, char *b, size_t b_size, char *payload);
+
+#endif /* NETDATA_HTTPS_CLIENT_H */
diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c
index d7188b1f0..43455393a 100644
--- a/aclk/legacy/aclk_common.c
+++ b/aclk/legacy/aclk_common.c
@@ -252,6 +252,7 @@ struct label *add_aclk_host_labels(struct label *label) {
proxy_str = "none";
break;
}
+ label = add_label_to_list(label, "_aclk_impl", "Legacy", LABEL_SOURCE_AUTO);
return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#else
return label;
diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c
index c1856ed2c..f41a230db 100644
--- a/aclk/legacy/aclk_lws_https_client.c
+++ b/aclk/legacy/aclk_lws_https_client.c
@@ -3,7 +3,11 @@
#define ACLK_LWS_HTTPS_CLIENT_INTERNAL
#include "aclk_lws_https_client.h"
+#ifndef ACLK_NG
#include "aclk_common.h"
+#else
+#include "../aclk.h"
+#endif
#include "aclk_lws_wss_client.h"
diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c
index f06df3f42..df221dd60 100644
--- a/aclk/legacy/aclk_lws_wss_client.c
+++ b/aclk/legacy/aclk_lws_wss_client.c
@@ -348,6 +348,7 @@ static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data,
return 1;
}
+#ifdef ACLK_TRP_DEBUG_VERBOSE
static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
{
switch (reason) {
@@ -377,12 +378,11 @@ static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
return "LWS_CALLBACK_EVENT_WAIT_CANCELLED";
default:
// Not using an internal buffer here for thread-safety with unknown calling context.
-#ifdef ACLK_TRP_DEBUG_VERBOSE
error("Unknown LWS callback %u", reason);
-#endif
return "unknown";
}
}
+#endif
void aclk_lws_wss_fail_report()
{
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c
index 7ab534f16..27ad9ac16 100644
--- a/aclk/legacy/aclk_query.c
+++ b/aclk/legacy/aclk_query.c
@@ -22,6 +22,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
struct aclk_query {
usec_t created;
+ struct timeval tv_in;
usec_t created_boot_time;
time_t run_after; // Delay run until after this time
ACLK_CMD cmd; // What command is this
@@ -30,6 +31,7 @@ struct aclk_query {
char *msg_id; // msg_id generated by the cloud (NULL if internal)
char *query; // The actual query
u_char deleted; // Mark deleted for garbage collect
+ int idx; // index of query thread
struct aclk_query *next;
};
@@ -62,6 +64,7 @@ static void aclk_query_free(struct aclk_query *this_query)
freez(this_query->query);
if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) {
struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data;
+ freez(del->query_endpoint);
freez(del->data);
freez(del);
}
@@ -236,7 +239,8 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
new_query->data = data;
new_query->next = NULL;
- new_query->created = now_realtime_usec();
+ now_realtime_timeval(&new_query->tv_in);
+ new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec;
new_query->created_boot_time = now_boottime_usec();
new_query->run_after = run_after;
@@ -324,6 +328,7 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
#pragma region ACLK_QUERY
#endif
+
static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
{
usec_t t = now_boottime_usec();
@@ -359,8 +364,11 @@ static int aclk_execute_query(struct aclk_query *this_query)
mysep = strrchr(this_query->query, '/');
// TODO: handle bad response perhaps in a different way. For now it does to the payload
- aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
+ w->tv_in = this_query->tv_in;
now_realtime_timeval(&w->tv_ready);
+ aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
+ size_t size = w->response.data->len;
+ size_t sent = size;
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -382,6 +390,24 @@ static int aclk_execute_query(struct aclk_query *this_query)
aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
+ struct timeval tv;
+ now_realtime_timeval(&tv);
+
+ log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
+ w->id
+ , gettid()
+ , this_query->idx
+ , "DATA"
+ , sent
+ , size
+ , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
+ , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
+ , dt_usec(&tv, &w->tv_ready) / 1000.0
+ , dt_usec(&tv, &w->tv_in) / 1000.0
+ , w->response.code
+ , strip_control_characters(this_query->query)
+ );
+
buffer_free(w->response.data);
buffer_free(w->response.header);
buffer_free(w->response.header_output);
@@ -426,7 +452,11 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
mysep = strrchr(this_query->query, '/');
// execute the query
+ w->tv_in = this_query->tv_in;
+ now_realtime_timeval(&w->tv_ready);
t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
+ size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
+ size_t sent = size;
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
@@ -475,7 +505,6 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
}
#endif
- now_realtime_timeval(&w->tv_ready);
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w);
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -492,6 +521,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
buffer_need_bytes(local_buffer, w->response.data->len);
memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
local_buffer->len += w->response.data->len;
+ sent = sent - size + w->response.data->len;
} else {
#endif
buffer_strcat(local_buffer, w->response.data->buffer);
@@ -502,6 +532,23 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id);
+ struct timeval tv;
+ now_realtime_timeval(&tv);
+
+ log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
+ w->id
+ , gettid()
+ , this_query->idx
+ , "DATA"
+ , sent
+ , size
+ , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
+ , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
+ , dt_usec(&tv, &w->tv_ready) / 1000.0
+ , dt_usec(&tv, &w->tv_in) / 1000.0
+ , w->response.code
+ , strip_control_characters(this_query->query)
+ );
cleanup:
#ifdef NETDATA_WITH_ZLIB
if(w->response.zinitialized)
@@ -550,6 +597,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
query_count++;
host = (RRDHOST*)this_query->data;
+ this_query->idx = t_info->idx;
debug(
D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic,
@@ -629,6 +677,12 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[t_info->idx]++;
ACLK_STATS_UNLOCK;
+
+ if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) {
+ getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]);
+ getrusage_called_this_tick[t_info->idx]++;
+ }
+
}
aclk_query_free(this_query);
diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h
index 53eef1392..026985c8d 100644
--- a/aclk/legacy/aclk_query.h
+++ b/aclk/legacy/aclk_query.h
@@ -8,8 +8,11 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+#define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread.
+
extern pthread_cond_t query_cond_wait;
extern pthread_mutex_t query_lock_wait;
+extern uint8_t *getrusage_called_this_tick;
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
@@ -28,6 +31,7 @@ struct aclk_query_threads {
struct aclk_cloud_req_v2 {
char *data;
RRDHOST *host;
+ char *query_endpoint;
};
void *aclk_query_main_thread(void *ptr);
diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c
index 99fa9d987..2681445b4 100644
--- a/aclk/legacy/aclk_rx_msgs.c
+++ b/aclk/legacy/aclk_rx_msgs.c
@@ -25,7 +25,7 @@ static inline int aclk_extract_v2_data(char *payload, char **data)
#define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref))
static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req)
{
- const char *start, *end, *ptr;
+ const char *start, *end, *ptr, *query_type;
char uuid_str[UUID_STR_LEN];
uuid_t uuid;
@@ -66,6 +66,8 @@ static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req,
error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
return 1;
}
+ ptr += strlen(ACLK_CLOUD_REQ_V2_PREFIX);
+ query_type = ptr;
if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) {
errno = 0;
@@ -73,6 +75,11 @@ static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req,
return 1;
}
+ if(!(ptr = strchr(ptr, '?')) || ptr > end)
+ ptr = end;
+ cloud_req->query_endpoint = mallocz((ptr - query_type) + 1);
+ strncpyz(cloud_req->query_endpoint, query_type, ptr - query_type);
+
req->payload = mallocz((end - start) + 1);
strncpyz(req->payload, start, end - start);
@@ -122,6 +129,13 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_v1++;
+ aclk_metrics_per_sample.cloud_req_ok++;
+ ACLK_STATS_UNLOCK;
+ }
+
return 0;
}
@@ -131,6 +145,7 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
struct aclk_cloud_req_v2 *cloud_req;
char *data;
+ int stat_idx;
errno = 0;
if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
@@ -165,6 +180,10 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
goto cleanup;
}
+ // we do this here due to cloud_req being taken over by query thread
+ // which if crazy quick can free it after aclk_queue_query
+ stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint);
+
// aclk_queue_query takes ownership of data pointer
if (unlikely(aclk_queue_query(
cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
@@ -173,8 +192,17 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
goto cleanup;
}
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_v2++;
+ aclk_metrics_per_sample.cloud_req_ok++;
+ aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
+ ACLK_STATS_UNLOCK;
+ }
+
return 0;
cleanup:
+ freez(cloud_req->query_endpoint);
freez(cloud_req->data);
freez(cloud_req);
return 1;
@@ -289,12 +317,6 @@ int aclk_handle_cloud_message(char *payload)
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_recvd++;
- ACLK_STATS_UNLOCK;
- }
-
if (unlikely(!payload)) {
errno = 0;
error("ACLK incoming message is empty");
diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c
index 2a57cd6f0..7124380a2 100644
--- a/aclk/legacy/aclk_stats.c
+++ b/aclk/legacy/aclk_stats.c
@@ -11,8 +11,17 @@ struct aclk_qt_data {
RRDDIM *dim;
} *aclk_qt_data = NULL;
+// ACLK per query thread cpu stats
+struct aclk_cpu_data {
+ RRDDIM *user;
+ RRDDIM *system;
+ RRDSET *st;
+} *aclk_cpu_data = NULL;
+
uint32_t *aclk_queries_per_thread = NULL;
uint32_t *aclk_queries_per_thread_sample = NULL;
+struct rusage *rusage_per_thread;
+uint8_t *getrusage_called_this_tick = NULL;
struct aclk_metrics aclk_metrics = {
.online = 0,
@@ -153,7 +162,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
- static RRDDIM *rd_rq_rcvd = NULL;
+ static RRDDIM *rd_rq_ok = NULL;
static RRDDIM *rd_rq_err = NULL;
if (unlikely(!st)) {
@@ -161,17 +170,82 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
"netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s",
"netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
- rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_ok = rrddim_add(st, "accepted", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_err = rrddim_add(st, "rejected", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(st);
- rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err);
+ rrddim_set_by_pointer(st, rd_rq_ok, per_sample->cloud_req_ok);
rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err);
rrdset_done(st);
}
+static void aclk_stats_cloud_req_version(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_v1 = NULL;
+ static RRDDIM *rd_rq_v2 = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_cloud_req_version", NULL, "aclk", NULL, "Requests received from cloud by their version", "req/s",
+ "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_rq_v1 = rrddim_add(st, "v1", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_v2 = rrddim_add(st, "v2+", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ rrddim_set_by_pointer(st, rd_rq_v1, per_sample->cloud_req_v1);
+ rrddim_set_by_pointer(st, rd_rq_v2, per_sample->cloud_req_v2);
+
+ rrdset_done(st);
+}
+
+static char *cloud_req_type_names[ACLK_STATS_CLOUD_REQ_TYPE_CNT] = {
+ "other",
+ "info",
+ "data",
+ "alarms",
+ "alarm_log",
+ "chart",
+ "charts"
+ // if you change update:
+ // #define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
+};
+
+int aclk_cloud_req_type_to_idx(const char *name)
+{
+ for (int i = 1; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
+ if (!strcmp(cloud_req_type_names[i], name))
+ return i;
+ return 0;
+}
+
+static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st;
+ static int initialized = 0;
+ static RRDDIM *rd_rq_types[ACLK_STATS_CLOUD_REQ_TYPE_CNT];
+
+ if (unlikely(!initialized)) {
+ initialized = 1;
+ st = rrdset_create_localhost(
+ "netdata", "aclk_cloud_req_cmd", NULL, "aclk", NULL, "Requests received from cloud by their type (api endpoint queried)", "req/s",
+ "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
+ rd_rq_types[i] = rrddim_add(st, cloud_req_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
+ rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_by_type[i]);
+
+ rrdset_done(st);
+}
+
#define MAX_DIM_NAME 16
static void aclk_stats_query_threads(uint32_t *queries_per_thread)
{
@@ -182,7 +256,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
if (unlikely(!st)) {
st = rrdset_create_localhost(
"netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
- "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+ "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
for (int i = 0; i < query_thread_count; i++) {
if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
@@ -222,11 +296,42 @@ static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct
rrdset_done(metric->st);
}
+static void aclk_stats_cpu_threads(void)
+{
+ char id[100 + 1];
+ char title[100 + 1];
+
+ for (int i = 0; i < query_thread_count; i++) {
+ if (unlikely(!aclk_cpu_data[i].st)) {
+
+ snprintfz(id, 100, "aclk_thread%d_cpu", i);
+ snprintfz(title, 100, "Cpu Usage For Thread No %d", i);
+
+ aclk_cpu_data[i].st = rrdset_create_localhost(
+ "netdata", id, NULL, "aclk", NULL, title, "milliseconds/s",
+ "netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ aclk_cpu_data[i].user = rrddim_add(aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ aclk_cpu_data[i].system = rrddim_add(aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+
+ } else
+ rrdset_next(aclk_cpu_data[i].st);
+ }
+
+ for (int i = 0; i < query_thread_count; i++) {
+ rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec);
+ rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec);
+ rrdset_done(aclk_cpu_data[i].st);
+ }
+}
+
void aclk_stats_thread_cleanup()
{
freez(aclk_qt_data);
freez(aclk_queries_per_thread);
freez(aclk_queries_per_thread_sample);
+ freez(aclk_cpu_data);
+ freez(rusage_per_thread);
}
void *aclk_stats_main_thread(void *ptr)
@@ -235,8 +340,11 @@ void *aclk_stats_main_thread(void *ptr)
query_thread_count = args->query_thread_count;
aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
+ aclk_cpu_data = callocz(query_thread_count, sizeof(struct aclk_cpu_data));
aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
+ rusage_per_thread = callocz(query_thread_count, sizeof(struct rusage));
+ getrusage_called_this_tick = callocz(query_thread_count, sizeof(uint8_t));
heartbeat_t hb;
heartbeat_init(&hb);
@@ -264,6 +372,7 @@ void *aclk_stats_main_thread(void *ptr)
memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
+ memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * query_thread_count);
ACLK_STATS_UNLOCK;
aclk_stats_collect(&per_sample, &permanent);
@@ -273,8 +382,14 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_read_q(&per_sample);
aclk_stats_cloud_req(&per_sample);
+ aclk_stats_cloud_req_version(&per_sample);
+
+ aclk_stats_cloud_req_cmd(&per_sample);
+
aclk_stats_query_threads(aclk_queries_per_thread_sample);
+ aclk_stats_cpu_threads();
+
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency);
#endif
diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h
index 7e74fdf88..5e50a2272 100644
--- a/aclk/legacy/aclk_stats.h
+++ b/aclk/legacy/aclk_stats.h
@@ -55,6 +55,11 @@ extern struct aclk_mat_metrics {
void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
+#define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
+// if you change update cloud_req_type_names
+
+int aclk_cloud_req_type_to_idx(const char *name);
+
// reset to 0 on every sample
extern struct aclk_metrics_per_sample {
/* in the unlikely event of ACLK disconnecting
@@ -72,9 +77,14 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t read_q_added;
volatile uint32_t read_q_consumed;
- volatile uint32_t cloud_req_recvd;
+ volatile uint32_t cloud_req_ok;
volatile uint32_t cloud_req_err;
+ volatile uint16_t cloud_req_v1;
+ volatile uint16_t cloud_req_v2;
+
+ volatile uint16_t cloud_req_by_type[ACLK_STATS_CLOUD_REQ_TYPE_CNT];
+
#ifdef NETDATA_INTERNAL_CHECKS
struct aclk_metric_mat_data latency;
#endif
@@ -83,6 +93,7 @@ extern struct aclk_metrics_per_sample {
} aclk_metrics_per_sample;
extern uint32_t *aclk_queries_per_thread;
+extern struct rusage *rusage_per_thread;
void *aclk_stats_main_thread(void *ptr);
void aclk_stats_thread_cleanup();
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c
index e51a01308..5767df3a7 100644
--- a/aclk/legacy/agent_cloud_link.c
+++ b/aclk/legacy/agent_cloud_link.c
@@ -189,7 +189,8 @@ unsigned long int aclk_reconnect_delay(int mode)
delay = ACLK_MAX_BACKOFF_DELAY * 1000;
} else {
fail++;
- delay = (delay * 1000) + (random() % 1000);
+ delay *= 1000;
+ delay += (random() % (MAX(1000, delay/2)));
}
return delay;