summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to 'aclk')
-rw-r--r--aclk/README.md147
-rw-r--r--aclk/aclk.c1173
-rw-r--r--aclk/aclk.h57
-rw-r--r--aclk/aclk_alarm_api.c44
-rw-r--r--aclk/aclk_alarm_api.h14
-rw-r--r--aclk/aclk_capas.c47
-rw-r--r--aclk/aclk_capas.h14
-rw-r--r--aclk/aclk_contexts_api.c41
-rw-r--r--aclk/aclk_contexts_api.h14
-rw-r--r--aclk/aclk_otp.c888
-rw-r--r--aclk/aclk_otp.h18
-rw-r--r--aclk/aclk_proxy.c186
-rw-r--r--aclk/aclk_proxy.h22
-rw-r--r--aclk/aclk_query.c383
-rw-r--r--aclk/aclk_query.h36
-rw-r--r--aclk/aclk_query_queue.c136
-rw-r--r--aclk/aclk_query_queue.h86
-rw-r--r--aclk/aclk_rrdhost_state.h11
-rw-r--r--aclk/aclk_rx_msgs.c551
-rw-r--r--aclk/aclk_rx_msgs.h17
-rw-r--r--aclk/aclk_stats.c408
-rw-r--r--aclk/aclk_stats.h79
-rw-r--r--aclk/aclk_tx_msgs.c276
-rw-r--r--aclk/aclk_tx_msgs.h20
-rw-r--r--aclk/aclk_util.c388
-rw-r--r--aclk/aclk_util.h112
-rw-r--r--aclk/helpers/mqtt_wss_pal.h19
-rw-r--r--aclk/helpers/ringbuffer_pal.h11
-rw-r--r--aclk/https_client.c688
-rw-r--r--aclk/https_client.h78
-rw-r--r--aclk/schema-wrappers/alarm_config.cc147
-rw-r--r--aclk/schema-wrappers/alarm_config.h69
-rw-r--r--aclk/schema-wrappers/alarm_stream.cc253
-rw-r--r--aclk/schema-wrappers/alarm_stream.h136
-rw-r--r--aclk/schema-wrappers/capability.cc11
-rw-r--r--aclk/schema-wrappers/capability.h24
-rw-r--r--aclk/schema-wrappers/connection.cc72
-rw-r--r--aclk/schema-wrappers/connection.h47
-rw-r--r--aclk/schema-wrappers/context.cc125
-rw-r--r--aclk/schema-wrappers/context.h53
-rw-r--r--aclk/schema-wrappers/context_stream.cc42
-rw-r--r--aclk/schema-wrappers/context_stream.h36
-rw-r--r--aclk/schema-wrappers/node_connection.cc46
-rw-r--r--aclk/schema-wrappers/node_connection.h32
-rw-r--r--aclk/schema-wrappers/node_creation.cc39
-rw-r--r--aclk/schema-wrappers/node_creation.h31
-rw-r--r--aclk/schema-wrappers/node_info.cc136
-rw-r--r--aclk/schema-wrappers/node_info.h79
-rw-r--r--aclk/schema-wrappers/proto_2_json.cc85
-rw-r--r--aclk/schema-wrappers/proto_2_json.h18
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.cc22
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.h24
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h18
53 files changed, 7509 insertions, 0 deletions
diff --git a/aclk/README.md b/aclk/README.md
new file mode 100644
index 0000000..af0f5fd
--- /dev/null
+++ b/aclk/README.md
@@ -0,0 +1,147 @@
+<!--
+title: "Agent-Cloud link (ACLK)"
+description: "The Agent-Cloud link (ACLK) is the mechanism responsible for connecting a Netdata agent to Netdata Cloud."
+date: 2020-05-11
+custom_edit_url: https://github.com/netdata/netdata/edit/master/aclk/README.md
+-->
+
+# Agent-cloud link (ACLK)
+
+The Agent-Cloud link (ACLK) is the mechanism responsible for securely connecting a Netdata Agent to your web browser
+through Netdata Cloud. The ACLK establishes an outgoing secure WebSocket (WSS) connection to Netdata Cloud on port
+`443`. The ACLK is encrypted, safe, and _is only established if you connect your node_.
+
+The Cloud App lives at app.netdata.cloud which currently resolves to the following list of IPs:
+
+- 54.198.178.11
+- 44.207.131.212
+- 44.196.50.41
+
+:::caution
+
+This list of IPs can change without notice, we strongly advise you to whitelist following domains `api.netdata.cloud`, `mqtt.netdata.cloud`, if
+this is not an option in your case always verify the current domain resolution (e.g via the `host` command).
+
+:::
+
+For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [get
+started with Cloud](https://learn.netdata.cloud/docs/cloud/get-started) guide or the full [connect to Cloud
+documentation](/claim/README.md).
+
+## Data privacy
+[Data privacy](https://netdata.cloud/privacy/) is very important to us. We firmly believe that your data belongs to
+you. This is why **we don't store any metric data in Netdata Cloud**.
+
+All the data that you see in the web browser when using Netdata Cloud, is actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard.
+The data passes through our systems, but it isn't stored.
+
+However, to be able to offer the stunning visualizations and advanced functionality of Netdata Cloud, it does store a limited number of _metadata_.
+
+Read more about [Data privacy in the Netdata Cloud](https://learn.netdata.cloud/docs/cloud/data-privacy) in the documentation.
+
+
+## Enable and configure the ACLK
+
+The ACLK is enabled by default, with its settings automatically configured and stored in the Agent's memory. No file is
+created at `/var/lib/netdata/cloud.d/cloud.conf` until you either connect a node or create it yourself. The default
+configuration uses two settings:
+
+```conf
+[global]
+ enabled = yes
+ cloud base url = https://api.netdata.cloud
+```
+
+If your Agent needs to use a proxy to access the internet, you must [set up a proxy for
+connecting to cloud](/claim/README.md#connect-through-a-proxy).
+
+You can configure following keys in the `netdata.conf` section `[cloud]`:
+```
+[cloud]
+ statistics = yes
+ query thread count = 2
+```
+
+- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
+- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
+
+## Disable the ACLK
+
+You have two options if you prefer to disable the ACLK and not use Netdata Cloud.
+
+### Disable at installation
+
+You can pass the `--disable-cloud` parameter to the Agent installation when using a kickstart script
+([kickstart.sh](/packaging/installer/methods/kickstart.md), or a [manual installation from
+Git](/packaging/installer/methods/manual.md).
+
+When you pass this parameter, the installer does not download or compile any extra libraries. Once running, the Agent
+kills the thread responsible for the ACLK and connecting behavior, and behaves as though the ACLK, and thus Netdata Cloud,
+does not exist.
+
+### Disable at runtime
+
+You can change a runtime setting in your `cloud.conf` file to disable the ACLK. This setting only stops the Agent from
+attempting any connection via the ACLK, but does not prevent the installer from downloading and compiling the ACLK's
+dependencies.
+
+The file typically exists at `/var/lib/netdata/cloud.d/cloud.conf`, but can change if you set a prefix during
+installation. To disable the ACLK, open that file and change the `enabled` setting to `no`:
+
+```conf
+[global]
+ enabled = no
+```
+
+If the file at `/var/lib/netdata/cloud.d/cloud.conf` doesn't exist, you need to create it.
+
+Copy and paste the first two lines from below, which will change your prompt to `cat`.
+
+```bash
+cd /var/lib/netdata/cloud.d
+cat > cloud.conf << EOF
+```
+
+Copy and paste in lines 3-6, and after the final `EOF`, hit **Enter**. The final line must contain only `EOF`. Hit **Enter** again to return to your normal prompt with the newly-created file.
+
+To get your normal prompt back, the final line
+must contain only `EOF`.
+
+```bash
+[global]
+ enabled = no
+ cloud base url = https://api.netdata.cloud
+EOF
+```
+
+You also need to change the file's permissions. Use `grep "run as user" /etc/netdata/netdata.conf` to figure out which
+user your Agent runs as (typically `netdata`), and replace `netdata:netdata` as shown below if necessary:
+
+```bash
+sudo chmod 0770 cloud.conf
+sudo chown netdata:netdata cloud.conf
+```
+
+Restart your Agent to disable the ACLK.
+
+### Re-enable the ACLK
+
+If you first disable the ACLK and any Cloud functionality and then decide you would like to use Cloud, you must either
+[reinstall Netdata](/packaging/installer/REINSTALL.md) with Cloud enabled or change the runtime setting in your
+`cloud.conf` file.
+
+If you passed `--disable-cloud` to `netdata-installer.sh` during installation, you must
+[reinstall](/packaging/installer/REINSTALL.md) your Agent. Use the same method as before, but pass `--require-cloud` to
+the installer. When installation finishes you can [connect your node](/claim/README.md#how-to-connect-a-node).
+
+If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf` file, edit the file again and change
+`enabled` to `yes`:
+
+```conf
+[global]
+ enabled = yes
+```
+
+Restart your Agent and [connect your node](/claim/README.md#how-to-connect-a-node).
+
+
diff --git a/aclk/aclk.c b/aclk/aclk.c
new file mode 100644
index 0000000..3b035b8
--- /dev/null
+++ b/aclk/aclk.c
@@ -0,0 +1,1173 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk.h"
+
+#ifdef ENABLE_ACLK
+#include "aclk_stats.h"
+#include "mqtt_wss_client.h"
+#include "aclk_otp.h"
+#include "aclk_tx_msgs.h"
+#include "aclk_query.h"
+#include "aclk_query_queue.h"
+#include "aclk_util.h"
+#include "aclk_rx_msgs.h"
+#include "https_client.h"
+#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_capas.h"
+
+#include "aclk_proxy.h"
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#endif
+
+#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+
+#endif /* ENABLE_ACLK */
+
+int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+int aclk_rcvd_cloud_msgs = 0;
+int aclk_connection_counter = 0;
+int disconnect_req = 0;
+
+int aclk_connected = 0;
+int aclk_ctx_based = 0;
+int aclk_disable_runtime = 0;
+int aclk_stats_enabled;
+int aclk_kill_link = 0;
+
+usec_t aclk_session_us = 0;
+time_t aclk_session_sec = 0;
+
+time_t last_conn_time_mqtt = 0;
+time_t last_conn_time_appl = 0;
+time_t last_disconnect_time = 0;
+time_t next_connection_attempt = 0;
+float last_backoff_value = 0;
+
+time_t aclk_block_until = 0;
+
+#ifdef ENABLE_ACLK
+mqtt_wss_client mqttwss_client;
+
+netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+
+struct aclk_shared_state aclk_shared_state = {
+ .mqtt_shutdown_msg_id = -1,
+ .mqtt_shutdown_msg_rcvd = 0
+};
+
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+OSSL_DECODER_CTX *aclk_dctx = NULL;
+EVP_PKEY *aclk_private_key = NULL;
+#else
+static RSA *aclk_private_key = NULL;
+#endif
+static int load_private_key()
+{
+ if (aclk_private_key != NULL) {
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+ EVP_PKEY_free(aclk_private_key);
+ if (aclk_dctx)
+ OSSL_DECODER_CTX_free(aclk_dctx);
+
+ aclk_dctx = NULL;
+#else
+ RSA_free(aclk_private_key);
+#endif
+ }
+ aclk_private_key = NULL;
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
+
+ long bytes_read;
+ char *private_key = read_by_filename(filename, &bytes_read);
+ if (!private_key) {
+ error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
+ return 1;
+ }
+ debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
+
+ BIO *key_bio = BIO_new_mem_buf(private_key, -1);
+ if (key_bio==NULL) {
+ error("Claimed agent cannot establish ACLK - failed to create BIO for key");
+ goto biofailed;
+ }
+
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+ aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL,
+ "RSA",
+ OSSL_KEYMGMT_SELECT_PRIVATE_KEY,
+ NULL, NULL);
+
+ if (!aclk_dctx) {
+ error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
+ goto biofailed;
+ }
+
+ // this is necesseary to avoid RSA key with wrong size
+ if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) {
+ error("Decoding private key (from claiming) failed - invalid format.");
+ goto biofailed;
+ }
+#else
+ aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
+#endif
+ BIO_free(key_bio);
+ if (aclk_private_key!=NULL)
+ {
+ freez(private_key);
+ return 0;
+ }
+ char err[512];
+ ERR_error_string_n(ERR_get_error(), err, sizeof(err));
+ error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
+
+biofailed:
+ freez(private_key);
+ return 1;
+}
+
+static int wait_till_cloud_enabled()
+{
+ info("Waiting for Cloud to be enabled");
+ while (!netdata_cloud_setting) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (netdata_exit)
+ return 1;
+ }
+ return 0;
+}
+
+/**
+ * Will block until agent is claimed. Returns only if agent claimed
+ * or if agent needs to shutdown.
+ *
+ * @return `0` if agent has been claimed,
+ * `1` if interrupted due to agent shutting down
+ */
+static int wait_till_agent_claimed(void)
+{
+ //TODO prevent malloc and freez
+ char *agent_id = get_agent_claimid();
+ while (likely(!agent_id)) {
+ sleep_usec(USEC_PER_SEC * 1);
+ if (netdata_exit)
+ return 1;
+ agent_id = get_agent_claimid();
+ }
+ freez(agent_id);
+ return 0;
+}
+
+/**
+ * Checks everything is ready for connection
+ * agent claimed, cloud url set and private key available
+ *
+ * @param aclk_hostname points to location where string pointer to hostname will be set
+ * @param aclk_port port to int where port will be saved
+ *
+ * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
+ */
+static int wait_till_agent_claim_ready()
+{
+ url_t url;
+ while (!netdata_exit) {
+ if (wait_till_agent_claimed())
+ return 1;
+
+ // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
+ // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (cloud_base_url == NULL) {
+ error("Do not move the cloud base url out of post_conf_load!!");
+ return 1;
+ }
+
+ // We just check configuration is valid here
+ // TODO make it without malloc/free
+ memset(&url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &url)) {
+ error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
+ url_t_destroy(&url);
+ sleep(5);
+ continue;
+ }
+ url_t_destroy(&url);
+
+ if (!load_private_key())
+ return 0;
+
+ sleep(5);
+ }
+
+ return 1;
+}
+
+void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
+{
+ switch(log_type) {
+ case MQTT_WSS_LOG_ERROR:
+ case MQTT_WSS_LOG_FATAL:
+ case MQTT_WSS_LOG_WARN:
+ error_report("%s", str);
+ return;
+ case MQTT_WSS_LOG_INFO:
+ info("%s", str);
+ return;
+ case MQTT_WSS_LOG_DEBUG:
+ debug(D_ACLK, "%s", str);
+ return;
+ default:
+ error("Unknown log type from mqtt_wss");
+ }
+}
+
+//TODO prevent big buffer on stack
+#define RX_MSGLEN_MAX 4096
+static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
+{
+ UNUSED(qos);
+ aclk_rcvd_cloud_msgs++;
+ if (msglen > RX_MSGLEN_MAX)
+ error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
+
+ debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
+
+ if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
+ error("Link is shutting down. Ignoring incoming message.");
+ return;
+ }
+
+ const char *msgtype = strrchr(topic, '/');
+ if (unlikely(!msgtype)) {
+ error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
+ return;
+ }
+ msgtype++;
+ if (unlikely(!*msgtype)) {
+ error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
+ return;
+ }
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 512
+ char filename[FN_MAX_LEN];
+ int logfd;
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
+ logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
+ if(logfd < 0)
+ error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+ write(logfd, msg, msglen);
+ close(logfd);
+#endif
+
+ aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic);
+}
+
+static void puback_callback(uint16_t packet_id)
+{
+ if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
+ last_conn_time_appl = now_realtime_sec();
+ aclk_tbeb_reset();
+ }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_puback(packet_id);
+#endif
+
+ if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
+ info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
+ aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
+ }
+}
+
+static int read_query_thread_count()
+{
+ int threads = MIN(processors/2, 6);
+ threads = MAX(threads, 2);
+ threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ if(threads < 1) {
+ error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
+ threads = 1;
+ config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
+ }
+ return threads;
+}
+
+void aclk_graceful_disconnect(mqtt_wss_client client);
+
+/* Keeps connection alive and handles all network communications.
+ * Returns on error or when netdata is shutting down.
+ * @param client instance of mqtt_wss_client
+ * @returns 0 - Netdata Exits
+ * >0 - Error happened. Reconnect and start over.
+ */
+static int handle_connection(mqtt_wss_client client)
+{
+ time_t last_periodic_query_wakeup = now_monotonic_sec();
+ while (!netdata_exit) {
+ // timeout 1000 to check at least once a second
+ // for netdata_exit
+ if (mqtt_wss_service(client, 1000) < 0){
+ error_report("Connection Error or Dropped");
+ return 1;
+ }
+
+ if (disconnect_req || aclk_kill_link) {
+ info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
+ disconnect_req ? "true" : "false",
+ aclk_kill_link ? "true" : "false");
+ disconnect_req = 0;
+ aclk_kill_link = 0;
+ aclk_graceful_disconnect(client);
+ aclk_queue_unlock();
+ aclk_shared_state.mqtt_shutdown_msg_id = -1;
+ aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
+ return 1;
+ }
+
+ // mqtt_wss_service will return faster than in one second
+ // if there is enough work to do
+ time_t now = now_monotonic_sec();
+ if (last_periodic_query_wakeup < now) {
+ // wake up at least one Query Thread at least
+ // once per second
+ last_periodic_query_wakeup = now;
+ QUERY_THREAD_WAKEUP;
+ }
+ }
+ return 0;
+}
+
+static inline void mqtt_connected_actions(mqtt_wss_client client)
+{
+ char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
+
+ if (!topic)
+ error("Unable to fetch topic for COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
+
+ topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ if (!topic)
+ error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
+
+ aclk_stats_upd_online(1);
+ aclk_connected = 1;
+ aclk_pubacks_per_conn = 0;
+ aclk_rcvd_cloud_msgs = 0;
+ aclk_connection_counter++;
+
+ aclk_send_agent_connection_update(client, 1);
+}
+
+void aclk_graceful_disconnect(mqtt_wss_client client)
+{
+ info("Preparing to gracefully shutdown ACLK connection");
+ aclk_queue_lock();
+ aclk_queue_flush();
+
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
+
+ time_t t = now_monotonic_sec();
+ while (!mqtt_wss_service(client, 100)) {
+ if (now_monotonic_sec() - t >= 2) {
+ error("Wasn't able to gracefully shutdown ACLK in time!");
+ break;
+ }
+ if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
+ info("MQTT App Layer `disconnect` message sent successfully");
+ break;
+ }
+ }
+ info("ACLK link is down");
+ log_access("ACLK DISCONNECTED");
+ aclk_stats_upd_online(0);
+ last_disconnect_time = now_realtime_sec();
+ aclk_connected = 0;
+
+ info("Attempting to gracefully shutdown the MQTT/WSS connection");
+ mqtt_wss_disconnect(client, 1000);
+}
+
+static unsigned long aclk_reconnect_delay() {
+ unsigned long recon_delay;
+ time_t now;
+
+ if (aclk_disable_runtime) {
+ aclk_tbeb_reset();
+ return 60 * MSEC_PER_SEC;
+ }
+
+ now = now_monotonic_sec();
+ if (aclk_block_until) {
+ if (now < aclk_block_until) {
+ recon_delay = aclk_block_until - now;
+ recon_delay *= MSEC_PER_SEC;
+ aclk_block_until = 0;
+ aclk_tbeb_reset();
+ return recon_delay;
+ }
+ aclk_block_until = 0;
+ }
+
+ if (!aclk_env || !aclk_env->backoff.base)
+ return aclk_tbeb_delay(0, 2, 0, 1024);
+
+ return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
+}
+
+/* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
+ * @return 0 - Go ahead and connect (delay expired)
+ * 1 - netdata_exit
+ */
+#define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
+static int aclk_block_till_recon_allowed() {
+ unsigned long recon_delay = aclk_reconnect_delay();
+
+ next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
+ last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
+
+ info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
+ // we want to wake up from time to time to check netdata_exit
+ while (recon_delay)
+ {
+ if (netdata_exit)
+ return 1;
+ if (recon_delay > NETDATA_EXIT_POLL_MS) {
+ sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
+ recon_delay -= NETDATA_EXIT_POLL_MS;
+ continue;
+ }
+ sleep_usec(recon_delay * USEC_PER_MS);
+ recon_delay = 0;
+ }
+ return netdata_exit;
+}
+
+#ifndef ACLK_DISABLE_CHALLENGE
+/* Cloud returns transport list ordered with highest
+ * priority first. This function selects highest prio
+ * transport that we can actually use (support)
+ */
+static int aclk_get_transport_idx(aclk_env_t *env) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ // currently we support only MQTT 5
+ // therefore select first transport that matches
+ if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
+ return i;
+ }
+ }
+ return -1;
+}
+#endif
+
+/* Attempts to make a connection to MQTT broker over WSS
+ * @param client instance of mqtt_wss_client
+ * @return 0 - Successful Connection,
+ * <0 - Irrecoverable Error -> Kill ACLK,
+ * >0 - netdata_exit
+ */
+#define CLOUD_BASE_URL_READ_RETRY 30
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+#define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
+#else
+#define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
+#endif
+static int aclk_attempt_to_connect(mqtt_wss_client client)
+{
+ int ret;
+
+ url_t base_url;
+
+#ifndef ACLK_DISABLE_CHALLENGE
+ url_t auth_url;
+ url_t mqtt_url;
+#endif
+
+ while (!netdata_exit) {
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (cloud_base_url == NULL) {
+ error_report("Do not move the cloud base url out of post_conf_load!!");
+ return -1;
+ }
+
+ if (aclk_block_till_recon_allowed())
+ return 1;
+
+ info("Attempting connection now");
+ memset(&base_url, 0, sizeof(url_t));
+ if (url_parse(cloud_base_url, &base_url)) {
+ error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
+ sleep(CLOUD_BASE_URL_READ_RETRY);
+ url_t_destroy(&base_url);
+ continue;
+ }
+
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
+
+ struct mqtt_connect_params mqtt_conn_params = {
+ .clientid = "anon",
+ .username = "anon",
+ .password = "anon",
+ .will_topic = "lwt",
+ .will_msg = NULL,
+ .will_flags = MQTT_WSS_PUB_QOS2,
+ .keep_alive = 60,
+ .drop_on_publish_fail = 1
+ };
+
+#ifndef ACLK_DISABLE_CHALLENGE
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
+ aclk_env = callocz(1, sizeof(aclk_env_t));
+
+ ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
+ url_t_destroy(&base_url);
+ if (ret) {
+ error_report("Failed to Get ACLK environment");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+ }
+
+ if (netdata_exit)
+ return 1;
+
+ if (aclk_env->encoding != ACLK_ENC_PROTO) {
+ error_report("This agent can only use the new cloud protocol but cloud requested old one.");
+ continue;
+ }
+
+ if (!aclk_env_has_capa("proto")) {
+ error_report("Can't use encoding=proto without at least \"proto\" capability.");
+ continue;
+ }
+ info("New ACLK protobuf protocol negotiated successfully (/env response).");
+
+ memset(&auth_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
+ error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ url_t_destroy(&auth_url);
+ continue;
+ }
+
+ ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
+ url_t_destroy(&auth_url);
+ if (ret) {
+ error_report("Error passing Challenge/Response to get OTP");
+ continue;
+ }
+
+ // aclk_get_topic moved here as during OTP we
+ // generate the topic cache
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
+
+ if (!mqtt_conn_params.will_topic) {
+ error_report("Couldn't get LWT topic. Will not send LWT.");
+ continue;
+ }
+
+ // Do the MQTT connection
+ ret = aclk_get_transport_idx(aclk_env);
+ if (ret < 0) {
+ error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ continue;
+ }
+
+ memset(&mqtt_url, 0, sizeof(url_t));
+ if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
+ error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ url_t_destroy(&mqtt_url);
+ continue;
+ }
+#endif
+
+ aclk_session_newarch = now_realtime_usec();
+ aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
+ aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
+
+ mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
+
+#ifdef ACLK_DISABLE_CHALLENGE
+ ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&base_url);
+#else
+ ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
+ url_t_destroy(&mqtt_url);
+
+ freez((char*)mqtt_conn_params.clientid);
+ freez((char*)mqtt_conn_params.password);
+ freez((char*)mqtt_conn_params.username);
+#endif
+
+ freez((char *)mqtt_conn_params.will_msg);
+
+ if (!ret) {
+ last_conn_time_mqtt = now_realtime_sec();
+ info("ACLK connection successfully established");
+ log_access("ACLK CONNECTED");
+ mqtt_connected_actions(client);
+ return 0;
+ }
+
+ error_report("Connect failed");
+ }
+
+ return 1;
+}
+
+/**
+ * Main agent cloud link thread
+ *
+ * This thread will simply call the main event loop that handles
+ * pending requests - both inbound and outbound
+ *
+ * @param ptr is a pointer to the netdata_static_thread structure.
+ *
+ * @return It always returns NULL
+ */
+void *aclk_main(void *ptr)
+{
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+
+ struct aclk_stats_thread *stats_thread = NULL;
+
+ struct aclk_query_threads query_threads;
+ query_threads.thread_list = NULL;
+
+ ACLK_PROXY_TYPE proxy_type;
+ aclk_get_proxy(&proxy_type);
+ if (proxy_type == PROXY_TYPE_SOCKS5) {
+ error("SOCKS5 proxy is not supported by ACLK-NG yet.");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+ }
+
+ unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
+
+ // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
+ // as it must notify the far end that it shutdown gracefully and avoid the LWT.
+ netdata_thread_disable_cancelability();
+
+#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
+ info("Killing ACLK thread -> cloud functionality has been disabled");
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+#endif
+ query_threads.count = read_query_thread_count();
+
+ if (wait_till_cloud_enabled())
+ goto exit;
+
+ if (wait_till_agent_claim_ready())
+ goto exit;
+
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) {
+ error("Couldn't initialize MQTT_WSS network library");
+ goto exit;
+ }
+
+ // Enable MQTT buffer growth if necessary
+ // e.g. old cloud architecture clients with huge nodes
+ // that send JSON payloads of 10 MB as single messages
+ mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
+
+ aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
+ if (aclk_stats_enabled) {
+ stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
+ stats_thread->thread = mallocz(sizeof(netdata_thread_t));
+ stats_thread->query_thread_count = query_threads.count;
+ stats_thread->client = mqttwss_client;
+ aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
+ netdata_thread_create(
+ stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
+ stats_thread);
+ }
+
+ // Keep reconnecting and talking until our time has come
+ // and the Grim Reaper (netdata_exit) calls
+ do {
+ if (aclk_attempt_to_connect(mqttwss_client))
+ goto exit_full;
+
+ if (unlikely(!query_threads.thread_list))
+ aclk_query_threads_start(&query_threads, mqttwss_client);
+
+ if (handle_connection(mqttwss_client)) {
+ aclk_stats_upd_online(0);
+ last_disconnect_time = now_realtime_sec();
+ aclk_connected = 0;
+ log_access("ACLK DISCONNECTED");
+ }
+ } while (!netdata_exit);
+
+ aclk_graceful_disconnect(mqttwss_client);
+
+exit_full:
+// Tear Down
+ QUERY_THREAD_WAKEUP_ALL;
+
+ aclk_query_threads_cleanup(&query_threads);
+
+ if (aclk_stats_enabled) {
+ netdata_thread_join(*stats_thread->thread, NULL);
+ aclk_stats_thread_cleanup();
+ freez(stats_thread->thread);
+ freez(stats_thread);
+ }
+ free_topic_cache();
+ mqtt_wss_destroy(mqttwss_client);
+exit:
+ if (aclk_env) {
+ aclk_env_t_destroy(aclk_env);
+ freez(aclk_env);
+ }
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+ return NULL;
+}
+
+void aclk_host_state_update(RRDHOST *host, int cmd)
+{
+ uuid_t node_id;
+ int ret;
+
+ if (!aclk_connected)
+ return;
+
+ ret = get_node_id(&host->host_uuid, &node_id);
+ if (ret > 0) {
+ // this means we were not able to check if node_id already present
+ error("Unable to check for node_id. Ignoring the host state update.");
+ return;
+ }
+ if (ret < 0) {
+ // node_id not found
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = rrdhost_hostname(host),
+ .machine_guid = host->machine_guid
+ };
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+ info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
+ aclk_queue_query(create_query);
+ return;
+ }
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ node_instance_connection_t node_state_update = {
+ .hops = host->system_info->hops,
+ .live = cmd,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
+
+ node_state_update.capabilities = aclk_get_agent_capas();
+
+ rrdhost_aclk_state_lock(localhost);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ rrdhost_aclk_state_unlock(localhost);
+
+ info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
+ host->system_info->hops);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
+}
+
+void aclk_send_node_instances()
+{
+ struct node_instance_list *list_head = get_node_list();
+ struct node_instance_list *list = list_head;
+ if (unlikely(!list)) {
+ error_report("Failure to get_node_list from DB!");
+ return;
+ }
+ while (!uuid_is_null(list->host_id)) {
+ if (!uuid_is_null(list->node_id)) {
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ node_instance_connection_t node_state_update = {
+ .live = list->live,
+ .hops = list->hops,
+ .queryable = 1,
+ .session_id = aclk_session_newarch
+ };
+ node_state_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
+
+ char host_id[UUID_STR_LEN];
+ uuid_unparse_lower(list->host_id, host_id);
+
+ RRDHOST *host = rrdhost_find_by_guid(host_id);
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
+
+ rrdhost_aclk_state_lock(localhost);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ rrdhost_aclk_state_unlock(localhost);
+ info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
+ list->live,
+ list->hops);
+
+ freez((void*)node_state_update.capabilities);
+ freez((void*)node_state_update.node_id);
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+ aclk_queue_query(query);
+ } else {
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ node_instance_creation_t node_instance_creation = {
+ .hops = list->hops,
+ .hostname = list->hostname,
+ };
+ node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+ rrdhost_aclk_state_lock(localhost);
+ node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
+ create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ rrdhost_aclk_state_unlock(localhost);
+ info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
+ list->hops);
+ freez((void *)node_instance_creation.machine_guid);
+ aclk_queue_query(create_query);
+ }
+ freez(list->hostname);
+
+ list++;
+ }
+ freez(list_head);
+}
+
+void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+{
+ aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
+}
+
+static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status)) {
+ buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
+ return;
+ }
+ buffer_sprintf(wb,
+ "\n\t\tUpdates: %d"
+ "\n\t\tBatch ID: %"PRIu64
+ "\n\t\tLast Acked Seq ID: %"PRIu64
+ "\n\t\tPending Min Seq ID: %"PRIu64
+ "\n\t\tPending Max Seq ID: %"PRIu64
+ "\n\t\tLast Submitted Seq ID: %"PRIu64,
+ status.alert_updates,
+ status.alerts_batch_id,
+ status.last_acked_sequence_id,
+ status.pending_min_sequence_id,
+ status.pending_max_sequence_id,
+ status.last_submitted_sequence_id
+ );
+}
+#endif /* ENABLE_ACLK */
+
+char *aclk_state(void)
+{
+#ifndef ENABLE_ACLK
+ return strdupz("ACLK Available: No");
+#else
+ BUFFER *wb = buffer_create(1024);
+ struct tm *tmptr, tmbuf;
+ char *ret;
+
+ buffer_strcat(wb,
+ "ACLK Available: Yes\n"
+ "ACLK Version: 2\n"
+ "Protocols Supported: Protobuf\n"
+ );
+ buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);
+
+ char *agent_id = get_agent_claimid();
+ if (agent_id == NULL)
+ buffer_strcat(wb, "No\n");
+ else {
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
+ freez(agent_id);
+ }
+
+ buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
+ if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
+ }
+ if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
+ }
+ if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
+ }
+ if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
+ }
+
+ if (aclk_connected) {
+ buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
+
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host));
+
+ buffer_strcat(wb, "\tClaimed ID: ");
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ buffer_strcat(wb, host->aclk_state.claimed_id);
+ else
+ buffer_strcat(wb, "null");
+ rrdhost_aclk_state_unlock(host);
+
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ buffer_strcat(wb, "\n\tNode ID: null\n");
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
+ }
+
+ buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
+
+ if (host != localhost)
+ buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");
+
+ buffer_strcat(wb, "\n\tAlert Streaming Status:");
+ fill_alert_status_for_host(wb, host);
+ }
+ rrd_unlock();
+ }
+
+ ret = strdupz(buffer_tostring(wb));
+ buffer_free(wb);
+ return ret;
+#endif /* ENABLE_ACLK */
+}
+
+#ifdef ENABLE_ACLK
+static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status))
+ return;
+
+ json_object *tmp = json_object_new_int(status.alert_updates);
+ json_object_object_add(obj, "updates", tmp);
+
+ tmp = json_object_new_int(status.alerts_batch_id);
+ json_object_object_add(obj, "batch-id", tmp);
+
+ tmp = json_object_new_int(status.last_acked_sequence_id);
+ json_object_object_add(obj, "last-acked-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_min_sequence_id);
+ json_object_object_add(obj, "pending-min-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_max_sequence_id);
+ json_object_object_add(obj, "pending-max-seq-id", tmp);
+
+ tmp = json_object_new_int(status.last_submitted_sequence_id);
+ json_object_object_add(obj, "last-submitted-seq-id", tmp);
+}
+
+static json_object *timestamp_to_json(const time_t *t)
+{
+ struct tm *tmptr, tmbuf;
+ if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ return json_object_new_string(timebuf);
+ }
+ return NULL;
+}
+#endif /* ENABLE_ACLK */
+
+char *aclk_state_json(void)
+{
+#ifndef ENABLE_ACLK
+ return strdupz("{\"aclk-available\":false}");
+#else
+ json_object *tmp, *grp, *msg = json_object_new_object();
+
+ tmp = json_object_new_boolean(1);
+ json_object_object_add(msg, "aclk-available", tmp);
+
+ tmp = json_object_new_int(2);
+ json_object_object_add(msg, "aclk-version", tmp);
+
+ grp = json_object_new_array();
+ tmp = json_object_new_string("Protobuf");
+ json_object_array_add(grp, tmp);
+ json_object_object_add(msg, "protocols-supported", grp);
+
+ char *agent_id = get_agent_claimid();
+ tmp = json_object_new_boolean(agent_id != NULL);
+ json_object_object_add(msg, "agent-claimed", tmp);
+
+ if (agent_id) {
+ tmp = json_object_new_string(agent_id);
+ freez(agent_id);
+ } else
+ tmp = NULL;
+ json_object_object_add(msg, "claimed-id", tmp);
+
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
+ json_object_object_add(msg, "cloud-url", tmp);
+
+ tmp = json_object_new_boolean(aclk_connected);
+ json_object_object_add(msg, "online", tmp);
+
+ tmp = json_object_new_string("Protobuf");
+ json_object_object_add(msg, "used-cloud-protocol", tmp);
+
+ tmp = json_object_new_int(5);
+ json_object_object_add(msg, "mqtt-version", tmp);
+
+ tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
+ json_object_object_add(msg, "received-app-layer-msgs", tmp);
+
+ tmp = json_object_new_int(aclk_pubacks_per_conn);
+ json_object_object_add(msg, "received-mqtt-pubacks", tmp);
+
+ tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
+ json_object_object_add(msg, "reconnect-count", tmp);
+
+ json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
+ json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
+ json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
+ json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);
+ tmp = NULL;
+ if (!aclk_connected && last_backoff_value)
+ tmp = json_object_new_double(last_backoff_value);
+ json_object_object_add(msg, "last-backoff-value", tmp);
+
+ tmp = json_object_new_boolean(aclk_disable_runtime);
+ json_object_object_add(msg, "banned-by-cloud", tmp);
+
+ grp = json_object_new_array();
+
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ json_object *nodeinstance = json_object_new_object();
+
+ tmp = json_object_new_string(rrdhost_hostname(host));
+ json_object_object_add(nodeinstance, "hostname", tmp);
+
+ tmp = json_object_new_string(host->machine_guid);
+ json_object_object_add(nodeinstance, "mguid", tmp);
+
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id) {
+ tmp = json_object_new_string(host->aclk_state.claimed_id);
+ json_object_object_add(nodeinstance, "claimed_id", tmp);
+ } else
+ json_object_object_add(nodeinstance, "claimed_id", NULL);
+ rrdhost_aclk_state_unlock(host);
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ json_object_object_add(nodeinstance, "node-id", NULL);
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ tmp = json_object_new_string(node_id);
+ json_object_object_add(nodeinstance, "node-id", tmp);
+ }
+
+ tmp = json_object_new_int(host->system_info->hops);
+ json_object_object_add(nodeinstance, "streaming-hops", tmp);
+
+ tmp = json_object_new_string(host == localhost ? "self" : "child");
+ json_object_object_add(nodeinstance, "relationship", tmp);
+
+ tmp = json_object_new_boolean((host->receiver || host == localhost));
+ json_object_object_add(nodeinstance, "streaming-online", tmp);
+
+ tmp = json_object_new_object();
+ fill_alert_status_for_host_json(tmp, host);
+ json_object_object_add(nodeinstance, "alert-sync-status", tmp);
+
+ json_object_array_add(grp, nodeinstance);
+ }
+ rrd_unlock();
+ json_object_object_add(msg, "node-instances", grp);
+
+ char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
+ json_object_put(msg);
+ return str;
+#endif /* ENABLE_ACLK */
+}
+
+void add_aclk_host_labels(void) {
+ DICTIONARY *labels = localhost->rrdlabels;
+
+#ifdef ENABLE_ACLK
+ rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+ ACLK_PROXY_TYPE aclk_proxy;
+ char *proxy_str;
+ aclk_get_proxy(&aclk_proxy);
+
+ switch(aclk_proxy) {
+ case PROXY_TYPE_SOCKS5:
+ proxy_str = "SOCKS5";
+ break;
+ case PROXY_TYPE_HTTP:
+ proxy_str = "HTTP";
+ break;
+ default:
+ proxy_str = "none";
+ break;
+ }
+
+ rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+#else
+ rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+#endif
+}
+
+void aclk_queue_node_info(RRDHOST *host) {
+ struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
+ if (likely(wc)) {
+ wc->node_info_send = 1;
+ }
+}
diff --git a/aclk/aclk.h b/aclk/aclk.h
new file mode 100644
index 0000000..6aed548
--- /dev/null
+++ b/aclk/aclk.h
@@ -0,0 +1,57 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_H
+#define ACLK_H
+
+#include "daemon/common.h"
+
+#ifdef ENABLE_ACLK
+#include "aclk_util.h"
+#include "aclk_rrdhost_state.h"
+
+// How many MQTT PUBACKs we need to get to consider connection
+// stable for the purposes of TBEB (truncated binary exponential backoff)
+#define ACLK_PUBACKS_CONN_STABLE 3
+#endif /* ENABLE_ACLK */
+
+extern int aclk_connected;
+extern int aclk_ctx_based;
+extern int aclk_disable_runtime;
+extern int aclk_stats_enabled;
+extern int aclk_kill_link;
+
+extern usec_t aclk_session_us;
+extern time_t aclk_session_sec;
+
+extern time_t aclk_block_until;
+
+extern int disconnect_req;
+
+#ifdef ENABLE_ACLK
+void *aclk_main(void *ptr);
+
+extern netdata_mutex_t aclk_shared_state_mutex;
+#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
+#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+
+extern struct aclk_shared_state {
+ // To wait for `disconnect` message PUBACK
+ // when shutting down
+ // at the same time if > 0 we know link is
+ // shutting down
+ int mqtt_shutdown_msg_id;
+ int mqtt_shutdown_msg_rcvd;
+} aclk_shared_state;
+
+void aclk_host_state_update(RRDHOST *host, int cmd);
+void aclk_send_node_instances(void);
+
+void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
+
+#endif /* ENABLE_ACLK */
+
+char *aclk_state(void);
+char *aclk_state_json(void);
+void add_aclk_host_labels(void);
+void aclk_queue_node_info(RRDHOST *host);
+
+#endif /* ACLK_H */
diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c
new file mode 100644
index 0000000..7df51a7
--- /dev/null
+++ b/aclk/aclk_alarm_api.c
@@ -0,0 +1,44 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_alarm_api.h"
+
+#include "aclk_query_queue.h"
+
+#include "aclk_util.h"
+
+#include "aclk.h"
+
+void aclk_send_alarm_log_health(struct alarm_log_health *log_health)
+{
+ aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH);
+ query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health);
+ query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH;
+ query->data.bin_payload.msg_name = "AlarmLogHealth";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
+{
+ size_t payload_size;
+ char *payload = generate_alarm_log_entry(&payload_size, log_entry);
+
+ aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
+}
+
+void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
+{
+ aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CFG);
+ query->data.bin_payload.payload = generate_provide_alarm_configuration(&query->data.bin_payload.size, cfg);
+ query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CONFIG;
+ query->data.bin_payload.msg_name = "ProvideAlarmConfiguration";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot)
+{
+ aclk_query_t query = aclk_query_new(ALARM_SNAPSHOT);
+ query->data.bin_payload.payload = generate_alarm_snapshot_bin(&query->data.bin_payload.size, snapshot);
+ query->data.bin_payload.topic = ACLK_TOPICID_ALARM_SNAPSHOT;
+ query->data.bin_payload.msg_name = "AlarmSnapshot";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
diff --git a/aclk/aclk_alarm_api.h b/aclk/aclk_alarm_api.h
new file mode 100644
index 0000000..e3fa92b
--- /dev/null
+++ b/aclk/aclk_alarm_api.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_ALARM_API_H
+#define ACLK_ALARM_API_H
+
+#include "../daemon/common.h"
+#include "schema-wrappers/schema_wrappers.h"
+
+void aclk_send_alarm_log_health(struct alarm_log_health *log_health);
+void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry);
+void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg);
+void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot);
+
+#endif /* ACLK_ALARM_API_H */
diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c
new file mode 100644
index 0000000..df9d18f
--- /dev/null
+++ b/aclk/aclk_capas.c
@@ -0,0 +1,47 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_capas.h"
+
+#include "ml/ml.h"
+
+const struct capability *aclk_get_agent_capas()
+{
+ static struct capability agent_capabilities[] = {
+ { .name = "json", .version = 2, .enabled = 0 },
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = 0, .enabled = 0 },
+ { .name = "mc", .version = 0, .enabled = 0 },
+ { .name = "ctx", .version = 1, .enabled = 1 },
+ { .name = "funcs", .version = 1, .enabled = 1 },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ agent_capabilities[2].version = ml_capable() ? 1 : 0;
+ agent_capabilities[2].enabled = ml_enabled(localhost);
+
+ agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0;
+ agent_capabilities[3].enabled = enable_metric_correlations;
+
+ return agent_capabilities;
+}
+
+struct capability *aclk_get_node_instance_capas(RRDHOST *host)
+{
+ struct capability ni_caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) },
+ { .name = "mc",
+ .version = enable_metric_correlations ? metric_correlations_version : 0,
+ .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = 1 },
+ { .name = "funcs", .version = 0, .enabled = 0 },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) {
+ ni_caps[4].version = 1;
+ ni_caps[4].enabled = 1;
+ }
+
+ struct capability *ret = mallocz(sizeof(ni_caps));
+ memcpy(ret, ni_caps, sizeof(ni_caps));
+ return ret;
+}
diff --git a/aclk/aclk_capas.h b/aclk/aclk_capas.h
new file mode 100644
index 0000000..c39a197
--- /dev/null
+++ b/aclk/aclk_capas.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_CAPAS_H
+#define ACLK_CAPAS_H
+
+#include "daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+#include "schema-wrappers/capability.h"
+
+const struct capability *aclk_get_agent_capas();
+struct capability *aclk_get_node_instance_capas(RRDHOST *host);
+
+#endif /* ACLK_CAPAS_H */
diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c
new file mode 100644
index 0000000..f334493
--- /dev/null
+++ b/aclk/aclk_contexts_api.c
@@ -0,0 +1,41 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_query_queue.h"
+
+#include "aclk_contexts_api.h"
+
+void aclk_send_contexts_snapshot(contexts_snapshot_t data)
+{
+ aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT;
+ query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size);
+ query->data.bin_payload.msg_name = "ContextsSnapshot";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_send_contexts_updated(contexts_updated_t data)
+{
+ aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED;
+ query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size);
+ query->data.bin_payload.msg_name = "ContextsUpdated";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_update_node_collectors(struct update_node_collectors *collectors)
+{
+ aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS);
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS;
+ query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors);
+ query->data.bin_payload.msg_name = "UpdateNodeCollectors";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_update_node_info(struct update_node_info *info)
+{
+ aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO);
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO;
+ query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info);
+ query->data.bin_payload.msg_name = "UpdateNodeInfo";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h
new file mode 100644
index 0000000..f0b5ec7
--- /dev/null
+++ b/aclk/aclk_contexts_api.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_CONTEXTS_API_H
+#define ACLK_CONTEXTS_API_H
+
+#include "schema-wrappers/schema_wrappers.h"
+
+
+void aclk_send_contexts_snapshot(contexts_snapshot_t data);
+void aclk_send_contexts_updated(contexts_updated_t data);
+void aclk_update_node_collectors(struct update_node_collectors *collectors);
+void aclk_update_node_info(struct update_node_info *info);
+
+#endif /* ACLK_CONTEXTS_API_H */
+
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
new file mode 100644
index 0000000..2bdbb70
--- /dev/null
+++ b/aclk/aclk_otp.c
@@ -0,0 +1,888 @@
+
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_otp.h"
+#include "aclk_util.h"
+#include "aclk.h"
+
+#include "daemon/common.h"
+
+#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
+
+static int aclk_https_request(https_req_t *request, https_req_response_t *response) {
+ int rc;
+ // wrapper for ACLK only which loads ACLK specific proxy settings
+ // then only calls https_request
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
+
+ if (proxy_conf.type == MQTT_WSS_PROXY_HTTP) {
+ request->proxy_host = (char*)proxy_conf.host; // TODO make it const as well
+ request->proxy_port = proxy_conf.port;
+ }
+
+ rc = https_request(request, response);
+ freez((char*)proxy_conf.host);
+ return rc;
+}
+
+struct auth_data {
+ char *client_id;
+ char *username;
+ char *passwd;
+};
+
+#define PARSE_ENV_JSON_CHK_TYPE(it, type, name) \
+ if (json_object_get_type(json_object_iter_peek_value(it)) != type) { \
+ error("value of key \"%s\" should be %s", name, #type); \
+ goto exit; \
+ }
+
+#define JSON_KEY_CLIENTID "clientID"
+#define JSON_KEY_USER "username"
+#define JSON_KEY_PASS "password"
+#define JSON_KEY_TOPICS "topics"
+
+static int parse_passwd_response(const char *json_str, struct auth_data *auth) {
+ int rc = 1;
+ json_object *json;
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http response of /env endpoint");
+ return 1;
+ }
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CLIENTID)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_CLIENTID)
+
+ auth->client_id = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_USER)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_USER)
+
+ auth->username = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_PASS)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_PASS)
+
+ auth->passwd = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TOPICS)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TOPICS)
+
+ if (aclk_generate_topic_cache(json_object_iter_peek_value(&it))) {
+ error("Failed to generate topic cache!");
+ goto exit;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+ error("Unknown key \"%s\" in passwd response payload. Ignoring", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ if (!auth->client_id) {
+ error(JSON_KEY_CLIENTID " is compulsory key in /password response");
+ goto exit;
+ }
+ if (!auth->passwd) {
+ error(JSON_KEY_PASS " is compulsory in /password response");
+ goto exit;
+ }
+ if (!auth->username) {
+ error(JSON_KEY_USER " is compulsory in /password response");
+ goto exit;
+ }
+
+ rc = 0;
+exit:
+ json_object_put(json);
+ return rc;
+}
+
+#define JSON_KEY_ERTRY "errorNonRetryable"
+#define JSON_KEY_EDELAY "errorRetryDelaySeconds"
+#define JSON_KEY_EEC "errorCode"
+#define JSON_KEY_EMSGKEY "errorMsgKey"
+#define JSON_KEY_EMSG "errorMessage"
+#if JSON_C_MINOR_VERSION >= 13
+static const char *get_json_str_by_path(json_object *json, const char *path) {
+ json_object *ptr;
+ if (json_pointer_get(json, path, &ptr)) {
+ error("Missing compulsory key \"%s\" in error response", path);
+ return NULL;
+ }
+ if (json_object_get_type(ptr) != json_type_string) {
+ error("Value of Key \"%s\" in error response should be string", path);
+ return NULL;
+ }
+ return json_object_get_string(ptr);
+}
+
+static int aclk_parse_otp_error(const char *json_str) {
+ int rc = 1;
+ json_object *json, *ptr;
+ const char *ec;
+ const char *ek;
+ const char *emsg;
+ int block_retry = -1, backoff = -1;
+
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http response of /env endpoint");
+ return 1;
+ }
+
+ if ((ec = get_json_str_by_path(json, "/" JSON_KEY_EEC)) == NULL)
+ goto exit;
+
+ if ((ek = get_json_str_by_path(json, "/" JSON_KEY_EMSGKEY)) == NULL)
+ goto exit;
+
+ if ((emsg = get_json_str_by_path(json, "/" JSON_KEY_EMSG)) == NULL)
+ goto exit;
+
+ // optional field
+ if (!json_pointer_get(json, "/" JSON_KEY_ERTRY, &ptr)) {
+ if (json_object_get_type(ptr) != json_type_boolean) {
+ error("Error response Key " "/" JSON_KEY_ERTRY " should be of boolean type");
+ goto exit;
+ }
+ block_retry = json_object_get_boolean(ptr);
+ }
+
+ // optional field
+ if (!json_pointer_get(json, "/" JSON_KEY_EDELAY, &ptr)) {
+ if (json_object_get_type(ptr) != json_type_int) {
+ error("Error response Key " "/" JSON_KEY_EDELAY " should be of integer type");
+ goto exit;
+ }
+ backoff = json_object_get_int(ptr);
+ }
+
+ if (block_retry > 0)
+ aclk_disable_runtime = 1;
+
+ if (backoff > 0)
+ aclk_block_until = now_monotonic_sec() + backoff;
+
+ error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff);
+ rc = 0;
+exit:
+ json_object_put(json);
+ return rc;
+}
+#else
+static int aclk_parse_otp_error(const char *json_str) {
+ int rc = 1;
+ int block_retry = -1, backoff = -1;
+
+ const char *ec = NULL;
+ const char *ek = NULL;
+ const char *emsg = NULL;
+
+ json_object *json;
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http response of /env endpoint");
+ return 1;
+ }
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSG)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSG)
+
+ emsg = json_object_get_string(json_object_iter_peek_value(&it));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSGKEY)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSGKEY)
+
+ ek = json_object_get_string(json_object_iter_peek_value(&it));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EEC)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EEC)
+
+ ec = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EDELAY)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_int) {
+ error("value of key " JSON_KEY_EDELAY " should be integer");
+ goto exit;
+ }
+
+ backoff = json_object_get_int(json_object_iter_peek_value(&it));
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ERTRY)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_boolean) {
+ error("value of key " JSON_KEY_ERTRY " should be integer");
+ goto exit;
+ }
+
+ block_retry = json_object_get_boolean(json_object_iter_peek_value(&it));
+ json_object_iter_next(&it);
+ continue;
+ }
+ error("Unknown key \"%s\" in error response payload. Ignoring", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ if (block_retry > 0)
+ aclk_disable_runtime = 1;
+
+ if (backoff > 0)
+ aclk_block_until = now_monotonic_sec() + backoff;
+
+ error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff);
+ rc = 0;
+exit:
+ json_object_put(json);
+ return rc;
+}
+#endif
+
+#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
+static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
+{
+ EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));
+
+ if (ctx != NULL) {
+ memset(ctx, 0, sizeof(*ctx));
+ }
+ return ctx;
+}
+static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
+{
+ OPENSSL_free(ctx);
+ return;
+}
+#endif
+
+#define CHALLENGE_LEN 256
+#define CHALLENGE_LEN_BASE64 344
+inline static int base64_decode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
+{
+ unsigned char remaining_data[CHALLENGE_LEN];
+ EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
+ EVP_DecodeInit(ctx);
+ EVP_DecodeUpdate(ctx, out, outl, in, in_len);
+ int remainder = 0;
+ EVP_DecodeFinal(ctx, remaining_data, &remainder);
+ EVP_ENCODE_CTX_free(ctx);
+ if (remainder) {
+ error("Unexpected data at EVP_DecodeFinal");
+ return 1;
+ }
+ return 0;
+}
+
+inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
+{
+ int len;
+ unsigned char *str = out;
+ EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
+ EVP_EncodeInit(ctx);
+ EVP_EncodeUpdate(ctx, str, outl, in, in_len);
+ str += *outl;
+ EVP_EncodeFinal(ctx, str, &len);
+ *outl += len;
+ // if we ever expect longer output than what OpenSSL would pack into single line
+ // we would have to skip the endlines, until then we can just cut the string short
+ str = (unsigned char*)strchr((char*)out, '\n');
+ if (str)
+ *str = 0;
+ EVP_ENCODE_CTX_free(ctx);
+ return 0;
+}
+
+#define OTP_URL_PREFIX "/api/v1/auth/node/"
+int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **challenge, int *challenge_bytes)
+{
+ int rc = 1;
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
+
+ BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
+
+ req.host = target->host;
+ req.port = target->port;
+ buffer_sprintf(url, "%s/node/%s/challenge", target->path, agent_id);
+ req.url = (char *)buffer_tostring(url);
+
+ if (aclk_https_request(&req, &resp)) {
+ error ("ACLK_OTP Challenge failed");
+ buffer_free(url);
+ return 1;
+ }
+ if (resp.http_code != 200) {
+ error ("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code);
+ buffer_free(url);
+ if (resp.payload_size)
+ aclk_parse_otp_error(resp.payload);
+ goto cleanup_resp;
+ }
+ buffer_free(url);
+
+ info ("ACLK_OTP Got Challenge from Cloud");
+
+ json_object *json = json_tokener_parse(resp.payload);
+ if (!json) {
+ error ("Couldn't parse HTTP GET challenge payload");
+ goto cleanup_resp;
+ }
+ json_object *challenge_json;
+ if (!json_object_object_get_ex(json, "challenge", &challenge_json)) {
+ error ("No key named \"challenge\" in the returned JSON");
+ goto cleanup_json;
+ }
+ if (!json_object_is_type(challenge_json, json_type_string)) {
+ error ("\"challenge\" is not a string JSON type");
+ goto cleanup_json;
+ }
+ const char *challenge_base64;
+ if (!(challenge_base64 = json_object_get_string(challenge_json))) {
+ error("Failed to extract challenge from JSON object");
+ goto cleanup_json;
+ }
+ if (strlen(challenge_base64) != CHALLENGE_LEN_BASE64) {
+ error("Received Challenge has unexpected length of %zu (expected %d)", strlen(challenge_base64), CHALLENGE_LEN_BASE64);
+ goto cleanup_json;
+ }
+
+ *challenge = mallocz((CHALLENGE_LEN_BASE64 / 4) * 3);
+ base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64));
+ if (*challenge_bytes != CHALLENGE_LEN) {
+ error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN);
+ freez(*challenge);
+ *challenge = NULL;
+ goto cleanup_json;
+ }
+ rc = 0;
+
+cleanup_json:
+ json_object_put(json);
+cleanup_resp:
+ https_req_response_free(&resp);
+ return rc;
+}
+
+int aclk_send_otp_response(const char *agent_id, const unsigned char *response, int response_bytes, url_t *target, struct auth_data *mqtt_auth)
+{
+ int len;
+ int rc = 1;
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
+
+ req.host = target->host;
+ req.port = target->port;
+ req.request_type = HTTP_REQ_POST;
+
+ unsigned char base64[CHALLENGE_LEN_BASE64 + 1];
+ memset(base64, 0, CHALLENGE_LEN_BASE64 + 1);
+
+ base64_encode_helper(base64, &len, response, response_bytes);
+
+ BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
+ BUFFER *resp_json = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
+
+ buffer_sprintf(url, "%s/node/%s/password", target->path, agent_id);
+ buffer_sprintf(resp_json, "{\"response\":\"%s\"}", base64);
+
+ req.url = (char *)buffer_tostring(url);
+ req.payload = (char *)buffer_tostring(resp_json);
+ req.payload_size = strlen(req.payload);
+
+ if (aclk_https_request(&req, &resp)) {
+ error ("ACLK_OTP Password error trying to post result to password");
+ goto cleanup_buffers;
+ }
+ if (resp.http_code != 201) {
+ error ("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code);
+ if (resp.payload_size)
+ aclk_parse_otp_error(resp.payload);
+ goto cleanup_response;
+ }
+ info ("ACLK_OTP Got Password from Cloud");
+
+ if (parse_passwd_response(resp.payload, mqtt_auth)){
+ error("Error parsing response of password endpoint");
+ goto cleanup_response;
+ }
+
+ rc = 0;
+
+cleanup_response:
+ https_req_response_free(&resp);
+cleanup_buffers:
+ buffer_free(resp_json);
+ buffer_free(url);
+ return rc;
+}
+
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+static int private_decrypt(EVP_PKEY *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted)
+#else
+static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted)
+#endif
+{
+ int result;
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+ size_t outlen = EVP_PKEY_size(p_key);
+ EVP_PKEY_CTX *ctx = EVP_PKEY_CTX_new(p_key, NULL);
+ if (!ctx)
+ return 1;
+
+ if (EVP_PKEY_decrypt_init(ctx) <= 0)
+ return 1;
+
+ if (EVP_PKEY_CTX_set_rsa_padding(ctx, RSA_PKCS1_OAEP_PADDING) <= 0)
+ return 1;
+
+ *decrypted = mallocz(outlen);
+
+ if (EVP_PKEY_decrypt(ctx, *decrypted, &outlen, enc_data, data_len) == 1)
+ result = (int) outlen;
+ else
+ result = -1;
+#else
+ *decrypted = mallocz(RSA_size(p_key));
+ result = RSA_private_decrypt(data_len, enc_data, *decrypted, p_key, RSA_PKCS1_OAEP_PADDING);
+#endif
+ if (result == -1)
+ {
+ char err[512];
+ ERR_error_string_n(ERR_get_error(), err, sizeof(err));
+ error("Decryption of the challenge failed: %s", err);
+ }
+ return result;
+}
+
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target)
+#else
+int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target)
+#endif
+{
+ unsigned char *challenge = NULL;
+ int challenge_bytes;
+
+ char *agent_id = get_agent_claimid();
+ if (agent_id == NULL) {
+ error("Agent was not claimed - cannot perform challenge/response");
+ return 1;
+ }
+
+ // Get Challenge
+ if (aclk_get_otp_challenge(target, agent_id, &challenge, &challenge_bytes)) {
+ error("Error getting challenge");
+ freez(agent_id);
+ return 1;
+ }
+
+ // Decrypt Challenge / Get response
+ unsigned char *response_plaintext;
+ int response_plaintext_bytes = private_decrypt(p_key, challenge, challenge_bytes, &response_plaintext);
+ if (response_plaintext_bytes < 0) {
+ error ("Couldn't decrypt the challenge received");
+ freez(response_plaintext);
+ freez(challenge);
+ freez(agent_id);
+ return 1;
+ }
+ freez(challenge);
+
+ // Encode and Send Challenge
+ struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL };
+ if (aclk_send_otp_response(agent_id, response_plaintext, response_plaintext_bytes, target, &data)) {
+ error("Error getting response");
+ freez(response_plaintext);
+ freez(agent_id);
+ return 1;
+ }
+
+ *mqtt_pass = data.passwd;
+ *mqtt_usr = data.username;
+ *mqtt_id = data.client_id;
+
+ freez(response_plaintext);
+ freez(agent_id);
+ return 0;
+}
+
+#define JSON_KEY_ENC "encoding"
+#define JSON_KEY_AUTH_ENDPOINT "authEndpoint"
+#define JSON_KEY_TRP "transports"
+#define JSON_KEY_TRP_TYPE "type"
+#define JSON_KEY_TRP_ENDPOINT "endpoint"
+#define JSON_KEY_BACKOFF "backoff"
+#define JSON_KEY_BACKOFF_BASE "base"
+#define JSON_KEY_BACKOFF_MAX "maxSeconds"
+#define JSON_KEY_BACKOFF_MIN "minSeconds"
+#define JSON_KEY_CAPS "capabilities"
+
+static int parse_json_env_transport(json_object *json, aclk_transport_desc_t *trp) {
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_TYPE)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_TYPE)
+ if (trp->type != ACLK_TRP_UNKNOWN) {
+ error(JSON_KEY_TRP_TYPE " set already");
+ goto exit;
+ }
+ trp->type = aclk_transport_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it)));
+ if (trp->type == ACLK_TRP_UNKNOWN) {
+ error(JSON_KEY_TRP_TYPE " unknown type \"%s\"", json_object_get_string(json_object_iter_peek_value(&it)));
+ goto exit;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_ENDPOINT)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_ENDPOINT)
+ if (trp->endpoint) {
+ error(JSON_KEY_TRP_ENDPOINT " set already");
+ goto exit;
+ }
+ trp->endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ if (!trp->endpoint) {
+ error (JSON_KEY_TRP_ENDPOINT " is missing from JSON dictionary");
+ goto exit;
+ }
+
+ if (trp->type == ACLK_TRP_UNKNOWN) {
+ error ("transport type not set");
+ goto exit;
+ }
+
+ return 0;
+
+exit:
+ aclk_transport_desc_t_destroy(trp);
+ return 1;
+}
+
+static int parse_json_env_transports(json_object *json_array, aclk_env_t *env) {
+ aclk_transport_desc_t *trp;
+ json_object *obj;
+
+ if (env->transports) {
+ error("transports have been set already");
+ return 1;
+ }
+
+ env->transport_count = json_object_array_length(json_array);
+
+ env->transports = callocz(env->transport_count , sizeof(aclk_transport_desc_t *));
+
+ for (size_t i = 0; i < env->transport_count; i++) {
+ trp = callocz(1, sizeof(aclk_transport_desc_t));
+ obj = json_object_array_get_idx(json_array, i);
+ if (parse_json_env_transport(obj, trp)) {
+ error("error parsing transport idx %d", (int)i);
+ freez(trp);
+ return 1;
+ }
+ env->transports[i] = trp;
+ }
+
+ return 0;
+}
+
+#define MATCHED_CORRECT 1
+#define MATCHED_ERROR -1
+#define NOT_MATCHED 0
+static int parse_json_backoff_int(struct json_object_iterator *it, int *out, const char* name, int min, int max) {
+ if (!strcmp(json_object_iter_peek_name(it), name)) {
+ if (json_object_get_type(json_object_iter_peek_value(it)) != json_type_int) {
+ error("Could not parse \"%s\". Not an integer as expected.", name);
+ return MATCHED_ERROR;
+ }
+
+ *out = json_object_get_int(json_object_iter_peek_value(it));
+
+ if (*out < min || *out > max) {
+ error("Value of \"%s\"=%d out of range (%d-%d).", name, *out, min, max);
+ return MATCHED_ERROR;
+ }
+
+ return MATCHED_CORRECT;
+ }
+ return NOT_MATCHED;
+}
+
+static int parse_json_backoff(json_object *json, aclk_backoff_t *backoff) {
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+ int ret;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if ( (ret = parse_json_backoff_int(&it, &backoff->base, JSON_KEY_BACKOFF_BASE, 1, 10)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if ( (ret = parse_json_backoff_int(&it, &backoff->max_s, JSON_KEY_BACKOFF_MAX, 500, INT_MAX)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if ( (ret = parse_json_backoff_int(&it, &backoff->min_s, JSON_KEY_BACKOFF_MIN, 0, INT_MAX)) ) {
+ if (ret == MATCHED_ERROR) {
+ return 1;
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ return 0;
+}
+
+static int parse_json_env_caps(json_object *json, aclk_env_t *env) {
+ json_object *obj;
+ const char *str;
+
+ if (env->capabilities) {
+ error("transports have been set already");
+ return 1;
+ }
+
+ env->capability_count = json_object_array_length(json);
+
+ // empty capabilities list is allowed
+ if (!env->capability_count)
+ return 0;
+
+ env->capabilities = callocz(env->capability_count , sizeof(char *));
+
+ for (size_t i = 0; i < env->capability_count; i++) {
+ obj = json_object_array_get_idx(json, i);
+ if (json_object_get_type(obj) != json_type_string) {
+ error("Capability at index %d not a string!", (int)i);
+ return 1;
+ }
+ str = json_object_get_string(obj);
+ if (!str) {
+ error("Error parsing capabilities");
+ return 1;
+ }
+ env->capabilities[i] = strdupz(str);
+ }
+
+ return 0;
+}
+
+static int parse_json_env(const char *json_str, aclk_env_t *env) {
+ json_object *json;
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ json = json_tokener_parse(json_str);
+ if (!json) {
+ error("JSON-C failed to parse the payload of http response of /env endpoint");
+ return 1;
+ }
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_AUTH_ENDPOINT)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_AUTH_ENDPOINT)
+ if (env->auth_endpoint) {
+ error("authEndpoint set already");
+ goto exit;
+ }
+ env->auth_endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ENC)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_ENC)
+ if (env->encoding != ACLK_ENC_UNKNOWN) {
+ error(JSON_KEY_ENC " set already");
+ goto exit;
+ }
+ env->encoding = aclk_encoding_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TRP)
+
+ json_object *now = json_object_iter_peek_value(&it);
+ parse_json_env_transports(now, env);
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_BACKOFF)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_object, JSON_KEY_BACKOFF)
+
+ if (parse_json_backoff(json_object_iter_peek_value(&it), &env->backoff)) {
+ env->backoff.base = 0;
+ error("Error parsing Backoff parameters in env");
+ goto exit;
+ }
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CAPS)) {
+ PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_CAPS)
+
+ if (parse_json_env_caps(json_object_iter_peek_value(&it), env)) {
+ error("Error parsing capabilities list");
+ goto exit;
+ }
+
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ // Check all compulsory keys have been set
+ if (env->transport_count < 1) {
+ error("env has to return at least one transport");
+ goto exit;
+ }
+ if (!env->auth_endpoint) {
+ error(JSON_KEY_AUTH_ENDPOINT " is compulsory");
+ goto exit;
+ }
+ if (env->encoding == ACLK_ENC_UNKNOWN) {
+ error(JSON_KEY_ENC " is compulsory");
+ goto exit;
+ }
+ if (!env->backoff.base) {
+ error(JSON_KEY_BACKOFF " is compulsory");
+ goto exit;
+ }
+
+ json_object_put(json);
+ return 0;
+
+exit:
+ aclk_env_t_destroy(env);
+ json_object_put(json);
+ return 1;
+}
+
+int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
+ BUFFER *buf = buffer_create(1024);
+
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
+
+ req.request_type = HTTP_REQ_GET;
+
+ char *agent_id = get_agent_claimid();
+ if (agent_id == NULL)
+ {
+ error("Agent was not claimed - cannot perform challenge/response");
+ buffer_free(buf);
+ return 1;
+ }
+
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+
+ freez(agent_id);
+
+ req.host = (char*)aclk_hostname;
+ req.port = aclk_port;
+ req.url = buf->buffer;
+ if (aclk_https_request(&req, &resp)) {
+ error("Error trying to contact env endpoint");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+ if (resp.http_code != 200) {
+ error("The HTTP code not 200 OK (Got %d)", resp.http_code);
+ if (resp.payload_size)
+ aclk_parse_otp_error(resp.payload);
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ if (!resp.payload || !resp.payload_size) {
+ error("Unexpected empty payload as response to /env call");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ if (parse_json_env(resp.payload, env)) {
+ error ("error parsing /env message");
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 1;
+ }
+
+ info("Getting Cloud /env successful");
+
+ https_req_response_free(&resp);
+ buffer_free(buf);
+ return 0;
+}
diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h
new file mode 100644
index 0000000..2d660e5
--- /dev/null
+++ b/aclk/aclk_otp.h
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_OTP_H
+#define ACLK_OTP_H
+
+#include "daemon/common.h"
+
+#include "https_client.h"
+#include "aclk_util.h"
+
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target);
+#else
+int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target);
+#endif
+int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port);
+
+#endif /* ACLK_OTP_H */
diff --git a/aclk/aclk_proxy.c b/aclk/aclk_proxy.c
new file mode 100644
index 0000000..1701eb8
--- /dev/null
+++ b/aclk/aclk_proxy.c
@@ -0,0 +1,186 @@
+#include "aclk_proxy.h"
+
+#include "daemon/common.h"
+
+#define ACLK_PROXY_ENV "env"
+#define ACLK_PROXY_CONFIG_VAR "proxy"
+
+struct {
+ ACLK_PROXY_TYPE type;
+ const char *url_str;
+} supported_proxy_types[] = {
+ { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
+ { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
+};
+
+const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
+{
+ switch (*type) {
+ case PROXY_DISABLED:
+ return "disabled";
+ case PROXY_TYPE_HTTP:
+ return "HTTP";
+ case PROXY_TYPE_SOCKS5:
+ return "SOCKS";
+ default:
+ return "Unknown";
+ }
+}
+
+static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
+{
+ int i = 0;
+ while (supported_proxy_types[i].url_str) {
+ if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
+ return supported_proxy_types[i].type;
+ i++;
+ }
+ return PROXY_TYPE_UNKNOWN;
+}
+
+ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
+{
+ if (!string)
+ return PROXY_TYPE_UNKNOWN;
+
+ while (*string == 0x20)
+ string++;
+
+ if (!*string)
+ return PROXY_TYPE_UNKNOWN;
+
+ return aclk_find_proxy(string);
+}
+
+// helper function to censor user&password
+// for logging purposes
+void safe_log_proxy_censor(char *proxy)
+{
+ size_t length = strlen(proxy);
+ char *auth = proxy + length - 1;
+ char *cur;
+
+ while ((auth >= proxy) && (*auth != '@'))
+ auth--;
+
+ //if not found or @ is first char do nothing
+ if (auth <= proxy)
+ return;
+
+ cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+ if (!cur)
+ cur = proxy;
+ else
+ cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+
+ while (cur < auth) {
+ *cur = 'X';
+ cur++;
+ }
+}
+
+static inline void safe_log_proxy_error(char *str, const char *proxy)
+{
+ char *log = strdupz(proxy);
+ safe_log_proxy_censor(log);
+ error("%s Provided Value:\"%s\"", str, log);
+ freez(log);
+}
+
+static inline int check_socks_enviroment(const char **proxy)
+{
+ char *tmp = getenv("socks_proxy");
+
+ if (!tmp)
+ return 1;
+
+ if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
+ *proxy = tmp;
+ return 0;
+ }
+
+ safe_log_proxy_error(
+ "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
+ tmp);
+ return 1;
+}
+
+static inline int check_http_enviroment(const char **proxy)
+{
+ char *tmp = getenv("http_proxy");
+
+ if (!tmp)
+ return 1;
+
+ if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
+ *proxy = tmp;
+ return 0;
+ }
+
+ safe_log_proxy_error(
+ "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
+ tmp);
+ return 1;
+}
+
+const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
+{
+ const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
+ *type = PROXY_DISABLED;
+
+ if (strcmp(proxy, "none") == 0)
+ return proxy;
+
+ if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
+ if (check_socks_enviroment(&proxy) == 0) {
+#ifdef LWS_WITH_SOCKS5
+ *type = PROXY_TYPE_SOCKS5;
+ return proxy;
+#else
+ safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
+ "but Libwebsockets used doesn't have SOCKS5 support built in. "
+ "Ignoring and checking for other options.",
+ proxy);
+#endif
+ }
+ if (check_http_enviroment(&proxy) == 0)
+ *type = PROXY_TYPE_HTTP;
+ return proxy;
+ }
+
+ *type = aclk_verify_proxy(proxy);
+#ifndef LWS_WITH_SOCKS5
+ if (*type == PROXY_TYPE_SOCKS5) {
+ safe_log_proxy_error(
+ "Config var \"" ACLK_PROXY_CONFIG_VAR
+ "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
+ proxy);
+ }
+#endif
+ if (*type == PROXY_TYPE_UNKNOWN) {
+ *type = PROXY_DISABLED;
+ safe_log_proxy_error(
+ "Config var \"" ACLK_PROXY_CONFIG_VAR
+ "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
+ proxy);
+ }
+
+ return proxy;
+}
+
+// helper function to read settings only once (static)
+// as claiming, challenge/response and ACLK
+// read the same thing, no need to parse again
+const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
+{
+ static const char *proxy = NULL;
+ static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
+
+ if (proxy_type == PROXY_NOT_SET)
+ proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
+
+ *type = proxy_type;
+ return proxy;
+}
diff --git a/aclk/aclk_proxy.h b/aclk/aclk_proxy.h
new file mode 100644
index 0000000..b4ceb7d
--- /dev/null
+++ b/aclk/aclk_proxy.h
@@ -0,0 +1,22 @@
+#ifndef ACLK_PROXY_H
+#define ACLK_PROXY_H
+
+#include <config.h>
+
+#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
+
+typedef enum aclk_proxy_type {
+ PROXY_TYPE_UNKNOWN = 0,
+ PROXY_TYPE_SOCKS5,
+ PROXY_TYPE_HTTP,
+ PROXY_DISABLED,
+ PROXY_NOT_SET,
+} ACLK_PROXY_TYPE;
+
+const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
+ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
+const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
+void safe_log_proxy_censor(char *proxy);
+const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
+
+#endif /* ACLK_PROXY_H */
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
new file mode 100644
index 0000000..5301c28
--- /dev/null
+++ b/aclk/aclk_query.c
@@ -0,0 +1,383 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_query.h"
+#include "aclk_stats.h"
+#include "aclk_tx_msgs.h"
+
+#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
+
+#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
+
+pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
+#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+
+static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url)
+{
+ usec_t t;
+
+ t = now_monotonic_high_precision_usec();
+ w->response.code = web_client_api_request_v1(host, w, url);
+ t = now_monotonic_high_precision_usec() - t;
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_q_process_total += t;
+ aclk_metrics_per_sample.cloud_q_process_count++;
+ if (aclk_metrics_per_sample.cloud_q_process_max < t)
+ aclk_metrics_per_sample.cloud_q_process_max = t;
+ ACLK_STATS_UNLOCK;
+ }
+
+ return t;
+}
+
+static RRDHOST *node_id_2_rrdhost(const char *node_id)
+{
+ int res;
+ uuid_t node_id_bin, host_id_bin;
+
+ RRDHOST *host = find_host_by_node_id((char *)node_id);
+ if (host)
+ return host;
+
+ char host_id[UUID_STR_LEN];
+ if (uuid_parse(node_id, node_id_bin)) {
+ error("Couldn't parse UUID %s", node_id);
+ return NULL;
+ }
+ if ((res = get_host_id(&node_id_bin, &host_id_bin))) {
+ error("node not found rc=%d", res);
+ return NULL;
+ }
+ uuid_unparse_lower(host_id_bin, host_id);
+ return rrdhost_find_by_guid(host_id);
+}
+
+#define NODE_ID_QUERY "/node/"
+// TODO this function should be quarantied and written nicely
+// lots of skeletons from initial ACLK Legacy impl.
+// quick and dirty from the start
+static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
+{
+ int retval = 0;
+ usec_t t;
+ BUFFER *local_buffer = NULL;
+ BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE);
+ RRDHOST *query_host = localhost;
+
+#ifdef NETDATA_WITH_ZLIB
+ int z_ret;
+ BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ char *start, *end;
+#endif
+
+ struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+ w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
+ w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
+ w->acl = WEB_CLIENT_ACL_ACLK;
+
+ buffer_strcat(log_buffer, query->data.http_api_v2.query);
+ size_t size = 0;
+ size_t sent = 0;
+ w->tv_in = query->created_tv;
+ now_realtime_timeval(&w->tv_ready);
+
+ if (query->timeout) {
+ int in_queue = (int) (dt_usec(&w->tv_in, &w->tv_ready) / 1000);
+ if (in_queue > query->timeout) {
+ log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %d ms (LIMIT %d ms)", in_queue, query->timeout);
+ retval = 1;
+ w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0);
+ goto cleanup;
+ }
+ }
+
+ if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
+ char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
+ char nodeid[UUID_STR_LEN];
+ if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
+ error_report(CLOUD_EMSG_MALFORMED_NODE_ID);
+ retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_MALFORMED_NODE_ID, CLOUD_EMSG_MALFORMED_NODE_ID, NULL, 0);
+ goto cleanup;
+ }
+ strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1);
+
+ query_host = node_id_2_rrdhost(nodeid);
+ if (!query_host) {
+ error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
+ retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
+ goto cleanup;
+ }
+ }
+
+ char *mysep = strchr(query->data.http_api_v2.query, '?');
+ if (mysep) {
+ url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+ *mysep = '\0';
+ } else
+ url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+
+ mysep = strrchr(query->data.http_api_v2.query, '/');
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ int stat_idx = aclk_cloud_req_http_type_to_idx(mysep ? mysep + 1 : "other");
+ aclk_metrics_per_sample.cloud_req_http_by_type[stat_idx]++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ // execute the query
+ t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
+ size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
+ sent = size;
+
+#ifdef NETDATA_WITH_ZLIB
+ // check if gzip encoding can and should be used
+ if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
+ start += strlen(WEB_HDR_ACCEPT_ENC);
+ end = strstr(start, "\x0D\x0A");
+ start = strstr(start, "gzip");
+
+ if (start && start < end) {
+ w->response.zstream.zalloc = Z_NULL;
+ w->response.zstream.zfree = Z_NULL;
+ w->response.zstream.opaque = Z_NULL;
+ if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
+ w->response.zinitialized = 1;
+ w->response.zoutput = 1;
+ } else
+ error("Failed to initialize zlib. Proceeding without compression.");
+ }
+ }
+
+ if (w->response.data->len && w->response.zinitialized) {
+ w->response.zstream.next_in = (Bytef *)w->response.data->buffer;
+ w->response.zstream.avail_in = w->response.data->len;
+ do {
+ w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE;
+ w->response.zstream.next_out = w->response.zbuffer;
+ z_ret = deflate(&w->response.zstream, Z_FINISH);
+ if(z_ret < 0) {
+ if(w->response.zstream.msg)
+ error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg);
+ else
+ error("Unknown error during zlib compression.");
+ retval = 1;
+ w->response.code = 500;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_ZLIB_ERROR, CLOUD_EMSG_ZLIB_ERROR, NULL, 0);
+ goto cleanup;
+ }
+ int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
+ buffer_need_bytes(z_buffer, bytes_to_cpy);
+ memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy);
+ z_buffer->len += bytes_to_cpy;
+ } while(z_ret != Z_STREAM_END);
+ // so that web_client_build_http_header
+ // puts correct content length into header
+ buffer_free(w->response.data);
+ w->response.data = z_buffer;
+ z_buffer = NULL;
+ }
+#endif
+
+ w->response.data->date = w->tv_ready.tv_sec;
+ web_client_build_http_header(w);
+ local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ local_buffer->contenttype = CT_APPLICATION_JSON;
+
+ buffer_strcat(local_buffer, w->response.header_output->buffer);
+
+ if (w->response.data->len) {
+#ifdef NETDATA_WITH_ZLIB
+ if (w->response.zinitialized) {
+ buffer_need_bytes(local_buffer, w->response.data->len);
+ memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
+ local_buffer->len += w->response.data->len;
+ sent = sent - size + w->response.data->len;
+ } else {
+#endif
+ buffer_strcat(local_buffer, w->response.data->buffer);
+#ifdef NETDATA_WITH_ZLIB
+ }
+#endif
+ }
+
+ // send msg.
+ aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
+
+ struct timeval tv;
+
+cleanup:
+ now_realtime_timeval(&tv);
+ log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
+ w->id
+ , gettid()
+ , query_thr->idx
+ , "DATA"
+ , sent
+ , size
+ , size > sent ? -(((size - sent) / (double)size) * 100.0) : ((size > 0) ? (((sent - size ) / (double)size) * 100.0) : 0.0)
+ , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
+ , dt_usec(&tv, &w->tv_ready) / 1000.0
+ , dt_usec(&tv, &w->tv_in) / 1000.0
+ , w->response.code
+ , strip_control_characters((char *)buffer_tostring(log_buffer))
+ );
+
+#ifdef NETDATA_WITH_ZLIB
+ if(w->response.zinitialized)
+ deflateEnd(&w->response.zstream);
+ buffer_free(z_buffer);
+#endif
+ buffer_free(w->response.data);
+ buffer_free(w->response.header);
+ buffer_free(w->response.header_output);
+ freez(w);
+ buffer_free(local_buffer);
+ buffer_free(log_buffer);
+ return retval;
+}
+
+static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
+{
+ // this will be simplified when legacy support is removed
+ aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
+ return 0;
+}
+
+const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok)
+{
+ switch (qt) {
+ case HTTP_API_V2: return "http_api_request_v2";
+ case REGISTER_NODE: return "register_node";
+ case NODE_STATE_UPDATE: return "node_state_update";
+ case CHART_DIMS_UPDATE: return "chart_and_dim_update";
+ case CHART_CONFIG_UPDATED: return "chart_config_updated";
+ case CHART_RESET: return "reset_chart_messages";
+ case RETENTION_UPDATED: return "update_retention_info";
+ case UPDATE_NODE_INFO: return "update_node_info";
+ case ALARM_LOG_HEALTH: return "alarm_log_health";
+ case ALARM_PROVIDE_CFG: return "provide_alarm_config";
+ case ALARM_SNAPSHOT: return "alarm_snapshot";
+ case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
+ case PROTO_BIN_MESSAGE: return "generic_binary_proto_message";
+ default:
+ if (!unknown_ok)
+ error_report("Unknown query type used %d", (int) qt);
+ return "unknown";
+ }
+}
+
+static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
+{
+ if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) {
+ error_report("Unknown query in query queue. %u", query->type);
+ aclk_query_free(query);
+ return;
+ }
+
+ worker_is_busy(query->type);
+ if (query->type == HTTP_API_V2) {
+ debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
+ http_api_v2(query_thr, query);
+ } else {
+ debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name);
+ send_bin_msg(query_thr, query);
+ }
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_dispatched++;
+ aclk_queries_per_thread[query_thr->idx]++;
+ aclk_metrics_per_sample.queries_per_type[query->type]++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ aclk_query_free(query);
+
+ worker_is_idle();
+}
+
+/* Processes messages from queue. Compete for work with other threads
+ */
+int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
+{
+ aclk_query_t query;
+ while ((query = aclk_queue_pop()))
+ aclk_query_process_msg(query_thr, query);
+
+ return 0;
+}
+
+static void worker_aclk_register(void) {
+ worker_register("ACLKQUERY");
+ for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) {
+ worker_register_job_name(i, aclk_query_get_name(i, 0));
+ }
+}
+
+/**
+ * Main query processing thread
+ */
+void *aclk_query_main_thread(void *ptr)
+{
+ worker_aclk_register();
+
+ struct aclk_query_thread *query_thr = ptr;
+
+ while (!netdata_exit) {
+ aclk_query_process_msgs(query_thr);
+
+ worker_is_idle();
+ QUERY_THREAD_LOCK;
+ if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+ sleep_usec(USEC_PER_SEC * 1);
+ QUERY_THREAD_UNLOCK;
+ }
+
+ worker_unregister();
+ return NULL;
+}
+
+#define TASK_LEN_MAX 22
+void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client)
+{
+ info("Starting %d query threads.", query_threads->count);
+
+ char thread_name[TASK_LEN_MAX];
+ query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
+ for (int i = 0; i < query_threads->count; i++) {
+ query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
+
+ if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
+ error("snprintf encoding error");
+ netdata_thread_create(
+ &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
+ &query_threads->thread_list[i]);
+
+ query_threads->thread_list[i].client = client;
+ }
+}
+
+void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
+{
+ if (query_threads && query_threads->thread_list) {
+ for (int i = 0; i < query_threads->count; i++) {
+ netdata_thread_join(query_threads->thread_list[i].thread, NULL);
+ }
+ freez(query_threads->thread_list);
+ }
+ aclk_queue_lock();
+ aclk_queue_flush();
+}
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
new file mode 100644
index 0000000..c006b01
--- /dev/null
+++ b/aclk/aclk_query.h
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_QUERY_H
+#define NETDATA_ACLK_QUERY_H
+
+#include "libnetdata/libnetdata.h"
+
+#include "mqtt_wss_client.h"
+
+#include "aclk_query_queue.h"
+
+extern pthread_cond_t query_cond_wait;
+extern pthread_mutex_t query_lock_wait;
+#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
+#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
+
+// TODO
+//extern volatile int aclk_connected;
+
+struct aclk_query_thread {
+ netdata_thread_t thread;
+ int idx;
+ mqtt_wss_client client;
+};
+
+struct aclk_query_threads {
+ struct aclk_query_thread *thread_list;
+ int count;
+};
+
+void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client);
+void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
+
+const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok);
+
+#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
new file mode 100644
index 0000000..9a45057
--- /dev/null
+++ b/aclk/aclk_query_queue.c
@@ -0,0 +1,136 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_query_queue.h"
+#include "aclk_query.h"
+#include "aclk_stats.h"
+
+static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER;
+#define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex)
+#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex)
+
+static struct aclk_query_queue {
+ aclk_query_t head;
+ aclk_query_t tail;
+ int block_push;
+} aclk_query_queue = {
+ .head = NULL,
+ .tail = NULL,
+ .block_push = 0
+};
+
+static inline int _aclk_queue_query(aclk_query_t query)
+{
+ now_realtime_timeval(&query->created_tv);
+ query->created = now_realtime_usec();
+
+ ACLK_QUEUE_LOCK;
+ if (aclk_query_queue.block_push) {
+ ACLK_QUEUE_UNLOCK;
+ if(!netdata_exit)
+ error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
+ aclk_query_free(query);
+ return 1;
+ }
+ if (!aclk_query_queue.head) {
+ aclk_query_queue.head = query;
+ aclk_query_queue.tail = query;
+ ACLK_QUEUE_UNLOCK;
+ return 0;
+ }
+ // TODO deduplication
+ aclk_query_queue.tail->next = query;
+ aclk_query_queue.tail = query;
+ ACLK_QUEUE_UNLOCK;
+ return 0;
+
+}
+
+int aclk_queue_query(aclk_query_t query)
+{
+ int ret = _aclk_queue_query(query);
+ if (!ret) {
+ QUERY_THREAD_WAKEUP;
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_queued++;
+ ACLK_STATS_UNLOCK;
+ }
+ }
+ return ret;
+}
+
+aclk_query_t aclk_queue_pop(void)
+{
+ aclk_query_t ret;
+
+ ACLK_QUEUE_LOCK;
+ if (aclk_query_queue.block_push) {
+ ACLK_QUEUE_UNLOCK;
+ if(!netdata_exit)
+ error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
+ return NULL;
+ }
+
+ ret = aclk_query_queue.head;
+ if (!ret) {
+ ACLK_QUEUE_UNLOCK;
+ return ret;
+ }
+
+ aclk_query_queue.head = ret->next;
+ if (unlikely(!aclk_query_queue.head))
+ aclk_query_queue.tail = aclk_query_queue.head;
+ ACLK_QUEUE_UNLOCK;
+
+ ret->next = NULL;
+ return ret;
+}
+
+void aclk_queue_flush(void)
+{
+ aclk_query_t query = aclk_queue_pop();
+ while (query) {
+ aclk_query_free(query);
+ query = aclk_queue_pop();
+ };
+}
+
+aclk_query_t aclk_query_new(aclk_query_type_t type)
+{
+ aclk_query_t query = callocz(1, sizeof(struct aclk_query));
+ query->type = type;
+ return query;
+}
+
+void aclk_query_free(aclk_query_t query)
+{
+ switch (query->type) {
+ case HTTP_API_V2:
+ freez(query->data.http_api_v2.payload);
+ if (query->data.http_api_v2.query != query->dedup_id)
+ freez(query->data.http_api_v2.query);
+ break;
+
+ default:
+ break;
+ }
+
+ freez(query->dedup_id);
+ freez(query->callback_topic);
+ freez(query->msg_id);
+ freez(query);
+}
+
+void aclk_queue_lock(void)
+{
+ ACLK_QUEUE_LOCK;
+ aclk_query_queue.block_push = 1;
+ ACLK_QUEUE_UNLOCK;
+}
+
+void aclk_queue_unlock(void)
+{
+ ACLK_QUEUE_LOCK;
+ aclk_query_queue.block_push = 0;
+ ACLK_QUEUE_UNLOCK;
+}
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
new file mode 100644
index 0000000..ab94b63
--- /dev/null
+++ b/aclk/aclk_query_queue.h
@@ -0,0 +1,86 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_QUERY_QUEUE_H
+#define NETDATA_ACLK_QUERY_QUEUE_H
+
+#include "libnetdata/libnetdata.h"
+#include "daemon/common.h"
+#include "schema-wrappers/schema_wrappers.h"
+
+#include "aclk_util.h"
+
+typedef enum {
+ UNKNOWN = 0,
+ HTTP_API_V2,
+ REGISTER_NODE,
+ NODE_STATE_UPDATE,
+ CHART_DIMS_UPDATE,
+ CHART_CONFIG_UPDATED,
+ CHART_RESET,
+ RETENTION_UPDATED,
+ UPDATE_NODE_INFO,
+ ALARM_LOG_HEALTH,
+ ALARM_PROVIDE_CFG,
+ ALARM_SNAPSHOT,
+ UPDATE_NODE_COLLECTORS,
+ PROTO_BIN_MESSAGE,
+ ACLK_QUERY_TYPE_COUNT // always keep this as last
+} aclk_query_type_t;
+
+struct aclk_query_http_api_v2 {
+ char *payload;
+ char *query;
+};
+
+struct aclk_bin_payload {
+ char *payload;
+ size_t size;
+ enum aclk_topics topic;
+ const char *msg_name;
+};
+
+typedef struct aclk_query *aclk_query_t;
+struct aclk_query {
+ aclk_query_type_t type;
+
+ // dedup_id is used to deduplicate queries in the list
+ // if type and dedup_id is the same message is deduplicated
+ // set dedup_id to NULL to never deduplicate the message
+ // set dedup_id to constant (e.g. empty string "") to make
+ // message of this type ever exist only once in the list
+ char *dedup_id;
+ char *callback_topic;
+ char *msg_id;
+
+ struct timeval created_tv;
+ usec_t created;
+ int timeout;
+ aclk_query_t next;
+
+ // TODO maybe remove?
+ int version;
+ union {
+ struct aclk_query_http_api_v2 http_api_v2;
+ struct aclk_bin_payload bin_payload;
+ } data;
+};
+
+aclk_query_t aclk_query_new(aclk_query_type_t type);
+void aclk_query_free(aclk_query_t query);
+
+int aclk_queue_query(aclk_query_t query);
+aclk_query_t aclk_queue_pop(void);
+void aclk_queue_flush(void);
+
+void aclk_queue_lock(void);
+void aclk_queue_unlock(void);
+
+#define QUEUE_IF_PAYLOAD_PRESENT(query) \
+ if (likely(query->data.bin_payload.payload)) { \
+ aclk_queue_query(query); \
+ } else { \
+ error("Failed to generate payload (%s)", __FUNCTION__); \
+ aclk_query_free(query); \
+ }
+
+#endif /* NETDATA_ACLK_QUERY_QUEUE_H */
diff --git a/aclk/aclk_rrdhost_state.h b/aclk/aclk_rrdhost_state.h
new file mode 100644
index 0000000..5c8a2dd
--- /dev/null
+++ b/aclk/aclk_rrdhost_state.h
@@ -0,0 +1,11 @@
+#ifndef ACLK_RRDHOST_STATE_H
+#define ACLK_RRDHOST_STATE_H
+
+#include "libnetdata/libnetdata.h"
+
+typedef struct aclk_rrdhost_state {
+ char *claimed_id; // Claimed ID if host has one otherwise NULL
+ char *prev_claimed_id; // Claimed ID if changed (reclaimed) during runtime
+} aclk_rrdhost_state;
+
+#endif /* ACLK_RRDHOST_STATE_H */
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
new file mode 100644
index 0000000..83bc550
--- /dev/null
+++ b/aclk/aclk_rx_msgs.c
@@ -0,0 +1,551 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_rx_msgs.h"
+
+#include "aclk_stats.h"
+#include "aclk_query_queue.h"
+#include "aclk.h"
+#include "aclk_capas.h"
+
+#include "schema-wrappers/proto_2_json.h"
+
+#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
+#define ACLK_CLOUD_REQ_V2_PREFIX "GET /"
+
+#define ACLK_V_COMPRESSION 2
+
+struct aclk_request {
+ char *type_id;
+ char *msg_id;
+ char *callback_topic;
+ char *payload;
+ int version;
+ int timeout;
+ int min_version;
+ int max_version;
+};
+
+static int cloud_to_agent_parse(JSON_ENTRY *e)
+{
+ struct aclk_request *data = e->callback_data;
+
+ switch (e->type) {
+ case JSON_OBJECT:
+ case JSON_ARRAY:
+ break;
+ case JSON_STRING:
+ if (!strcmp(e->name, "msg-id")) {
+ data->msg_id = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "type")) {
+ data->type_id = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "callback-topic")) {
+ data->callback_topic = strdupz(e->data.string);
+ break;
+ }
+ if (!strcmp(e->name, "payload")) {
+ if (likely(e->data.string)) {
+ size_t len = strlen(e->data.string);
+ data->payload = mallocz(len+1);
+ if (!url_decode_r(data->payload, e->data.string, len + 1))
+ strcpy(data->payload, e->data.string);
+ }
+ break;
+ }
+ break;
+ case JSON_NUMBER:
+ if (!strcmp(e->name, "version")) {
+ data->version = (int)e->data.number;
+ break;
+ }
+ if (!strcmp(e->name, "timeout")) {
+ data->timeout = (int)e->data.number;
+ break;
+ }
+ if (!strcmp(e->name, "min-version")) {
+ data->min_version = (int)e->data.number;
+ break;
+ }
+ if (!strcmp(e->name, "max-version")) {
+ data->max_version = (int)e->data.number;
+ break;
+ }
+
+ break;
+
+ case JSON_BOOLEAN:
+ break;
+
+ case JSON_NULL:
+ break;
+ }
+ return 0;
+}
+
+static inline int aclk_extract_v2_data(char *payload, char **data)
+{
+ char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR);
+ if(!ptr)
+ return 1;
+ ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR);
+ *data = strdupz(ptr);
+ return 0;
+}
+
+static inline int aclk_v2_payload_get_query(const char *payload, char **query_url)
+{
+ const char *start, *end;
+
+ // TODO better check of URL
+ if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) {
+ errno = 0;
+ error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
+ return 1;
+ }
+ start = payload + 4;
+
+ if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) {
+ errno = 0;
+ error("Doesn't look like HTTP GET request.");
+ return 1;
+ }
+
+ *query_url = mallocz((end - start) + 1);
+ strncpyz(*query_url, start, end - start);
+
+ return 0;
+}
+
+static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
+{
+ aclk_query_t query;
+
+ errno = 0;
+ if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
+ error(
+ "This handler cannot reply to request with version older than %d, received %d.",
+ ACLK_V_COMPRESSION,
+ cloud_to_agent->version);
+ return 1;
+ }
+
+ query = aclk_query_new(HTTP_API_V2);
+
+ if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) {
+ error("Error extracting payload expected after the JSON dictionary.");
+ goto error;
+ }
+
+ if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) {
+ error("Could not extract payload from query");
+ goto error;
+ }
+
+ if (unlikely(!cloud_to_agent->callback_topic)) {
+ error("Missing callback_topic");
+ goto error;
+ }
+
+ if (unlikely(!cloud_to_agent->msg_id)) {
+ error("Missing msg_id");
+ goto error;
+ }
+
+ // aclk_queue_query takes ownership of data pointer
+ query->callback_topic = cloud_to_agent->callback_topic;
+ query->timeout = cloud_to_agent->timeout;
+ // for clarity and code readability as when we process the request
+ // it would be strange to get URL from `dedup_id`
+ query->data.http_api_v2.query = query->dedup_id;
+ query->msg_id = cloud_to_agent->msg_id;
+ aclk_queue_query(query);
+ return 0;
+
+error:
+ aclk_query_free(query);
+ return 1;
+}
+
+int aclk_handle_cloud_cmd_message(char *payload)
+{
+ struct aclk_request cloud_to_agent;
+ memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
+
+ if (unlikely(!payload)) {
+ error_report("ACLK incoming 'cmd' message is empty");
+ return 1;
+ }
+
+ debug(D_ACLK, "ACLK incoming 'cmd' message (%s)", payload);
+
+ int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+
+ if (unlikely(rc != JSON_OK)) {
+ error_report("Malformed json request (%s)", payload);
+ goto err_cleanup;
+ }
+
+ if (!cloud_to_agent.type_id) {
+ error_report("Cloud message is missing compulsory key \"type\"");
+ goto err_cleanup;
+ }
+
+ // Originally we were expecting to have multiple types of 'cmd' message,
+ // but after the new protocol was designed we will ever only have 'http'
+ if (strcmp(cloud_to_agent.type_id, "http")) {
+ error_report("Only 'http' cmd message is supported");
+ goto err_cleanup;
+ }
+
+ if (likely(!aclk_handle_cloud_http_request_v2(&cloud_to_agent, payload))) {
+ // aclk_handle_cloud_request takes ownership of the pointers
+ // (to avoid copying) in case of success
+ freez(cloud_to_agent.type_id);
+ return 0;
+ }
+
+err_cleanup:
+ if (cloud_to_agent.payload)
+ freez(cloud_to_agent.payload);
+ if (cloud_to_agent.type_id)
+ freez(cloud_to_agent.type_id);
+ if (cloud_to_agent.msg_id)
+ freez(cloud_to_agent.msg_id);
+ if (cloud_to_agent.callback_topic)
+ freez(cloud_to_agent.callback_topic);
+
+ return 1;
+}
+
+typedef uint32_t simple_hash_t;
+typedef int(*rx_msg_handler)(const char *msg, size_t msg_len);
+
+int handle_old_proto_cmd(const char *msg, size_t msg_len)
+{
+ // msg is binary payload in all other cases
+ // however in this message from old legacy cloud
+ // we have to convert it to C string
+ char *str = mallocz(msg_len+1);
+ memcpy(str, msg, msg_len);
+ str[msg_len] = 0;
+ if (aclk_handle_cloud_cmd_message(str)) {
+ freez(str);
+ return 1;
+ }
+ freez(str);
+ return 0;
+}
+
+int create_node_instance_result(const char *msg, size_t msg_len)
+{
+ node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
+ if (!res.machine_guid || !res.node_id) {
+ error_report("Error parsing CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
+ }
+
+ debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
+
+ uuid_t host_id, node_id;
+ if (uuid_parse(res.machine_guid, host_id)) {
+ error("Error parsing machine_guid provided by CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
+ }
+ if (uuid_parse(res.node_id, node_id)) {
+ error("Error parsing node_id provided by CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
+ }
+ update_node_id(&host_id, &node_id);
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ node_instance_connection_t node_state_update = {
+ .hops = 1,
+ .live = 0,
+ .queryable = 1,
+ .session_id = aclk_session_newarch,
+ .node_id = res.node_id
+ };
+
+ RRDHOST *host = rrdhost_find_by_guid(res.machine_guid);
+ if (host) {
+ // not all host must have RRDHOST struct created for them
+ // if they never connected during runtime of agent
+ if (host == localhost) {
+ node_state_update.live = 1;
+ node_state_update.hops = 0;
+ } else {
+ netdata_mutex_lock(&host->receiver_lock);
+ node_state_update.live = (host->receiver != NULL);
+ netdata_mutex_unlock(&host->receiver_lock);
+ node_state_update.hops = host->system_info->hops;
+ }
+ }
+
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
+
+ rrdhost_aclk_state_lock(localhost);
+ node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+ rrdhost_aclk_state_unlock(localhost);
+
+ freez((void *)node_state_update.capabilities);
+
+ query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+
+ aclk_queue_query(query);
+ freez(res.node_id);
+ freez(res.machine_guid);
+ return 0;
+}
+
+int send_node_instances(const char *msg, size_t msg_len)
+{
+ UNUSED(msg);
+ UNUSED(msg_len);
+ aclk_send_node_instances();
+ return 0;
+}
+
+int stream_charts_and_dimensions(const char *msg, size_t msg_len)
+{
+ UNUSED(msg);
+ UNUSED(msg_len);
+ error_report("Received obsolete StreamChartsAndDimensions msg");
+ return 0;
+}
+
+int charts_and_dimensions_ack(const char *msg, size_t msg_len)
+{
+ UNUSED(msg);
+ UNUSED(msg_len);
+ error_report("Received obsolete StreamChartsAndDimensionsAck msg");
+ return 0;
+}
+
+int update_chart_configs(const char *msg, size_t msg_len)
+{
+ UNUSED(msg);
+ UNUSED(msg_len);
+ error_report("Received obsolete UpdateChartConfigs msg");
+ return 0;
+}
+
+int start_alarm_streaming(const char *msg, size_t msg_len)
+{
+ struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
+ if (!res.node_id || !res.batch_id) {
+ error("Error parsing StartAlarmStreaming");
+ freez(res.node_id);
+ return 1;
+ }
+ aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int send_alarm_log_health(const char *msg, size_t msg_len)
+{
+ char *node_id = parse_send_alarm_log_health(msg, msg_len);
+ if (!node_id) {
+ error("Error parsing SendAlarmLogHealth");
+ return 1;
+ }
+ aclk_send_alarm_health_log(node_id);
+ freez(node_id);
+ return 0;
+}
+
+int send_alarm_configuration(const char *msg, size_t msg_len)
+{
+ char *config_hash = parse_send_alarm_configuration(msg, msg_len);
+ if (!config_hash || !*config_hash) {
+ error("Error parsing SendAlarmConfiguration");
+ freez(config_hash);
+ return 1;
+ }
+ aclk_send_alarm_configuration(config_hash);
+ freez(config_hash);
+ return 0;
+}
+
+int send_alarm_snapshot(const char *msg, size_t msg_len)
+{
+ struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
+ if (!sas->node_id || !sas->claim_id) {
+ error("Error parsing SendAlarmSnapshot");
+ destroy_send_alarm_snapshot(sas);
+ return 1;
+ }
+ aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ destroy_send_alarm_snapshot(sas);
+ return 0;
+}
+
+int handle_disconnect_req(const char *msg, size_t msg_len)
+{
+ struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
+ if (!cmd)
+ return 1;
+ if (cmd->permaban) {
+ error("Cloud Banned This Agent!");
+ aclk_disable_runtime = 1;
+ }
+ info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
+ if (cmd->reconnect_after_s > 0) {
+ aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
+ info(
+ "Cloud asks not to reconnect for %u seconds. We shall honor that request",
+ (unsigned int)cmd->reconnect_after_s);
+ }
+ disconnect_req = 1;
+ freez(cmd->error_description);
+ freez(cmd);
+ return 0;
+}
+
+int contexts_checkpoint(const char *msg, size_t msg_len)
+{
+ aclk_ctx_based = 1;
+
+ struct ctxs_checkpoint *cmd = parse_ctxs_checkpoint(msg, msg_len);
+ if (!cmd)
+ return 1;
+
+ rrdcontext_hub_checkpoint_command(cmd);
+
+ freez(cmd->claim_id);
+ freez(cmd->node_id);
+ freez(cmd);
+ return 0;
+}
+
+int stop_streaming_contexts(const char *msg, size_t msg_len)
+{
+ if (!aclk_ctx_based) {
+ error_report("Received StopStreamingContexts message but context based communication was not enabled (Cloud violated the protocol). Ignoring message");
+ return 1;
+ }
+
+ struct stop_streaming_ctxs *cmd = parse_stop_streaming_ctxs(msg, msg_len);
+ if (!cmd)
+ return 1;
+
+ rrdcontext_hub_stop_streaming_command(cmd);
+
+ freez(cmd->claim_id);
+ freez(cmd->node_id);
+ freez(cmd);
+ return 0;
+}
+
+typedef struct {
+ const char *name;
+ simple_hash_t name_hash;
+ rx_msg_handler fnc;
+} new_cloud_rx_msg_t;
+
+new_cloud_rx_msg_t rx_msgs[] = {
+ { .name = "cmd", .name_hash = 0, .fnc = handle_old_proto_cmd },
+ { .name = "CreateNodeInstanceResult", .name_hash = 0, .fnc = create_node_instance_result },
+ { .name = "SendNodeInstances", .name_hash = 0, .fnc = send_node_instances },
+ { .name = "StreamChartsAndDimensions", .name_hash = 0, .fnc = stream_charts_and_dimensions },
+ { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack },
+ { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs },
+ { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming },
+ { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health },
+ { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
+ { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
+ { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
+ { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint },
+ { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts },
+ { .name = NULL, .name_hash = 0, .fnc = NULL },
+};
+
+new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash)
+{
+ // we can afford to not compare strings after hash match
+ // because we check for collisions at initialization in
+ // aclk_init_rx_msg_handlers()
+ for (int i = 0; rx_msgs[i].fnc; i++) {
+ if (rx_msgs[i].name_hash == hash)
+ return &rx_msgs[i];
+ }
+ return NULL;
+}
+
+const char *rx_handler_get_name(size_t i)
+{
+ return rx_msgs[i].name;
+}
+
+unsigned int aclk_init_rx_msg_handlers(void)
+{
+ int i;
+ for (i = 0; rx_msgs[i].fnc; i++) {
+ simple_hash_t hash = simple_hash(rx_msgs[i].name);
+ new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash);
+ if (unlikely(hdl)) {
+ // the list of message names changes only by changing
+ // the source code, therefore fatal is appropriate
+ fatal("Hash collision. Choose better hash. Added '%s' clashes with existing '%s'", rx_msgs[i].name, hdl->name);
+ }
+ rx_msgs[i].name_hash = hash;
+ }
+ return i;
+}
+
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic __maybe_unused)
+{
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_recvd++;
+ ACLK_STATS_UNLOCK;
+ }
+ new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type));
+ debug(D_ACLK, "Got message named '%s' from cloud", message_type);
+ if (unlikely(!msg_descriptor)) {
+ error("Do not know how to handle message of type '%s'. Ignoring", message_type);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
+ }
+ return;
+ }
+
+
+ if (aclklog_enabled) {
+ if (!strncmp(message_type, "cmd", strlen("cmd"))) {
+ log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name);
+ } else {
+ char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name);
+ log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name);
+ freez(json);
+ }
+ }
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++;
+ ACLK_STATS_UNLOCK;
+ }
+ if (msg_descriptor->fnc(msg, msg_len)) {
+ error("Error processing message of type '%s'", message_type);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
+ }
+ return;
+ }
+}
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
new file mode 100644
index 0000000..61921fa
--- /dev/null
+++ b/aclk/aclk_rx_msgs.h
@@ -0,0 +1,17 @@
+
+
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_RX_MSGS_H
+#define ACLK_RX_MSGS_H
+
+#include "daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+int aclk_handle_cloud_cmd_message(char *payload);
+
+const char *rx_handler_get_name(size_t i);
+unsigned int aclk_init_rx_msg_handlers(void);
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic);
+
+#endif /* ACLK_RX_MSGS_H */
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
new file mode 100644
index 0000000..215313f
--- /dev/null
+++ b/aclk/aclk_stats.c
@@ -0,0 +1,408 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_stats.h"
+
+#include "aclk_query.h"
+
+netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
+
+struct {
+ int query_thread_count;
+ unsigned int proto_hdl_cnt;
+ uint32_t *aclk_proto_rx_msgs_sample;
+ RRDDIM **rx_msg_dims;
+} aclk_stats_cfg; // there is only 1 stats thread at a time
+
+// data ACLK stats need per query thread
+struct aclk_qt_data {
+ RRDDIM *dim;
+} *aclk_qt_data = NULL;
+
+uint32_t *aclk_queries_per_thread = NULL;
+uint32_t *aclk_queries_per_thread_sample = NULL;
+uint32_t *aclk_proto_rx_msgs_sample = NULL;
+
+struct aclk_metrics aclk_metrics = {
+ .online = 0,
+};
+
+struct aclk_metrics_per_sample aclk_metrics_per_sample;
+
+static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent)
+{
+ static RRDSET *st_aclkstats = NULL;
+ static RRDDIM *rd_online_status = NULL;
+
+ if (unlikely(!st_aclkstats)) {
+ st_aclkstats = rrdset_create_localhost(
+ "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status",
+ "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online);
+
+ rrdset_done(st_aclkstats);
+}
+
+static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st_query_thread = NULL;
+ static RRDDIM *rd_queued = NULL;
+ static RRDDIM *rd_dispatched = NULL;
+
+ if (unlikely(!st_query_thread)) {
+ st_query_thread = rrdset_create_localhost(
+ "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s",
+ "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA);
+
+ rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued);
+ rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched);
+
+ rrdset_done(st_query_thread);
+}
+
+#ifdef NETDATA_INTERNAL_CHECKS
+static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_avg = NULL;
+ static RRDDIM *rd_max = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms",
+ "netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ if(per_sample->latency_count)
+ rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count));
+ else
+ rrddim_set_by_pointer(st, rd_avg, 0);
+
+ rrddim_set_by_pointer(st, rd_max, per_sample->latency_max);
+
+ rrdset_done(st);
+}
+#endif
+
+static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_rcvd = NULL;
+ static RRDDIM *rd_rq_err = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s",
+ "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err);
+ rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err);
+
+ rrdset_done(st);
+}
+
+static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *dims[ACLK_QUERY_TYPE_COUNT];
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s",
+ "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
+ dims[i] = rrddim_add(st, aclk_query_get_name(i, 1), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+
+ }
+
+ for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
+ rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]);
+
+ rrdset_done(st);
+}
+
+static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = {
+ "other",
+ "info",
+ "data",
+ "alarms",
+ "alarm_log",
+ "chart",
+ "charts"
+ // if you change then update `ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT`.
+};
+
+int aclk_cloud_req_http_type_to_idx(const char *name)
+{
+ for (int i = 1; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++)
+ if (!strcmp(cloud_req_http_type_names[i], name))
+ return i;
+ return 0;
+}
+
+static void aclk_stats_cloud_req_http_type(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_types[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT];
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_cloud_req_http_type", NULL, "aclk", NULL, "Requests received from cloud via HTTP by their type", "req/s",
+ "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++)
+ rd_rq_types[i] = rrddim_add(st, cloud_req_http_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++)
+ rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_http_by_type[i]);
+
+ rrdset_done(st);
+}
+
+#define MAX_DIM_NAME 22
+static void aclk_stats_query_threads(uint32_t *queries_per_thread)
+{
+ static RRDSET *st = NULL;
+
+ char dim_name[MAX_DIM_NAME];
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
+ "netdata", "stats", 200009, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) {
+ if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
+ error("snprintf encoding error");
+ aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+ }
+
+ for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) {
+ rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
+ }
+
+ rrdset_done(st);
+}
+
+static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_avg = NULL;
+ static RRDDIM *rd_rq_max = NULL;
+ static RRDDIM *rd_rq_total = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us",
+ "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ if(per_sample->cloud_q_process_count)
+ rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count));
+ else
+ rrddim_set_by_pointer(st, rd_rq_avg, 0);
+ rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max);
+ rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total);
+
+ rrdset_done(st);
+}
+
+const char *rx_handler_get_name(size_t i);
+static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
+{
+ static RRDSET *st = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_protobuf_rx_types", NULL, "aclk", NULL, "Received new cloud architecture messages by their type.", "msg/s",
+ "netdata", "stats", 200010, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) {
+ aclk_stats_cfg.rx_msg_dims[i] = rrddim_add(st, rx_handler_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+ }
+
+ for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++)
+ rrddim_set_by_pointer(st, aclk_stats_cfg.rx_msg_dims[i], rx_msgs_sample[i]);
+
+ rrdset_done(st);
+}
+
+static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_sent = NULL;
+ static RRDDIM *rd_recvd = NULL;
+ static uint64_t sent = 0;
+ static uint64_t recvd = 0;
+
+ sent += stats->bytes_tx;
+ recvd += stats->bytes_rx;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_openssl_bytes", NULL, "aclk", NULL, "Received and Sent bytes.", "B/s",
+ "netdata", "stats", 200011, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_sent = rrddim_add(st, "sent", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ }
+
+ rrddim_set_by_pointer(st, rd_sent, sent);
+ rrddim_set_by_pointer(st, rd_recvd, recvd);
+
+ rrdset_done(st);
+}
+
+void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
+{
+ aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
+ aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
+ aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
+
+ memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
+ aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt;
+ aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
+ aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
+ aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*));
+}
+
+void aclk_stats_thread_cleanup()
+{
+ freez(aclk_stats_cfg.rx_msg_dims);
+ freez(aclk_proto_rx_msgs_sample);
+ freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
+ freez(aclk_qt_data);
+ freez(aclk_queries_per_thread);
+ freez(aclk_queries_per_thread_sample);
+}
+
+void *aclk_stats_main_thread(void *ptr)
+{
+ struct aclk_stats_thread *args = ptr;
+
+ aclk_stats_cfg.query_thread_count = args->query_thread_count;
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+ usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
+
+ struct aclk_metrics_per_sample per_sample;
+ struct aclk_metrics permanent;
+
+ while (!netdata_exit) {
+ netdata_thread_testcancel();
+ // ------------------------------------------------------------------------
+ // Wait for the next iteration point.
+
+ heartbeat_next(&hb, step_ut);
+ if (netdata_exit) break;
+
+ ACLK_STATS_LOCK;
+ // to not hold lock longer than necessary, especially not to hold it
+ // during database rrd* operations
+ memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
+
+ memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
+ memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
+
+ memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
+ memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
+ memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count);
+ memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count);
+ ACLK_STATS_UNLOCK;
+
+ aclk_stats_collect(&per_sample, &permanent);
+ aclk_stats_query_queue(&per_sample);
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_latency(&per_sample);
+#endif
+
+ aclk_stats_cloud_req(&per_sample);
+ aclk_stats_cloud_req_type(&per_sample);
+ aclk_stats_cloud_req_http_type(&per_sample);
+
+ aclk_stats_query_threads(aclk_queries_per_thread_sample);
+
+ aclk_stats_query_time(&per_sample);
+
+ struct mqtt_wss_stats mqtt_wss_stats = mqtt_wss_get_stats(args->client);
+ aclk_stats_mqtt_wss(&mqtt_wss_stats);
+
+ aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
+ }
+
+ return 0;
+}
+
+void aclk_stats_upd_online(int online) {
+ if(!aclk_stats_enabled)
+ return;
+
+ ACLK_STATS_LOCK;
+ aclk_metrics.online = online;
+
+ if(!online)
+ aclk_metrics_per_sample.offline_during_sample = 1;
+ ACLK_STATS_UNLOCK;
+}
+
+#ifdef NETDATA_INTERNAL_CHECKS
+static usec_t pub_time[UINT16_MAX + 1] = {0};
+void aclk_stats_msg_published(uint16_t id)
+{
+ ACLK_STATS_LOCK;
+ pub_time[id] = now_boottime_usec();
+ ACLK_STATS_UNLOCK;
+}
+
+void aclk_stats_msg_puback(uint16_t id)
+{
+ ACLK_STATS_LOCK;
+ usec_t t;
+
+ if (!aclk_stats_enabled) {
+ ACLK_STATS_UNLOCK;
+ return;
+ }
+
+ if (unlikely(!pub_time[id])) {
+ ACLK_STATS_UNLOCK;
+ error("Received PUBACK for unknown message?!");
+ return;
+ }
+
+ t = now_boottime_usec() - pub_time[id];
+ t /= USEC_PER_MS;
+ pub_time[id] = 0;
+ if (aclk_metrics_per_sample.latency_max < t)
+ aclk_metrics_per_sample.latency_max = t;
+
+ aclk_metrics_per_sample.latency_total += t;
+ aclk_metrics_per_sample.latency_count++;
+ ACLK_STATS_UNLOCK;
+}
+#endif /* NETDATA_INTERNAL_CHECKS */
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
new file mode 100644
index 0000000..bec9ac2
--- /dev/null
+++ b/aclk/aclk_stats.h
@@ -0,0 +1,79 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ACLK_STATS_H
+#define NETDATA_ACLK_STATS_H
+
+#include "daemon/common.h"
+#include "libnetdata/libnetdata.h"
+#include "aclk_query_queue.h"
+#include "mqtt_wss_client.h"
+
+#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
+
+extern netdata_mutex_t aclk_stats_mutex;
+
+#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
+#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
+
+// if you change update `cloud_req_http_type_names`.
+#define ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT 7
+
+int aclk_cloud_req_http_type_to_idx(const char *name);
+
+struct aclk_stats_thread {
+ netdata_thread_t *thread;
+ int query_thread_count;
+ mqtt_wss_client client;
+};
+
+// preserve between samples
+struct aclk_metrics {
+ volatile uint8_t online;
+};
+
+// reset to 0 on every sample
+extern struct aclk_metrics_per_sample {
+ /* in the unlikely event of ACLK disconnecting
+ and reconnecting under 1 sampling rate
+ we want to make sure we record the disconnection
+ despite it being then seemingly longer in graph */
+ volatile uint8_t offline_during_sample;
+
+ volatile uint32_t queries_queued;
+ volatile uint32_t queries_dispatched;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ volatile uint32_t latency_max;
+ volatile uint32_t latency_total;
+ volatile uint32_t latency_count;
+#endif
+
+ volatile uint32_t cloud_req_recvd;
+ volatile uint32_t cloud_req_err;
+
+ // query types.
+ volatile uint32_t queries_per_type[ACLK_QUERY_TYPE_COUNT];
+
+ // HTTP-specific request types.
+ volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT];
+
+ volatile uint32_t cloud_q_process_total;
+ volatile uint32_t cloud_q_process_count;
+ volatile uint32_t cloud_q_process_max;
+} aclk_metrics_per_sample;
+
+extern uint32_t *aclk_proto_rx_msgs_sample;
+
+extern uint32_t *aclk_queries_per_thread;
+
+void *aclk_stats_main_thread(void *ptr);
+void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt);
+void aclk_stats_thread_cleanup();
+void aclk_stats_upd_online(int online);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+void aclk_stats_msg_published(uint16_t id);
+void aclk_stats_msg_puback(uint16_t id);
+#endif /* NETDATA_INTERNAL_CHECKS */
+
+#endif /* NETDATA_ACLK_STATS_H */
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
new file mode 100644
index 0000000..532b964
--- /dev/null
+++ b/aclk/aclk_tx_msgs.c
@@ -0,0 +1,276 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_tx_msgs.h"
+#include "daemon/common.h"
+#include "aclk_util.h"
+#include "aclk_stats.h"
+#include "aclk.h"
+#include "aclk_capas.h"
+
+#include "schema-wrappers/proto_2_json.h"
+
+#ifndef __GNUC__
+#pragma region aclk_tx_msgs helper functions
+#endif
+
+// version for aclk legacy (old cloud arch)
+#define ACLK_VERSION 2
+
+static void freez_aclk_publish5a(void *ptr) {
+ freez(ptr);
+}
+static void freez_aclk_publish5b(void *ptr) {
+ freez(ptr);
+}
+
+uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+{
+#ifndef ACLK_LOG_CONVERSATION_DIR
+ UNUSED(msgname);
+#endif
+ uint16_t packet_id;
+ const char *topic = aclk_get_topic(subtopic);
+
+ if (unlikely(!topic)) {
+ error("Couldn't get topic. Aborting message send.");
+ return 0;
+ }
+
+ mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published(packet_id);
+#endif
+
+ if (aclklog_enabled) {
+ char *json = protomsg_to_json(msg, msg_len, msgname);
+ log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
+ freez(json);
+ }
+
+ return packet_id;
+}
+
+// json_object_put returns int unfortunately :D
+// we need void(*fnc)(void *);
+static void json_object_put_wrapper(void *jsonobj)
+{
+ json_object_put(jsonobj);
+}
+
+#define TOPIC_MAX_LEN 512
+#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
+static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
+{
+ uint16_t packet_id;
+ const char *str;
+ char *full_msg = NULL;
+ int len;
+
+ if (unlikely(!topic || topic[0] != '/')) {
+ error ("Full topic required!");
+ json_object_put(msg);
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
+ }
+
+ str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+ len = strlen(str);
+
+ if (payload_len) {
+ full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
+
+ memcpy(full_msg, str, len);
+ json_object_put(msg);
+ msg = NULL;
+ memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
+ len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
+ memcpy(&full_msg[len], payload, payload_len);
+ len += payload_len;
+ }
+
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_msg_published(packet_id);
+#endif
+
+ return 0;
+}
+
+/*
+ * Creates universal header common for all ACLK messages. User gets ownership of json object created.
+ * Usually this is freed by send function after message has been sent.
+ */
+static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
+{
+ uuid_t uuid;
+ char uuid_str[36 + 1];
+ json_object *tmp;
+ json_object *obj = json_object_new_object();
+
+ tmp = json_object_new_string(type);
+ json_object_object_add(obj, "type", tmp);
+
+ if (unlikely(!msg_id)) {
+ uuid_generate(uuid);
+ uuid_unparse(uuid, uuid_str);
+ msg_id = uuid_str;
+ }
+
+ if (ts_secs == 0) {
+ ts_us = now_realtime_usec();
+ ts_secs = ts_us / USEC_PER_SEC;
+ ts_us = ts_us % USEC_PER_SEC;
+ }
+
+ tmp = json_object_new_string(msg_id);
+ json_object_object_add(obj, "msg-id", tmp);
+
+ tmp = json_object_new_int64(ts_secs);
+ json_object_object_add(obj, "timestamp", tmp);
+
+// TODO handle this somehow on older json-c
+// tmp = json_object_new_uint64(ts_us);
+// probably jso->_to_json_string -> custom function
+// jso->o.c_uint64 -> map this with pointer to signed int
+// commit that implements json_object_new_uint64 is 3c3b592
+// between 0.14 and 0.15
+ tmp = json_object_new_int64(ts_us);
+ json_object_object_add(obj, "timestamp-offset-usec", tmp);
+
+ tmp = json_object_new_int64(aclk_session_sec);
+ json_object_object_add(obj, "connect", tmp);
+
+// TODO handle this somehow see above
+// tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
+ tmp = json_object_new_int64(aclk_session_us);
+ json_object_object_add(obj, "connect-offset-usec", tmp);
+
+ tmp = json_object_new_int(version);
+ json_object_object_add(obj, "version", tmp);
+
+ return obj;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
+
+#ifndef __GNUC__
+#pragma region aclk_tx_msgs message generators
+#endif
+
+void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
+{
+ json_object *tmp, *msg;
+ msg = create_hdr("http", msg_id, 0, 0, 2);
+ tmp = json_object_new_int(http_code);
+ json_object_object_add(msg, "http-code", tmp);
+
+ tmp = json_object_new_int(ec);
+ json_object_object_add(msg, "error-code", tmp);
+
+ tmp = json_object_new_string(emsg);
+ json_object_object_add(msg, "error-description", tmp);
+
+ if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
+ error("Failed to send cancelation message for http reply");
+ }
+}
+
+void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
+{
+ json_object *tmp, *msg;
+
+ msg = create_hdr("http", msg_id, 0, 0, 2);
+
+ tmp = json_object_new_int64(t_exec);
+ json_object_object_add(msg, "t-exec", tmp);
+
+ tmp = json_object_new_int64(created);
+ json_object_object_add(msg, "t-rx", tmp);
+
+ tmp = json_object_new_int(http_code);
+ json_object_object_add(msg, "http-code", tmp);
+
+ int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
+
+ switch (rc) {
+ case HTTP_RESP_FORBIDDEN:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
+ break;
+ case HTTP_RESP_INTERNAL_SERVER_ERROR:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
+ break;
+ case HTTP_RESP_BACKEND_FETCH_FAILED:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
+ break;
+ }
+}
+
+uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
+ size_t len;
+ uint16_t pid;
+
+ update_agent_connection_t conn = {
+ .reachable = (reachable ? 1 : 0),
+ .lwt = 0,
+ .session_id = aclk_session_newarch,
+ .capabilities = aclk_get_agent_capas()
+ };
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("Internal error. Should not come here if not claimed");
+ rrdhost_aclk_state_unlock(localhost);
+ return 0;
+ }
+ if (localhost->aclk_state.prev_claimed_id)
+ conn.claim_id = localhost->aclk_state.prev_claimed_id;
+ else
+ conn.claim_id = localhost->aclk_state.claimed_id;
+
+ char *msg = generate_update_agent_connection(&len, &conn);
+ rrdhost_aclk_state_unlock(localhost);
+
+ if (!msg) {
+ error("Error generating agent::v1::UpdateAgentConnection payload");
+ return 0;
+ }
+
+ pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
+ if (localhost->aclk_state.prev_claimed_id) {
+ freez(localhost->aclk_state.prev_claimed_id);
+ localhost->aclk_state.prev_claimed_id = NULL;
+ }
+ return pid;
+}
+
+char *aclk_generate_lwt(size_t *size) {
+ update_agent_connection_t conn = {
+ .reachable = 0,
+ .lwt = 1,
+ .session_id = aclk_session_newarch,
+ .capabilities = NULL
+ };
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("Internal error. Should not come here if not claimed");
+ rrdhost_aclk_state_unlock(localhost);
+ return NULL;
+ }
+ conn.claim_id = localhost->aclk_state.claimed_id;
+
+ char *msg = generate_update_agent_connection(size, &conn);
+ rrdhost_aclk_state_unlock(localhost);
+
+ if (!msg)
+ error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
+
+ return msg;
+}
+
+#ifndef __GNUC__
+#pragma endregion
+#endif
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
new file mode 100644
index 0000000..31e5924
--- /dev/null
+++ b/aclk/aclk_tx_msgs.h
@@ -0,0 +1,20 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_TX_MSGS_H
+#define ACLK_TX_MSGS_H
+
+#include <json-c/json.h>
+#include "libnetdata/libnetdata.h"
+#include "daemon/common.h"
+#include "mqtt_wss_client.h"
+#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_util.h"
+
+uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
+
+void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len);
+void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
+
+uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
+char *aclk_generate_lwt(size_t *size);
+
+#endif
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
new file mode 100644
index 0000000..01eaedc
--- /dev/null
+++ b/aclk/aclk_util.c
@@ -0,0 +1,388 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_util.h"
+#include "aclk_proxy.h"
+
+#include "daemon/common.h"
+
+usec_t aclk_session_newarch = 0;
+
+aclk_env_t *aclk_env = NULL;
+
+int chart_batch_id;
+
+aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
+ if (!strcmp(str, "json")) {
+ return ACLK_ENC_JSON;
+ }
+ if (!strcmp(str, "proto")) {
+ return ACLK_ENC_PROTO;
+ }
+ return ACLK_ENC_UNKNOWN;
+}
+
+aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) {
+ if (!strcmp(str, "MQTTv3")) {
+ return ACLK_TRP_MQTT_3_1_1;
+ }
+ if (!strcmp(str, "MQTTv5")) {
+ return ACLK_TRP_MQTT_5;
+ }
+ return ACLK_TRP_UNKNOWN;
+}
+
+void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) {
+ freez(trp_desc->endpoint);
+}
+
+void aclk_env_t_destroy(aclk_env_t *env) {
+ freez(env->auth_endpoint);
+ if (env->transports) {
+ for (size_t i = 0; i < env->transport_count; i++) {
+ if(env->transports[i]) {
+ aclk_transport_desc_t_destroy(env->transports[i]);
+ freez(env->transports[i]);
+ env->transports[i] = NULL;
+ }
+ }
+ freez(env->transports);
+ }
+ if (env->capabilities) {
+ for (size_t i = 0; i < env->capability_count; i++)
+ freez(env->capabilities[i]);
+ freez(env->capabilities);
+ }
+}
+
+int aclk_env_has_capa(const char *capa)
+{
+ for (int i = 0; i < (int) aclk_env->capability_count; i++) {
+ if (!strcasecmp(capa, aclk_env->capabilities[i]))
+ return 1;
+ }
+ return 0;
+}
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+volatile int aclk_conversation_log_counter = 0;
+#endif
+
+#define ACLK_TOPIC_PREFIX "/agent/"
+
+struct aclk_topic {
+ enum aclk_topics topic_id;
+ // as received from cloud - we keep this for
+ // eventual topic list update when claim_id changes
+ char *topic_recvd;
+ // constructed topic
+ char *topic;
+};
+
+// This helps to cache finalized topics (assembled with claim_id)
+// to not have to alloc or create buffer and construct topic every
+// time message is sent as in old ACLK
+static struct aclk_topic **aclk_topic_cache = NULL;
+static size_t aclk_topic_cache_items = 0;
+
+void free_topic_cache(void)
+{
+ if (aclk_topic_cache) {
+ for (size_t i = 0; i < aclk_topic_cache_items; i++) {
+ freez(aclk_topic_cache[i]->topic);
+ freez(aclk_topic_cache[i]->topic_recvd);
+ freez(aclk_topic_cache[i]);
+ }
+ freez(aclk_topic_cache);
+ aclk_topic_cache = NULL;
+ aclk_topic_cache_items = 0;
+ }
+}
+
+#define JSON_TOPIC_KEY_TOPIC "topic"
+#define JSON_TOPIC_KEY_NAME "name"
+
+struct topic_name {
+ enum aclk_topics id;
+ // cloud name - how is it called
+ // in answer to /password endpoint
+ const char *name;
+} topic_names[] = {
+ { .id = ACLK_TOPICID_CHART, .name = "chart" },
+ { .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
+ { .id = ACLK_TOPICID_METADATA, .name = "meta" },
+ { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
+ { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" },
+ { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" },
+ { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" },
+ { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" },
+ { .id = ACLK_TOPICID_CHART_DIMS, .name = "chart-and-dims-updated" },
+ { .id = ACLK_TOPICID_CHART_CONFIGS_UPDATED, .name = "chart-configs-updated" },
+ { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" },
+ { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" },
+ { .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" },
+ { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log" },
+ { .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" },
+ { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" },
+ { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" },
+ { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" },
+ { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" },
+ { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" },
+ { .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
+};
+
+enum aclk_topics compulsory_topics[] = {
+// TODO remove old topics once not needed anymore
+ ACLK_TOPICID_CHART, //TODO from legacy
+ ACLK_TOPICID_ALARMS, //TODO from legacy
+ ACLK_TOPICID_METADATA, //TODO from legacy
+ ACLK_TOPICID_COMMAND,
+ ACLK_TOPICID_AGENT_CONN,
+ ACLK_TOPICID_CMD_NG_V1,
+ ACLK_TOPICID_CREATE_NODE,
+ ACLK_TOPICID_NODE_CONN,
+ ACLK_TOPICID_CHART_DIMS,
+ ACLK_TOPICID_CHART_CONFIGS_UPDATED,
+ ACLK_TOPICID_CHART_RESET,
+ ACLK_TOPICID_RETENTION_UPDATED,
+ ACLK_TOPICID_NODE_INFO,
+ ACLK_TOPICID_ALARM_LOG,
+ ACLK_TOPICID_ALARM_HEALTH,
+ ACLK_TOPICID_ALARM_CONFIG,
+ ACLK_TOPICID_ALARM_SNAPSHOT,
+ ACLK_TOPICID_NODE_COLLECTORS,
+ ACLK_TOPICID_CTXS_SNAPSHOT,
+ ACLK_TOPICID_CTXS_UPDATED,
+ ACLK_TOPICID_UNKNOWN
+};
+
+static enum aclk_topics topic_name_to_id(const char *name) {
+ struct topic_name *topic = topic_names;
+ while (topic->name) {
+ if (!strcmp(topic->name, name)) {
+ return topic->id;
+ }
+ topic++;
+ }
+ return ACLK_TOPICID_UNKNOWN;
+}
+
+static const char *topic_id_to_name(enum aclk_topics tid) {
+ struct topic_name *topic = topic_names;
+ while (topic->name) {
+ if (topic->id == tid)
+ return topic->name;
+ topic++;
+ }
+ return "unknown";
+}
+
+#define CLAIM_ID_REPLACE_TAG "#{claim_id}"
+static void topic_generate_final(struct aclk_topic *t) {
+ char *dest;
+ char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG);
+ if (!replace_tag)
+ return;
+
+ rrdhost_aclk_state_lock(localhost);
+ if (unlikely(!localhost->aclk_state.claimed_id)) {
+ error("This should never be called if agent not claimed");
+ rrdhost_aclk_state_unlock(localhost);
+ return;
+ }
+
+ t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id));
+ memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd);
+ dest = t->topic + (replace_tag - t->topic_recvd);
+
+ memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id));
+ dest += strlen(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ replace_tag += strlen(CLAIM_ID_REPLACE_TAG);
+ strcpy(dest, replace_tag);
+ dest += strlen(replace_tag);
+ *dest = 0;
+}
+
+static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic)
+{
+ struct json_object_iterator it;
+ struct json_object_iterator itEnd;
+
+ it = json_object_iter_begin(json);
+ itEnd = json_object_iter_end(json);
+
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
+ error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string");
+ return 1;
+ }
+ topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it)));
+ if (topic->topic_id == ACLK_TOPICID_UNKNOWN) {
+ debug(D_ACLK, "topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it)));
+ }
+ json_object_iter_next(&it);
+ continue;
+ }
+ if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) {
+ if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
+ error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string");
+ return 1;
+ }
+ topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ continue;
+ }
+
+ error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it));
+ json_object_iter_next(&it);
+ }
+
+ if (!topic->topic_recvd) {
+ error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC);
+ return 1;
+ }
+
+ topic_generate_final(topic);
+ aclk_topic_cache_items++;
+
+ return 0;
+}
+
+int aclk_generate_topic_cache(struct json_object *json)
+{
+ json_object *obj;
+
+ size_t array_size = json_object_array_length(json);
+ if (!array_size) {
+ error("Empty topic list!");
+ return 1;
+ }
+
+ if (aclk_topic_cache)
+ free_topic_cache();
+
+ aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *));
+
+ for (size_t i = 0; i < array_size; i++) {
+ obj = json_object_array_get_idx(json, i);
+ if (json_object_get_type(obj) != json_type_object) {
+ error("expected json_type_object");
+ return 1;
+ }
+ aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic));
+ if (topic_cache_add_topic(obj, aclk_topic_cache[i])) {
+ error("failed to parse topic @idx=%d", (int)i);
+ return 1;
+ }
+ }
+
+ for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) {
+ if (!aclk_get_topic(compulsory_topics[i])) {
+ error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i]));
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Build a topic based on sub_topic and final_topic
+ * if the sub topic starts with / assume that is an absolute topic
+ *
+ */
+const char *aclk_get_topic(enum aclk_topics topic)
+{
+ if (!aclk_topic_cache) {
+ error("Topic cache not initialized");
+ return NULL;
+ }
+
+ for (size_t i = 0; i < aclk_topic_cache_items; i++) {
+ if (aclk_topic_cache[i]->topic_id == topic)
+ return aclk_topic_cache[i]->topic;
+ }
+ error("Unknown topic");
+ return NULL;
+}
+
+/*
+ * TBEB with randomness
+ *
+ * @param reset 1 - to reset the delay,
+ * 0 - to advance a step and calculate sleep time in ms
+ * @param min, max in seconds
+ * @returns delay in ms
+ *
+ */
+
+unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) {
+ static int attempt = -1;
+
+ if (reset) {
+ attempt = -1;
+ return 0;
+ }
+
+ attempt++;
+
+ if (attempt == 0) {
+ srandom(time(NULL));
+ return 0;
+ }
+
+ unsigned long int delay = pow(base, attempt - 1);
+ delay *= MSEC_PER_SEC;
+
+ delay += (random() % (MAX(1000, delay/2)));
+
+ if (delay <= min * MSEC_PER_SEC)
+ return min;
+
+ if (delay >= max * MSEC_PER_SEC)
+ return max;
+
+ return delay;
+}
+
+
+#define HTTP_PROXY_PREFIX "http://"
+void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type)
+{
+ ACLK_PROXY_TYPE pt;
+ const char *ptr = aclk_get_proxy(&pt);
+ char *tmp;
+ char *host;
+ if (pt != PROXY_TYPE_HTTP)
+ return;
+
+ *port = 0;
+
+ if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
+ ptr += strlen(HTTP_PROXY_PREFIX);
+
+ if ((tmp = strchr(ptr, '@')))
+ ptr = tmp;
+
+ if ((tmp = strchr(ptr, '/'))) {
+ host = mallocz((tmp - ptr) + 1);
+ memcpy(host, ptr, (tmp - ptr));
+ host[tmp - ptr] = 0;
+ } else
+ host = strdupz(ptr);
+
+ if ((tmp = strchr(host, ':'))) {
+ *tmp = 0;
+ tmp++;
+ *port = atoi(tmp);
+ }
+
+ if (*port <= 0 || *port > 65535)
+ *port = 8080;
+
+ *ohost = host;
+
+ if (type)
+ *type = MQTT_WSS_PROXY_HTTP;
+}
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
new file mode 100644
index 0000000..ed715e0
--- /dev/null
+++ b/aclk/aclk_util.h
@@ -0,0 +1,112 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_UTIL_H
+#define ACLK_UTIL_H
+
+#include "libnetdata/libnetdata.h"
+#include "mqtt_wss_client.h"
+
+#define CLOUD_EC_MALFORMED_NODE_ID 1
+#define CLOUD_EMSG_MALFORMED_NODE_ID "URL requests node_id but there is not enough chars following (for it to be valid uuid)."
+#define CLOUD_EC_NODE_NOT_FOUND 2
+#define CLOUD_EMSG_NODE_NOT_FOUND "Node with requested node_id not found"
+#define CLOUD_EC_ZLIB_ERROR 3
+#define CLOUD_EMSG_ZLIB_ERROR "Error during zlib compression"
+#define CLOUD_EC_REQ_REPLY_TOO_BIG 4
+#define CLOUD_EMSG_REQ_REPLY_TOO_BIG "Request reply produces message bigger than allowed maximum"
+#define CLOUD_EC_FAIL_TOPIC 5
+#define CLOUD_EMSG_FAIL_TOPIC "Internal Topic Error"
+#define CLOUD_EC_SND_TIMEOUT 6
+#define CLOUD_EMSG_SND_TIMEOUT "Timeout sending binpacked message"
+
+// Helper stuff which should not have any further inside ACLK dependency
+// and are supposed not to be needed outside of ACLK
+extern usec_t aclk_session_newarch;
+
+extern int chart_batch_id;
+
+typedef enum {
+ ACLK_ENC_UNKNOWN = 0,
+ ACLK_ENC_JSON,
+ ACLK_ENC_PROTO
+} aclk_encoding_type_t;
+
+typedef enum {
+ ACLK_TRP_UNKNOWN = 0,
+ ACLK_TRP_MQTT_3_1_1,
+ ACLK_TRP_MQTT_5
+} aclk_transport_type_t;
+
+typedef struct {
+ char *endpoint;
+ aclk_transport_type_t type;
+} aclk_transport_desc_t;
+
+typedef struct {
+ int base;
+ int max_s;
+ int min_s;
+} aclk_backoff_t;
+
+typedef struct {
+ char *auth_endpoint;
+ aclk_encoding_type_t encoding;
+
+ aclk_transport_desc_t **transports;
+ size_t transport_count;
+
+ char **capabilities;
+ size_t capability_count;
+
+ aclk_backoff_t backoff;
+} aclk_env_t;
+
+extern aclk_env_t *aclk_env;
+
+aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str);
+aclk_transport_type_t aclk_transport_type_t_from_str(const char *str);
+
+void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc);
+void aclk_env_t_destroy(aclk_env_t *env);
+int aclk_env_has_capa(const char *capa);
+
+enum aclk_topics {
+ ACLK_TOPICID_UNKNOWN = 0,
+ ACLK_TOPICID_CHART = 1,
+ ACLK_TOPICID_ALARMS = 2,
+ ACLK_TOPICID_METADATA = 3,
+ ACLK_TOPICID_COMMAND = 4,
+ ACLK_TOPICID_AGENT_CONN = 5,
+ ACLK_TOPICID_CMD_NG_V1 = 6,
+ ACLK_TOPICID_CREATE_NODE = 7,
+ ACLK_TOPICID_NODE_CONN = 8,
+ ACLK_TOPICID_CHART_DIMS = 9,
+ ACLK_TOPICID_CHART_CONFIGS_UPDATED = 10,
+ ACLK_TOPICID_CHART_RESET = 11,
+ ACLK_TOPICID_RETENTION_UPDATED = 12,
+ ACLK_TOPICID_NODE_INFO = 13,
+ ACLK_TOPICID_ALARM_LOG = 14,
+ ACLK_TOPICID_ALARM_HEALTH = 15,
+ ACLK_TOPICID_ALARM_CONFIG = 16,
+ ACLK_TOPICID_ALARM_SNAPSHOT = 17,
+ ACLK_TOPICID_NODE_COLLECTORS = 18,
+ ACLK_TOPICID_CTXS_SNAPSHOT = 19,
+ ACLK_TOPICID_CTXS_UPDATED = 20
+};
+
+const char *aclk_get_topic(enum aclk_topics topic);
+int aclk_generate_topic_cache(struct json_object *json);
+void free_topic_cache(void);
+// TODO
+// aclk_topics_reload //when claim id changes
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+extern volatile int aclk_conversation_log_counter;
+#define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST)
+#endif
+
+unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max);
+#define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0)
+
+void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type);
+
+#endif /* ACLK_UTIL_H */
diff --git a/aclk/helpers/mqtt_wss_pal.h b/aclk/helpers/mqtt_wss_pal.h
new file mode 100644
index 0000000..5c89f8b
--- /dev/null
+++ b/aclk/helpers/mqtt_wss_pal.h
@@ -0,0 +1,19 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef MQTT_WSS_PAL_H
+#define MQTT_WSS_PAL_H
+
+#include "libnetdata/libnetdata.h"
+
+#undef OPENSSL_VERSION_095
+#undef OPENSSL_VERSION_097
+#undef OPENSSL_VERSION_110
+#undef OPENSSL_VERSION_111
+
+#define mw_malloc(...) mallocz(__VA_ARGS__)
+#define mw_calloc(...) callocz(__VA_ARGS__)
+#define mw_free(...) freez(__VA_ARGS__)
+#define mw_strdup(...) strdupz(__VA_ARGS__)
+#define mw_realloc(...) reallocz(__VA_ARGS__)
+
+#endif /* MQTT_WSS_PAL_H */
diff --git a/aclk/helpers/ringbuffer_pal.h b/aclk/helpers/ringbuffer_pal.h
new file mode 100644
index 0000000..2f7e1cb
--- /dev/null
+++ b/aclk/helpers/ringbuffer_pal.h
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef RINGBUFFER_PAL_H
+#define RINGBUFFER_PAL_H
+
+#include "libnetdata/libnetdata.h"
+
+#define crbuf_malloc(...) mallocz(__VA_ARGS__)
+#define crbuf_free(...) freez(__VA_ARGS__)
+
+#endif /* RINGBUFFER_PAL_H */
diff --git a/aclk/https_client.c b/aclk/https_client.c
new file mode 100644
index 0000000..1a32f83
--- /dev/null
+++ b/aclk/https_client.c
@@ -0,0 +1,688 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "libnetdata/libnetdata.h"
+
+#include "https_client.h"
+
+#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
+
+enum http_parse_state {
+ HTTP_PARSE_INITIAL = 0,
+ HTTP_PARSE_HEADERS,
+ HTTP_PARSE_CONTENT
+};
+
+static const char *http_req_type_to_str(http_req_type_t req) {
+ switch (req) {
+ case HTTP_REQ_GET:
+ return "GET";
+ case HTTP_REQ_POST:
+ return "POST";
+ case HTTP_REQ_CONNECT:
+ return "CONNECT";
+ default:
+ return "unknown";
+ }
+}
+
+typedef struct {
+ enum http_parse_state state;
+ int content_length;
+ int http_code;
+} http_parse_ctx;
+
+#define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 }
+static inline void http_parse_ctx_clear(http_parse_ctx *ctx) {
+ ctx->state = HTTP_PARSE_INITIAL;
+ ctx->content_length = -1;
+ ctx->http_code = 0;
+}
+
+#define POLL_TO_MS 100
+
+#define NEED_MORE_DATA 0
+#define PARSE_SUCCESS 1
+#define PARSE_ERROR -1
+#define HTTP_LINE_TERM "\x0D\x0A"
+#define RESP_PROTO "HTTP/1.1 "
+#define HTTP_KEYVAL_SEPARATOR ": "
+#define HTTP_HDR_BUFFER_SIZE 256
+#define PORT_STR_MAX_BYTES 12
+
+static void process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const char *val)
+{
+ // currently we care only about content-length
+ // but in future the way this is written
+ // it can be extended
+ if (!strcmp("content-length", key)) {
+ parse_ctx->content_length = atoi(val);
+ }
+}
+
+static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx)
+{
+ int idx, idx_end;
+ char buf_key[HTTP_HDR_BUFFER_SIZE];
+ char buf_val[HTTP_HDR_BUFFER_SIZE];
+ char *ptr = buf_key;
+ if (!rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx_end)) {
+ error("CRLF expected");
+ return 1;
+ }
+
+ char *separator = rbuf_find_bytes(buf, HTTP_KEYVAL_SEPARATOR, strlen(HTTP_KEYVAL_SEPARATOR), &idx);
+ if (!separator) {
+ error("Missing Key/Value separator");
+ return 1;
+ }
+ if (idx >= HTTP_HDR_BUFFER_SIZE) {
+ error("Key name is too long");
+ return 1;
+ }
+
+ rbuf_pop(buf, buf_key, idx);
+ buf_key[idx] = 0;
+
+ rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR));
+ idx_end -= strlen(HTTP_KEYVAL_SEPARATOR) + idx;
+ if (idx_end >= HTTP_HDR_BUFFER_SIZE) {
+ error("Value of key \"%s\" too long", buf_key);
+ return 1;
+ }
+
+ rbuf_pop(buf, buf_val, idx_end);
+ buf_val[idx_end] = 0;
+
+ for (ptr = buf_key; *ptr; ptr++)
+ *ptr = tolower(*ptr);
+
+ process_http_hdr(parse_ctx, buf_key, buf_val);
+
+ return 0;
+}
+
+static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
+{
+ int idx;
+ char rc[4];
+
+ do {
+ if (parse_ctx->state != HTTP_PARSE_CONTENT && !rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx))
+ return NEED_MORE_DATA;
+ switch (parse_ctx->state) {
+ case HTTP_PARSE_INITIAL:
+ if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) {
+ error("Expected response to start with \"%s\"", RESP_PROTO);
+ return PARSE_ERROR;
+ }
+ rbuf_bump_tail(buf, strlen(RESP_PROTO));
+ if (rbuf_pop(buf, rc, 4) != 4) {
+ error("Expected HTTP status code");
+ return PARSE_ERROR;
+ }
+ if (rc[3] != ' ') {
+ error("Expected space after HTTP return code");
+ return PARSE_ERROR;
+ }
+ rc[3] = 0;
+ parse_ctx->http_code = atoi(rc);
+ if (parse_ctx->http_code < 100 || parse_ctx->http_code >= 600) {
+ error("HTTP code not in range 100 to 599");
+ return PARSE_ERROR;
+ }
+
+ rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx);
+
+ rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM));
+
+ parse_ctx->state = HTTP_PARSE_HEADERS;
+ break;
+ case HTTP_PARSE_HEADERS:
+ if (!idx) {
+ parse_ctx->state = HTTP_PARSE_CONTENT;
+ rbuf_bump_tail(buf, strlen(HTTP_LINE_TERM));
+ break;
+ }
+ if (parse_http_hdr(buf, parse_ctx))
+ return PARSE_ERROR;
+ rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx);
+ rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM));
+ break;
+ case HTTP_PARSE_CONTENT:
+ // replies like CONNECT etc. do not have content
+ if (parse_ctx->content_length < 0)
+ return PARSE_SUCCESS;
+
+ if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length)
+ return PARSE_SUCCESS;
+ return NEED_MORE_DATA;
+ }
+ } while(1);
+}
+
+typedef struct https_req_ctx {
+ https_req_t *request;
+
+ int sock;
+ rbuf_t buf_rx;
+
+ struct pollfd poll_fd;
+
+ SSL_CTX *ssl_ctx;
+ SSL *ssl;
+
+ size_t written;
+
+ int self_signed_allowed;
+
+ http_parse_ctx parse_ctx;
+
+ time_t req_start_time;
+} https_req_ctx_t;
+
+static int https_req_check_timedout(https_req_ctx_t *ctx) {
+ if (now_realtime_sec() > ctx->req_start_time + ctx->request->timeout_s) {
+ error("request timed out");
+ return 1;
+ }
+ return 0;
+}
+
+static char *_ssl_err_tos(int err)
+{
+ switch(err){
+ case SSL_ERROR_SSL:
+ return "SSL_ERROR_SSL";
+ case SSL_ERROR_WANT_READ:
+ return "SSL_ERROR_WANT_READ";
+ case SSL_ERROR_WANT_WRITE:
+ return "SSL_ERROR_WANT_WRITE";
+ case SSL_ERROR_NONE:
+ return "SSL_ERROR_NONE";
+ case SSL_ERROR_ZERO_RETURN:
+ return "SSL_ERROR_ZERO_RETURN";
+ case SSL_ERROR_WANT_CONNECT:
+ return "SSL_ERROR_WANT_CONNECT";
+ case SSL_ERROR_WANT_ACCEPT:
+ return "SSL_ERROR_WANT_ACCEPT";
+ }
+ return "Unknown!!!";
+}
+
+static int socket_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) {
+ ctx->written = 0;
+ ctx->poll_fd.events = POLLOUT;
+
+ do {
+ int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS);
+ if (ret < 0) {
+ error("poll error");
+ return 1;
+ }
+ if (ret == 0) {
+ if (https_req_check_timedout(ctx)) {
+ error("Poll timed out");
+ return 2;
+ }
+ continue;
+ }
+
+ ret = write(ctx->sock, &data[ctx->written], data_len - ctx->written);
+ if (ret > 0) {
+ ctx->written += ret;
+ } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("Error writing to socket");
+ return 3;
+ }
+ } while (ctx->written < data_len);
+
+ return 0;
+}
+
+static int ssl_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) {
+ ctx->written = 0;
+ ctx->poll_fd.events |= POLLOUT;
+
+ do {
+ int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS);
+ if (ret < 0) {
+ error("poll error");
+ return 1;
+ }
+ if (ret == 0) {
+ if (https_req_check_timedout(ctx)) {
+ error("Poll timed out");
+ return 2;
+ }
+ continue;
+ }
+ ctx->poll_fd.events = 0;
+
+ ret = SSL_write(ctx->ssl, &data[ctx->written], data_len - ctx->written);
+ if (ret > 0) {
+ ctx->written += ret;
+ } else {
+ ret = SSL_get_error(ctx->ssl, ret);
+ switch (ret) {
+ case SSL_ERROR_WANT_READ:
+ ctx->poll_fd.events |= POLLIN;
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ ctx->poll_fd.events |= POLLOUT;
+ break;
+ default:
+ error("SSL_write Err: %s", _ssl_err_tos(ret));
+ return 3;
+ }
+ }
+ } while (ctx->written < data_len);
+
+ return 0;
+}
+
+static inline int https_client_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) {
+ if (ctx->ssl_ctx)
+ return ssl_write_all(ctx, data, data_len);
+ return socket_write_all(ctx, data, data_len);
+}
+
+static int read_parse_response(https_req_ctx_t *ctx) {
+ int ret;
+ char *ptr;
+ size_t size;
+
+ ctx->poll_fd.events = POLLIN;
+ do {
+ ret = poll(&ctx->poll_fd, 1, POLL_TO_MS);
+ if (ret < 0) {
+ error("poll error");
+ return 1;
+ }
+ if (ret == 0) {
+ if (https_req_check_timedout(ctx)) {
+ error("Poll timed out");
+ return 2;
+ }
+ if (!ctx->ssl_ctx)
+ continue;
+ }
+ ctx->poll_fd.events = 0;
+
+ ptr = rbuf_get_linear_insert_range(ctx->buf_rx, &size);
+
+ if (ctx->ssl_ctx)
+ ret = SSL_read(ctx->ssl, ptr, size);
+ else
+ ret = read(ctx->sock, ptr, size);
+
+ if (ret > 0) {
+ rbuf_bump_head(ctx->buf_rx, ret);
+ } else {
+ if (ctx->ssl_ctx) {
+ ret = SSL_get_error(ctx->ssl, ret);
+ switch (ret) {
+ case SSL_ERROR_WANT_READ:
+ ctx->poll_fd.events |= POLLIN;
+ break;
+ case SSL_ERROR_WANT_WRITE:
+ ctx->poll_fd.events |= POLLOUT;
+ break;
+ default:
+ error("SSL_read Err: %s", _ssl_err_tos(ret));
+ return 3;
+ }
+ } else {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("write error");
+ return 3;
+ }
+ ctx->poll_fd.events |= POLLIN;
+ }
+ }
+ } while (!(ret = parse_http_response(ctx->buf_rx, &ctx->parse_ctx)));
+
+ if (ret != PARSE_SUCCESS) {
+ error("Error parsing HTTP response");
+ return 1;
+ }
+
+ return 0;
+}
+
+#define TX_BUFFER_SIZE 8192
+#define RX_BUFFER_SIZE (TX_BUFFER_SIZE*2)
+static int handle_http_request(https_req_ctx_t *ctx) {
+ BUFFER *hdr = buffer_create(TX_BUFFER_SIZE);
+ int rc = 0;
+
+ http_parse_ctx_clear(&ctx->parse_ctx);
+
+ // Prepare data to send
+ switch (ctx->request->request_type) {
+ case HTTP_REQ_CONNECT:
+ buffer_strcat(hdr, "CONNECT ");
+ break;
+ case HTTP_REQ_GET:
+ buffer_strcat(hdr, "GET ");
+ break;
+ case HTTP_REQ_POST:
+ buffer_strcat(hdr, "POST ");
+ break;
+ default:
+ error("Unknown HTTPS request type!");
+ rc = 1;
+ goto err_exit;
+ }
+
+ if (ctx->request->request_type == HTTP_REQ_CONNECT) {
+ buffer_strcat(hdr, ctx->request->host);
+ buffer_sprintf(hdr, ":%d", ctx->request->port);
+ } else {
+ buffer_strcat(hdr, ctx->request->url);
+ }
+
+ buffer_strcat(hdr, " HTTP/1.1\x0D\x0A");
+
+ //TODO Headers!
+ if (ctx->request->request_type != HTTP_REQ_CONNECT) {
+ buffer_sprintf(hdr, "Host: %s\x0D\x0A", ctx->request->host);
+ }
+ buffer_strcat(hdr, "User-Agent: Netdata/rocks newhttpclient\x0D\x0A");
+
+ if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
+ buffer_sprintf(hdr, "Content-Length: %zu\x0D\x0A", ctx->request->payload_size);
+ }
+
+ buffer_strcat(hdr, "\x0D\x0A");
+
+ // Send the request
+ if (https_client_write_all(ctx, hdr->buffer, hdr->len)) {
+ error("Couldn't write HTTP request header into SSL connection");
+ rc = 2;
+ goto err_exit;
+ }
+
+ if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
+ if (https_client_write_all(ctx, ctx->request->payload, ctx->request->payload_size)) {
+ error("Couldn't write payload into SSL connection");
+ rc = 3;
+ goto err_exit;
+ }
+ }
+
+ // Read The Response
+ if (read_parse_response(ctx)) {
+ error("Error reading or parsing response from server");
+ rc = 4;
+ goto err_exit;
+ }
+
+err_exit:
+ buffer_free(hdr);
+ return rc;
+}
+
+static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
+{
+ X509 *err_cert;
+ int err, depth;
+ char *err_str;
+
+ if (!preverify_ok) {
+ err = X509_STORE_CTX_get_error(ctx);
+ depth = X509_STORE_CTX_get_error_depth(ctx);
+ err_cert = X509_STORE_CTX_get_current_cert(ctx);
+ err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0);
+
+ error("Cert Chain verify error:num=%d:%s:depth=%d:%s", err,
+ X509_verify_cert_error_string(err), depth, err_str);
+
+ free(err_str);
+ }
+
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+ if (!preverify_ok && err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT)
+ {
+ preverify_ok = 1;
+ error("Self Signed Certificate Accepted as the agent was built with ACLK_SSL_ALLOW_SELF_SIGNED");
+ }
+#endif
+
+ return preverify_ok;
+}
+
+int https_request(https_req_t *request, https_req_response_t *response) {
+ int rc = 1, ret;
+ char connect_port_str[PORT_STR_MAX_BYTES];
+
+ const char *connect_host = request->proxy_host ? request->proxy_host : request->host;
+ int connect_port = request->proxy_host ? request->proxy_port : request->port;
+ struct timeval timeout = { .tv_sec = request->timeout_s, .tv_usec = 0 };
+
+ https_req_ctx_t *ctx = callocz(1, sizeof(https_req_ctx_t));
+ ctx->req_start_time = now_realtime_sec();
+
+ ctx->buf_rx = rbuf_create(RX_BUFFER_SIZE);
+ if (!ctx->buf_rx) {
+ error("Couldn't allocate buffer for RX data");
+ goto exit_req_ctx;
+ }
+
+ snprintfz(connect_port_str, PORT_STR_MAX_BYTES, "%d", connect_port);
+
+ ctx->sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, connect_host, 0, connect_port_str, &timeout);
+ if (ctx->sock < 0) {
+ error("Error connecting TCP socket to \"%s\"", connect_host);
+ goto exit_buf_rx;
+ }
+
+ if (fcntl(ctx->sock, F_SETFL, fcntl(ctx->sock, F_GETFL, 0) | O_NONBLOCK) == -1) {
+ error("Error setting O_NONBLOCK to TCP socket.");
+ goto exit_sock;
+ }
+
+ ctx->poll_fd.fd = ctx->sock;
+
+ // Do the CONNECT if proxy is used
+ if (request->proxy_host) {
+ https_req_t req = HTTPS_REQ_T_INITIALIZER;
+ req.request_type = HTTP_REQ_CONNECT;
+ req.timeout_s = request->timeout_s;
+ req.host = request->host;
+ req.port = request->port;
+ req.url = request->url;
+ ctx->request = &req;
+ if (handle_http_request(ctx)) {
+ error("Failed to CONNECT with proxy");
+ goto exit_sock;
+ }
+ if (ctx->parse_ctx.http_code != 200) {
+ error("Proxy didn't return 200 OK (got %d)", ctx->parse_ctx.http_code);
+ goto exit_sock;
+ }
+ info("Proxy accepted CONNECT upgrade");
+ }
+ ctx->request = request;
+
+ ctx->ssl_ctx = security_initialize_openssl_client();
+ if (ctx->ssl_ctx==NULL) {
+ error("Cannot allocate SSL context");
+ goto exit_sock;
+ }
+
+ if (!SSL_CTX_set_default_verify_paths(ctx->ssl_ctx)) {
+ error("Error setting default verify paths");
+ goto exit_CTX;
+ }
+ SSL_CTX_set_verify(ctx->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback);
+
+ ctx->ssl = SSL_new(ctx->ssl_ctx);
+ if (ctx->ssl==NULL) {
+ error("Cannot allocate SSL");
+ goto exit_CTX;
+ }
+
+ SSL_set_fd(ctx->ssl, ctx->sock);
+ ret = SSL_connect(ctx->ssl);
+ if (ret != -1 && ret != 1) {
+ error("SSL could not connect");
+ goto exit_SSL;
+ }
+ if (ret == -1) {
+ // expected as underlying socket is non blocking!
+ // consult SSL_connect documentation for details
+ int ec = SSL_get_error(ctx->ssl, ret);
+ if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) {
+ error("Failed to start SSL connection");
+ goto exit_SSL;
+ }
+ }
+
+ // The actual request here
+ if (handle_http_request(ctx)) {
+ error("Couldn't process request");
+ goto exit_SSL;
+ }
+ response->http_code = ctx->parse_ctx.http_code;
+ if (ctx->parse_ctx.content_length > 0) {
+ response->payload_size = ctx->parse_ctx.content_length;
+ response->payload = mallocz(response->payload_size + 1);
+ ret = rbuf_pop(ctx->buf_rx, response->payload, response->payload_size);
+ if (ret != (int)response->payload_size) {
+ error("Payload size doesn't match remaining data on the buffer!");
+ response->payload_size = ret;
+ }
+ // normally we take payload as it is and copy it
+ // but for convenience in cases where payload is sth. like
+ // json we add terminating zero so that user of the data
+ // doesn't have to convert to C string (0 terminated)
+ // other uses still have correct payload_size and can copy
+ // only exact data without affixed 0x00
+ ((char*)response->payload)[response->payload_size] = 0; // mallocz(response->payload_size + 1);
+ }
+ info("HTTPS \"%s\" request to \"%s\" finished with HTTP code: %d", http_req_type_to_str(ctx->request->request_type), ctx->request->host, response->http_code);
+
+ rc = 0;
+
+exit_SSL:
+ SSL_free(ctx->ssl);
+exit_CTX:
+ SSL_CTX_free(ctx->ssl_ctx);
+exit_sock:
+ close(ctx->sock);
+exit_buf_rx:
+ rbuf_free(ctx->buf_rx);
+exit_req_ctx:
+ freez(ctx);
+ return rc;
+}
+
+void https_req_response_free(https_req_response_t *res) {
+ freez(res->payload);
+}
+
+void https_req_response_init(https_req_response_t *res) {
+ res->http_code = 0;
+ res->payload = NULL;
+ res->payload_size = 0;
+}
+
+static inline char *UNUSED_FUNCTION(min_non_null)(char *a, char *b) {
+ if (!a)
+ return b;
+ if (!b)
+ return a;
+ return (a < b ? a : b);
+}
+
+#define URI_PROTO_SEPARATOR "://"
+#define URL_PARSER_LOG_PREFIX "url_parser "
+
+static int parse_host_port(url_t *url) {
+ char *ptr = strrchr(url->host, ':');
+ if (ptr) {
+ size_t port_len = strlen(ptr + 1);
+ if (!port_len) {
+ error(URL_PARSER_LOG_PREFIX ": specified but no port number");
+ return 1;
+ }
+ if (port_len > 5 /* MAX port length is 5digit long in decimal */) {
+ error(URL_PARSER_LOG_PREFIX "port # is too long");
+ return 1;
+ }
+ *ptr = 0;
+ if (!strlen(url->host)) {
+ error(URL_PARSER_LOG_PREFIX "host empty after removing port");
+ return 1;
+ }
+ url->port = atoi (ptr + 1);
+ }
+ return 0;
+}
+
+static inline void port_by_proto(url_t *url) {
+ if (url->port)
+ return;
+ if (!url->proto)
+ return;
+ if (!strcmp(url->proto, "http")) {
+ url->port = 80;
+ return;
+ }
+ if (!strcmp(url->proto, "https")) {
+ url->port = 443;
+ return;
+ }
+}
+
+#define STRDUPZ_2PTR(dest, start, end) \
+ { \
+ dest = mallocz(1 + end - start); \
+ memcpy(dest, start, end - start); \
+ dest[end - start] = 0; \
+ }
+
+int url_parse(const char *url, url_t *parsed) {
+ const char *start = url;
+ const char *end = strstr(url, URI_PROTO_SEPARATOR);
+
+ if (end) {
+ if (end == start) {
+ error (URL_PARSER_LOG_PREFIX "found " URI_PROTO_SEPARATOR " without protocol specified");
+ return 1;
+ }
+
+ STRDUPZ_2PTR(parsed->proto, start, end)
+ start = end + strlen(URI_PROTO_SEPARATOR);
+ }
+
+ end = strchr(start, '/');
+ if (!end)
+ end = start + strlen(start);
+
+ if (start == end) {
+ error(URL_PARSER_LOG_PREFIX "Host empty");
+ return 1;
+ }
+
+ STRDUPZ_2PTR(parsed->host, start, end);
+
+ if (parse_host_port(parsed))
+ return 1;
+
+ if (!*end) {
+ parsed->path = strdupz("/");
+ port_by_proto(parsed);
+ return 0;
+ }
+
+ parsed->path = strdupz(end);
+ port_by_proto(parsed);
+ return 0;
+}
+
+void url_t_destroy(url_t *url) {
+ freez(url->host);
+ freez(url->path);
+ freez(url->proto);
+}
diff --git a/aclk/https_client.h b/aclk/https_client.h
new file mode 100644
index 0000000..f7bc3d4
--- /dev/null
+++ b/aclk/https_client.h
@@ -0,0 +1,78 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_HTTPS_CLIENT_H
+#define NETDATA_HTTPS_CLIENT_H
+
+#include "libnetdata/libnetdata.h"
+
+typedef enum http_req_type {
+ HTTP_REQ_GET = 0,
+ HTTP_REQ_POST,
+ HTTP_REQ_CONNECT
+} http_req_type_t;
+
+typedef struct {
+ http_req_type_t request_type;
+
+ char *host;
+ int port;
+ char *url;
+
+ time_t timeout_s; //timeout in seconds for the network operation (send/recv)
+
+ void *payload;
+ size_t payload_size;
+
+ char *proxy_host;
+ int proxy_port;
+} https_req_t;
+
+typedef struct {
+ int http_code;
+
+ void *payload;
+ size_t payload_size;
+} https_req_response_t;
+
+
+// Non feature complete URL parser
+// feel free to extend when needed
+// currently implements only what ACLK
+// needs
+// proto://host[:port]/path
+typedef struct {
+ char *proto;
+ char *host;
+ int port;
+ char* path;
+} url_t;
+
+int url_parse(const char *url, url_t *parsed);
+void url_t_destroy(url_t *url);
+
+void https_req_response_free(https_req_response_t *res);
+void https_req_response_init(https_req_response_t *res);
+
+#define HTTPS_REQ_RESPONSE_T_INITIALIZER \
+ { \
+ .http_code = 0, \
+ .payload = NULL, \
+ .payload_size = 0 \
+ }
+
+#define HTTPS_REQ_T_INITIALIZER \
+ { \
+ .request_type = HTTP_REQ_GET, \
+ .host = NULL, \
+ .port = 443, \
+ .url = NULL, \
+ .timeout_s = 30, \
+ .payload = NULL, \
+ .payload_size = 0, \
+ .proxy_host = NULL, \
+ .proxy_port = 8080 \
+ }
+
+int https_request(https_req_t *request, https_req_response_t *response);
+
+#endif /* NETDATA_HTTPS_CLIENT_H */
diff --git a/aclk/schema-wrappers/alarm_config.cc b/aclk/schema-wrappers/alarm_config.cc
new file mode 100644
index 0000000..56d7e6f
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_config.cc
@@ -0,0 +1,147 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "alarm_config.h"
+
+#include "proto/alarm/v1/config.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace alarms::v1;
+
+void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg)
+{
+ freez(cfg->alarm);
+ freez(cfg->tmpl);
+ freez(cfg->on_chart);
+
+ freez(cfg->classification);
+ freez(cfg->type);
+ freez(cfg->component);
+
+ freez(cfg->os);
+ freez(cfg->hosts);
+ freez(cfg->plugin);
+ freez(cfg->module);
+ freez(cfg->charts);
+ freez(cfg->families);
+ freez(cfg->lookup);
+ freez(cfg->every);
+ freez(cfg->units);
+
+ freez(cfg->green);
+ freez(cfg->red);
+
+ freez(cfg->calculation_expr);
+ freez(cfg->warning_expr);
+ freez(cfg->critical_expr);
+
+ freez(cfg->recipient);
+ freez(cfg->exec);
+ freez(cfg->delay);
+ freez(cfg->repeat);
+ freez(cfg->info);
+ freez(cfg->options);
+ freez(cfg->host_labels);
+
+ freez(cfg->p_db_lookup_dimensions);
+ freez(cfg->p_db_lookup_method);
+ freez(cfg->p_db_lookup_options);
+}
+
+char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data)
+{
+ ProvideAlarmConfiguration msg;
+ AlarmConfiguration *cfg = msg.mutable_config();
+
+ msg.set_config_hash(data->cfg_hash);
+
+ if (data->cfg.alarm)
+ cfg->set_alarm(data->cfg.alarm);
+ if (data->cfg.tmpl)
+ cfg->set_template_(data->cfg.tmpl);
+ if(data->cfg.on_chart)
+ cfg->set_on_chart(data->cfg.on_chart);
+
+ if (data->cfg.classification)
+ cfg->set_classification(data->cfg.classification);
+ if (data->cfg.type)
+ cfg->set_type(data->cfg.type);
+ if (data->cfg.component)
+ cfg->set_component(data->cfg.component);
+
+ if (data->cfg.os)
+ cfg->set_os(data->cfg.os);
+ if (data->cfg.hosts)
+ cfg->set_hosts(data->cfg.hosts);
+ if (data->cfg.plugin)
+ cfg->set_plugin(data->cfg.plugin);
+ if(data->cfg.module)
+ cfg->set_module(data->cfg.module);
+ if(data->cfg.charts)
+ cfg->set_charts(data->cfg.charts);
+ if(data->cfg.families)
+ cfg->set_families(data->cfg.families);
+ if(data->cfg.lookup)
+ cfg->set_lookup(data->cfg.lookup);
+ if(data->cfg.every)
+ cfg->set_every(data->cfg.every);
+ if(data->cfg.units)
+ cfg->set_units(data->cfg.units);
+
+ if (data->cfg.green)
+ cfg->set_green(data->cfg.green);
+ if (data->cfg.red)
+ cfg->set_red(data->cfg.red);
+
+ if (data->cfg.calculation_expr)
+ cfg->set_calculation_expr(data->cfg.calculation_expr);
+ if (data->cfg.warning_expr)
+ cfg->set_warning_expr(data->cfg.warning_expr);
+ if (data->cfg.critical_expr)
+ cfg->set_critical_expr(data->cfg.critical_expr);
+
+ if (data->cfg.recipient)
+ cfg->set_recipient(data->cfg.recipient);
+ if (data->cfg.exec)
+ cfg->set_exec(data->cfg.exec);
+ if (data->cfg.delay)
+ cfg->set_delay(data->cfg.delay);
+ if (data->cfg.repeat)
+ cfg->set_repeat(data->cfg.repeat);
+ if (data->cfg.info)
+ cfg->set_info(data->cfg.info);
+ if (data->cfg.options)
+ cfg->set_options(data->cfg.options);
+ if (data->cfg.host_labels)
+ cfg->set_host_labels(data->cfg.host_labels);
+
+ cfg->set_p_db_lookup_after(data->cfg.p_db_lookup_after);
+ cfg->set_p_db_lookup_before(data->cfg.p_db_lookup_before);
+ if (data->cfg.p_db_lookup_dimensions)
+ cfg->set_p_db_lookup_dimensions(data->cfg.p_db_lookup_dimensions);
+ if (data->cfg.p_db_lookup_method)
+ cfg->set_p_db_lookup_method(data->cfg.p_db_lookup_method);
+ if (data->cfg.p_db_lookup_options)
+ cfg->set_p_db_lookup_options(data->cfg.p_db_lookup_options);
+ cfg->set_p_update_every(data->cfg.p_update_every);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (!msg.SerializeToArray(bin, *len))
+ return NULL;
+
+ return bin;
+}
+
+char *parse_send_alarm_configuration(const char *data, size_t len)
+{
+ SendAlarmConfiguration msg;
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+ if (!msg.config_hash().c_str())
+ return NULL;
+ return strdupz(msg.config_hash().c_str());
+}
+
diff --git a/aclk/schema-wrappers/alarm_config.h b/aclk/schema-wrappers/alarm_config.h
new file mode 100644
index 0000000..157fbc6
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_config.h
@@ -0,0 +1,69 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H
+#define ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H
+
+#include <stdlib.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_alarm_configuration {
+ char *alarm;
+ char *tmpl;
+ char *on_chart;
+
+ char *classification;
+ char *type;
+ char *component;
+
+ char *os;
+ char *hosts;
+ char *plugin;
+ char *module;
+ char *charts;
+ char *families;
+ char *lookup;
+ char *every;
+ char *units;
+
+ char *green;
+ char *red;
+
+ char *calculation_expr;
+ char *warning_expr;
+ char *critical_expr;
+
+ char *recipient;
+ char *exec;
+ char *delay;
+ char *repeat;
+ char *info;
+ char *options;
+ char *host_labels;
+
+ int32_t p_db_lookup_after;
+ int32_t p_db_lookup_before;
+ char *p_db_lookup_dimensions;
+ char *p_db_lookup_method;
+ char *p_db_lookup_options;
+ int32_t p_update_every;
+};
+
+void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg);
+
+struct provide_alarm_configuration {
+ char *cfg_hash;
+ struct aclk_alarm_configuration cfg;
+};
+
+char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data);
+char *parse_send_alarm_configuration(const char *data, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H */
diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc
new file mode 100644
index 0000000..f643933
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_stream.cc
@@ -0,0 +1,253 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "alarm_stream.h"
+
+#include "proto/alarm/v1/stream.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace alarms::v1;
+
+struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len)
+{
+ struct start_alarm_streaming ret;
+ memset(&ret, 0, sizeof(ret));
+
+ StartAlarmStreaming msg;
+
+ if (!msg.ParseFromArray(data, len))
+ return ret;
+
+ ret.node_id = strdupz(msg.node_id().c_str());
+ ret.batch_id = msg.batch_id();
+ ret.start_seq_id = msg.start_sequnce_id();
+
+ return ret;
+}
+
+char *parse_send_alarm_log_health(const char *data, size_t len)
+{
+ SendAlarmLogHealth msg;
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+ return strdupz(msg.node_id().c_str());
+}
+
+char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data)
+{
+ AlarmLogHealth msg;
+ LogEntries *entries;
+
+ msg.set_claim_id(data->claim_id);
+ msg.set_node_id(data->node_id);
+ msg.set_enabled(data->enabled);
+
+ switch (data->status) {
+ case alarm_log_status_aclk::ALARM_LOG_STATUS_IDLE:
+ msg.set_status(alarms::v1::ALARM_LOG_STATUS_IDLE);
+ break;
+ case alarm_log_status_aclk::ALARM_LOG_STATUS_RUNNING:
+ msg.set_status(alarms::v1::ALARM_LOG_STATUS_RUNNING);
+ break;
+ case alarm_log_status_aclk::ALARM_LOG_STATUS_UNSPECIFIED:
+ msg.set_status(alarms::v1::ALARM_LOG_STATUS_UNSPECIFIED);
+ break;
+ default:
+ error("Unknown status of AlarmLogHealth LogEntry");
+ return NULL;
+ }
+
+ entries = msg.mutable_log_entries();
+ entries->set_first_sequence_id(data->log_entries.first_seq_id);
+ entries->set_last_sequence_id(data->log_entries.last_seq_id);
+
+ set_google_timestamp_from_timeval(data->log_entries.first_when, entries->mutable_first_when());
+ set_google_timestamp_from_timeval(data->log_entries.last_when, entries->mutable_last_when());
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (!msg.SerializeToArray(bin, *len))
+ return NULL;
+
+ return bin;
+}
+
+static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status)
+{
+ switch (status) {
+ case aclk_alarm_status::ALARM_STATUS_NULL:
+ return alarms::v1::ALARM_STATUS_NULL;
+ case aclk_alarm_status::ALARM_STATUS_UNKNOWN:
+ return alarms::v1::ALARM_STATUS_UNKNOWN;
+ case aclk_alarm_status::ALARM_STATUS_REMOVED:
+ return alarms::v1::ALARM_STATUS_REMOVED;
+ case aclk_alarm_status::ALARM_STATUS_NOT_A_NUMBER:
+ return alarms::v1::ALARM_STATUS_NOT_A_NUMBER;
+ case aclk_alarm_status::ALARM_STATUS_CLEAR:
+ return alarms::v1::ALARM_STATUS_CLEAR;
+ case aclk_alarm_status::ALARM_STATUS_WARNING:
+ return alarms::v1::ALARM_STATUS_WARNING;
+ case aclk_alarm_status::ALARM_STATUS_CRITICAL:
+ return alarms::v1::ALARM_STATUS_CRITICAL;
+ default:
+ error("Unknown alarm status");
+ return alarms::v1::ALARM_STATUS_UNKNOWN;
+ }
+}
+
+void destroy_alarm_log_entry(struct alarm_log_entry *entry)
+{
+ //freez(entry->node_id);
+ //freez(entry->claim_id);
+
+ freez(entry->chart);
+ freez(entry->name);
+ freez(entry->family);
+
+ freez(entry->config_hash);
+
+ freez(entry->timezone);
+
+ freez(entry->exec_path);
+ freez(entry->conf_source);
+ freez(entry->command);
+
+ freez(entry->value_string);
+ freez(entry->old_value_string);
+
+ freez(entry->rendered_info);
+ freez(entry->chart_context);
+}
+
+static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *proto)
+{
+ proto->set_node_id(data->node_id);
+ proto->set_claim_id(data->claim_id);
+
+ proto->set_chart(data->chart);
+ proto->set_name(data->name);
+ if (data->family)
+ proto->set_family(data->family);
+
+ proto->set_batch_id(data->batch_id);
+ proto->set_sequence_id(data->sequence_id);
+ proto->set_when(data->when);
+
+ proto->set_config_hash(data->config_hash);
+
+ proto->set_utc_offset(data->utc_offset);
+ proto->set_timezone(data->timezone);
+
+ proto->set_exec_path(data->exec_path);
+ proto->set_conf_source(data->conf_source);
+ proto->set_command(data->command);
+
+ proto->set_duration(data->duration);
+ proto->set_non_clear_duration(data->non_clear_duration);
+
+
+ proto->set_status(aclk_alarm_status_to_proto(data->status));
+ proto->set_old_status(aclk_alarm_status_to_proto(data->old_status));
+ proto->set_delay(data->delay);
+ proto->set_delay_up_to_timestamp(data->delay_up_to_timestamp);
+
+ proto->set_last_repeat(data->last_repeat);
+ proto->set_silenced(data->silenced);
+
+ if (data->value_string)
+ proto->set_value_string(data->value_string);
+ if (data->old_value_string)
+ proto->set_old_value_string(data->old_value_string);
+
+ proto->set_value(data->value);
+ proto->set_old_value(data->old_value);
+
+ proto->set_updated(data->updated);
+
+ proto->set_rendered_info(data->rendered_info);
+
+ proto->set_chart_context(data->chart_context);
+}
+
+char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data)
+{
+ AlarmLogEntry le;
+
+ fill_alarm_log_entry(data, &le);
+
+ *len = PROTO_COMPAT_MSG_SIZE(le);
+ char *bin = (char*)mallocz(*len);
+ if (!le.SerializeToArray(bin, *len)) {
+ freez(bin);
+ return NULL;
+ }
+
+ return bin;
+}
+
+struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len)
+{
+ SendAlarmSnapshot msg;
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ struct send_alarm_snapshot *ret = (struct send_alarm_snapshot*)callocz(1, sizeof(struct send_alarm_snapshot));
+ if (msg.claim_id().c_str())
+ ret->claim_id = strdupz(msg.claim_id().c_str());
+ if (msg.node_id().c_str())
+ ret->node_id = strdupz(msg.node_id().c_str());
+ ret->snapshot_id = msg.snapshot_id();
+ ret->sequence_id = msg.sequence_id();
+
+ return ret;
+}
+
+void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr)
+{
+ freez(ptr->claim_id);
+ freez(ptr->node_id);
+ freez(ptr);
+}
+
+alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data)
+{
+ AlarmSnapshot *msg = new AlarmSnapshot;
+ if (unlikely(!msg)) fatal("Cannot allocate memory for AlarmSnapshot");
+
+ msg->set_node_id(data->node_id);
+ msg->set_claim_id(data->claim_id);
+ msg->set_snapshot_id(data->snapshot_id);
+ msg->set_chunks(data->chunks);
+ msg->set_chunk(data->chunk);
+
+ // this is handled automatically by add_alarm_log_entry2snapshot function
+ msg->set_chunk_size(0);
+
+ return msg;
+}
+
+void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data)
+{
+ AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot;
+ AlarmLogEntry *alarm_log_entry = alarm_snapshot->add_alarms();
+
+ fill_alarm_log_entry(data, alarm_log_entry);
+
+ alarm_snapshot->set_chunk_size(alarm_snapshot->chunk_size() + 1);
+}
+
+char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot)
+{
+ AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(alarm_snapshot);
+ char *bin = (char*)mallocz(*len);
+ if (!alarm_snapshot->SerializeToArray(bin, *len)) {
+ delete alarm_snapshot;
+ return NULL;
+ }
+
+ delete alarm_snapshot;
+ return bin;
+}
diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h
new file mode 100644
index 0000000..63911da
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_stream.h
@@ -0,0 +1,136 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H
+
+#include <stdlib.h>
+
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum alarm_log_status_aclk {
+ ALARM_LOG_STATUS_UNSPECIFIED = 0,
+ ALARM_LOG_STATUS_RUNNING = 1,
+ ALARM_LOG_STATUS_IDLE = 2
+};
+
+struct alarm_log_entries {
+ int64_t first_seq_id;
+ struct timeval first_when;
+
+ int64_t last_seq_id;
+ struct timeval last_when;
+};
+
+struct alarm_log_health {
+ char *claim_id;
+ char *node_id;
+ int enabled;
+ enum alarm_log_status_aclk status;
+ struct alarm_log_entries log_entries;
+};
+
+struct start_alarm_streaming {
+ char *node_id;
+ uint64_t batch_id;
+ uint64_t start_seq_id;
+};
+
+struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len);
+char *parse_send_alarm_log_health(const char *data, size_t len);
+
+char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data);
+
+enum aclk_alarm_status {
+ ALARM_STATUS_NULL = 0,
+ ALARM_STATUS_UNKNOWN = 1,
+ ALARM_STATUS_REMOVED = 2,
+ ALARM_STATUS_NOT_A_NUMBER = 3,
+ ALARM_STATUS_CLEAR = 4,
+ ALARM_STATUS_WARNING = 5,
+ ALARM_STATUS_CRITICAL = 6
+};
+
+struct alarm_log_entry {
+ char *node_id;
+ char *claim_id;
+
+ char *chart;
+ char *name;
+ char *family;
+
+ uint64_t batch_id;
+ uint64_t sequence_id;
+ uint64_t when;
+
+ char *config_hash;
+
+ int32_t utc_offset;
+ char *timezone;
+
+ char *exec_path;
+ char *conf_source;
+ char *command;
+
+ uint32_t duration;
+ uint32_t non_clear_duration;
+
+ enum aclk_alarm_status status;
+ enum aclk_alarm_status old_status;
+ uint64_t delay;
+ uint64_t delay_up_to_timestamp;
+
+ uint64_t last_repeat;
+ int silenced;
+
+ char *value_string;
+ char *old_value_string;
+
+ double value;
+ double old_value;
+
+ // updated alarm entry, when the status of the alarm has been updated by a later entry
+ int updated;
+
+ // rendered_info
+ char *rendered_info;
+
+ char *chart_context;
+};
+
+struct send_alarm_snapshot {
+ char *node_id;
+ char *claim_id;
+ uint64_t snapshot_id;
+ uint64_t sequence_id;
+};
+
+struct alarm_snapshot {
+ char *node_id;
+ char *claim_id;
+ uint64_t snapshot_id;
+ uint32_t chunks;
+ uint32_t chunk;
+};
+
+typedef void* alarm_snapshot_proto_ptr_t;
+
+void destroy_alarm_log_entry(struct alarm_log_entry *entry);
+
+char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data);
+
+struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len);
+void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr);
+
+alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data);
+void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data);
+char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H */
diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc
new file mode 100644
index 0000000..af45740
--- /dev/null
+++ b/aclk/schema-wrappers/capability.cc
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/aclk/v1/lib.pb.h"
+
+#include "capability.h"
+
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) {
+ proto_capa->set_name(c_capa->name);
+ proto_capa->set_enabled(c_capa->enabled);
+ proto_capa->set_version(c_capa->version);
+}
diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h
new file mode 100644
index 0000000..c6085a4
--- /dev/null
+++ b/aclk/schema-wrappers/capability.h
@@ -0,0 +1,24 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_CAPABILITY_H
+#define ACLK_SCHEMA_CAPABILITY_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct capability {
+ const char *name;
+ uint32_t version;
+ int enabled;
+};
+
+#ifdef __cplusplus
+}
+
+#include "proto/aclk/v1/lib.pb.h"
+
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa);
+#endif
+
+#endif /* ACLK_SCHEMA_CAPABILITY_H */
diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc
new file mode 100644
index 0000000..20b40ec
--- /dev/null
+++ b/aclk/schema-wrappers/connection.cc
@@ -0,0 +1,72 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/agent/v1/connection.pb.h"
+#include "proto/agent/v1/disconnect.pb.h"
+#include "connection.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+using namespace agent::v1;
+
+char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data)
+{
+ UpdateAgentConnection connupd;
+
+ connupd.set_claim_id(data->claim_id);
+ connupd.set_reachable(data->reachable);
+ connupd.set_session_id(data->session_id);
+
+ connupd.set_update_source((data->lwt) ? CONNECTION_UPDATE_SOURCE_LWT : CONNECTION_UPDATE_SOURCE_AGENT);
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ google::protobuf::Timestamp *timestamp = connupd.mutable_updated_at();
+ timestamp->set_seconds(tv.tv_sec);
+ timestamp->set_nanos(tv.tv_usec * 1000);
+
+ if (data->capabilities) {
+ const struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(connupd);
+ char *msg = (char*)mallocz(*len);
+ if (msg)
+ connupd.SerializeToArray(msg, *len);
+
+ return msg;
+}
+
+struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) {
+ DisconnectReq req;
+ struct disconnect_cmd *res;
+
+ if (!req.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct disconnect_cmd *)callocz(1, sizeof(struct disconnect_cmd));
+
+ if (!res)
+ return NULL;
+
+ res->reconnect_after_s = req.reconnect_after_seconds();
+ res->permaban = req.permaban();
+ res->error_code = req.error_code();
+ if (req.error_description().c_str()) {
+ res->error_description = strdupz(req.error_description().c_str());
+ if (!res->error_description) {
+ freez(res);
+ return NULL;
+ }
+ }
+
+ return res;
+}
diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h
new file mode 100644
index 0000000..0356c7d
--- /dev/null
+++ b/aclk/schema-wrappers/connection.h
@@ -0,0 +1,47 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONNECTION_H
+#define ACLK_SCHEMA_WRAPPER_CONNECTION_H
+
+#include "capability.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char *claim_id;
+ unsigned int reachable:1;
+
+ int64_t session_id;
+
+ unsigned int lwt:1;
+
+ const struct capability *capabilities;
+
+// TODO in future optional fields
+// > 15 optional fields:
+// How long the system was running until connection (only applicable when reachable=true)
+// google.protobuf.Duration system_uptime = 15;
+// How long the netdata agent was running until connection (only applicable when reachable=true)
+// google.protobuf.Duration agent_uptime = 16;
+
+
+} update_agent_connection_t;
+
+char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data);
+
+struct disconnect_cmd {
+ uint64_t reconnect_after_s;
+ int permaban;
+ uint32_t error_code;
+ char *error_description;
+};
+
+struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONNECTION_H */
diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc
new file mode 100644
index 0000000..b04c9d2
--- /dev/null
+++ b/aclk/schema-wrappers/context.cc
@@ -0,0 +1,125 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+#include "context.h"
+
+using namespace context::v1;
+
+// ContextsSnapshot
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version)
+{
+ ContextsSnapshot *ctxs_snap = new ContextsSnapshot;
+
+ if (ctxs_snap == NULL)
+ fatal("Cannot allocate ContextsSnapshot object. OOM");
+
+ ctxs_snap->set_claim_id(claim_id);
+ ctxs_snap->set_node_id(node_id);
+ ctxs_snap->set_version(version);
+
+ return ctxs_snap;
+}
+
+void contexts_snapshot_delete(contexts_snapshot_t snapshot)
+{
+ delete (ContextsSnapshot *)snapshot;
+}
+
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version)
+{
+ ((ContextsSnapshot *)ctxs_snapshot)->set_version(version);
+}
+
+static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx)
+{
+ ctx->set_id(c_ctx->id);
+ ctx->set_version(c_ctx->version);
+ ctx->set_first_entry(c_ctx->first_entry);
+ ctx->set_last_entry(c_ctx->last_entry);
+ ctx->set_deleted(c_ctx->deleted);
+ ctx->set_title(c_ctx->title);
+ ctx->set_priority(c_ctx->priority);
+ ctx->set_chart_type(c_ctx->chart_type);
+ ctx->set_units(c_ctx->units);
+ ctx->set_family(c_ctx->family);
+}
+
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ ContextUpdated *ctx = ctxs_snap->add_contexts();
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_snap->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_snap;
+ return NULL;
+ }
+
+ delete ctxs_snap;
+ return bin;
+}
+
+// ContextsUpdated
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at)
+{
+ ContextsUpdated *ctxs_updated = new ContextsUpdated;
+
+ if (ctxs_updated == NULL)
+ fatal("Cannot allocate ContextsUpdated object. OOM");
+
+ ctxs_updated->set_claim_id(claim_id);
+ ctxs_updated->set_node_id(node_id);
+ ctxs_updated->set_version_hash(version_hash);
+ ctxs_updated->set_created_at(created_at);
+
+ return ctxs_updated;
+}
+
+void contexts_updated_delete(contexts_updated_t ctxs_updated)
+{
+ delete (ContextsUpdated *)ctxs_updated;
+}
+
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash)
+{
+ ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash);
+}
+
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ ContextUpdated *ctx = ctxs_update->add_contextupdates();
+
+ if (ctx == NULL)
+ fatal("Cannot allocate ContextUpdated object. OOM");
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_update->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_update;
+ return NULL;
+ }
+
+ delete ctxs_update;
+ return bin;
+}
diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h
new file mode 100644
index 0000000..cbb7701
--- /dev/null
+++ b/aclk/schema-wrappers/context.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_H
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void* contexts_updated_t;
+typedef void* contexts_snapshot_t;
+
+struct context_updated {
+ // context id
+ const char *id;
+
+ uint64_t version;
+
+ uint64_t first_entry;
+ uint64_t last_entry;
+
+ int deleted;
+
+ const char *title;
+ uint64_t priority;
+ const char *chart_type;
+ const char *units;
+ const char *family;
+};
+
+// ContextS Snapshot related
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version);
+void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot);
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version);
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update);
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len);
+
+// ContextS Updated related
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at);
+void contexts_updated_delete(contexts_updated_t ctxs_updated);
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash);
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update);
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */
diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc
new file mode 100644
index 0000000..3bb1956
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.cc
@@ -0,0 +1,42 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/stream.pb.h"
+
+#include "context_stream.h"
+
+#include "libnetdata/libnetdata.h"
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len)
+{
+ context::v1::StopStreamingContexts msg;
+
+ struct stop_streaming_ctxs *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+
+ return res;
+}
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len)
+{
+ context::v1::ContextsCheckpoint msg;
+
+ struct ctxs_checkpoint *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+ res->version_hash = msg.version_hash();
+
+ return res;
+}
diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h
new file mode 100644
index 0000000..8c691d2
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.h
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct stop_streaming_ctxs {
+ char *claim_id;
+ char *node_id;
+ // we omit reason as there is only one defined at this point
+ // as soon as there is more than one defined in StopStreaminContextsReason
+ // we should add it
+ // 0 - RATE_LIMIT_EXCEEDED
+};
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len);
+
+struct ctxs_checkpoint {
+ char *claim_id;
+ char *node_id;
+
+ uint64_t version_hash;
+};
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len);
+
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
new file mode 100644
index 0000000..db1fa64
--- /dev/null
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -0,0 +1,46 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/nodeinstance/connection/v1/connection.pb.h"
+#include "node_connection.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data) {
+ nodeinstance::v1::UpdateNodeInstanceConnection msg;
+
+ if(data->claim_id)
+ msg.set_claim_id(data->claim_id);
+ msg.set_node_id(data->node_id);
+
+ msg.set_liveness(data->live);
+ msg.set_queryable(data->queryable);
+
+ msg.set_session_id(data->session_id);
+ msg.set_hops(data->hops);
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ google::protobuf::Timestamp *timestamp = msg.mutable_updated_at();
+ timestamp->set_seconds(tv.tv_sec);
+ timestamp->set_nanos(tv.tv_usec * 1000);
+
+ if (data->capabilities) {
+ const struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
new file mode 100644
index 0000000..dac0d8f
--- /dev/null
+++ b/aclk/schema-wrappers/node_connection.h
@@ -0,0 +1,32 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+
+#include "capability.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char* claim_id;
+ const char* node_id;
+
+ unsigned int live:1;
+ unsigned int queryable:1;
+
+ int64_t session_id;
+
+ int32_t hops;
+ const struct capability *capabilities;
+} node_instance_connection_t;
+
+char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H */
diff --git a/aclk/schema-wrappers/node_creation.cc b/aclk/schema-wrappers/node_creation.cc
new file mode 100644
index 0000000..5ad25b7
--- /dev/null
+++ b/aclk/schema-wrappers/node_creation.cc
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/nodeinstance/create/v1/creation.pb.h"
+#include "node_creation.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <stdlib.h>
+
+char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data)
+{
+ nodeinstance::create::v1::CreateNodeInstance msg;
+
+ if (data->claim_id)
+ msg.set_claim_id(data->claim_id);
+ msg.set_machine_guid(data->machine_guid);
+ msg.set_hostname(data->hostname);
+ msg.set_hops(data->hops);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len)
+{
+ nodeinstance::create::v1::CreateNodeInstanceResult msg;
+ node_instance_creation_result_t res = { .node_id = NULL, .machine_guid = NULL };
+
+ if (!msg.ParseFromArray(data, len))
+ return res;
+
+ res.node_id = strdupz(msg.node_id().c_str());
+ res.machine_guid = strdupz(msg.machine_guid().c_str());
+ return res;
+}
diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h
new file mode 100644
index 0000000..7a8c7f7
--- /dev/null
+++ b/aclk/schema-wrappers/node_creation.h
@@ -0,0 +1,31 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_CREATION_H
+#define ACLK_SCHEMA_WRAPPER_NODE_CREATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char *claim_id;
+ const char *machine_guid;
+ const char *hostname;
+
+ int32_t hops;
+} node_instance_creation_t;
+
+typedef struct {
+ char *node_id;
+ char *machine_guid;
+} node_instance_creation_result_t;
+
+char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data);
+node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_CREATION_H */
diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc
new file mode 100644
index 0000000..5e321f6
--- /dev/null
+++ b/aclk/schema-wrappers/node_info.cc
@@ -0,0 +1,136 @@
+#include "node_info.h"
+
+#include "proto/nodeinstance/info/v1/info.pb.h"
+
+#include "schema_wrapper_utils.h"
+
+static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct aclk_node_info *data)
+{
+ google::protobuf::Map<std::string, std::string> *map;
+
+ if (data->name)
+ info->set_name(data->name);
+
+ if (data->os)
+ info->set_os(data->os);
+ if (data->os_name)
+ info->set_os_name(data->os_name);
+ if (data->os_version)
+ info->set_os_version(data->os_version);
+
+ if (data->kernel_name)
+ info->set_kernel_name(data->kernel_name);
+ if (data->kernel_version)
+ info->set_kernel_version(data->kernel_version);
+
+ if (data->architecture)
+ info->set_architecture(data->architecture);
+
+ info->set_cpus(data->cpus);
+
+ if (data->cpu_frequency)
+ info->set_cpu_frequency(data->cpu_frequency);
+
+ if (data->memory)
+ info->set_memory(data->memory);
+
+ if (data->disk_space)
+ info->set_disk_space(data->disk_space);
+
+ if (data->version)
+ info->set_version(data->version);
+
+ if (data->release_channel)
+ info->set_release_channel(data->release_channel);
+
+ if (data->timezone)
+ info->set_timezone(data->timezone);
+
+ if (data->virtualization_type)
+ info->set_virtualization_type(data->virtualization_type);
+
+ if (data->container_type)
+ info->set_container_type(data->container_type);
+
+ if (data->custom_info)
+ info->set_custom_info(data->custom_info);
+
+ if (data->machine_guid)
+ info->set_machine_guid(data->machine_guid);
+
+ nodeinstance::info::v1::MachineLearningInfo *ml_info = info->mutable_ml_info();
+ ml_info->set_ml_capable(data->ml_info.ml_capable);
+ ml_info->set_ml_enabled(data->ml_info.ml_enabled);
+
+ map = info->mutable_host_labels();
+ rrdlabels_walkthrough_read(data->host_labels_ptr, label_add_to_map_callback, map);
+ return 0;
+}
+
+char *generate_update_node_info_message(size_t *len, struct update_node_info *info)
+{
+ nodeinstance::info::v1::UpdateNodeInfo msg;
+
+ msg.set_node_id(info->node_id);
+ msg.set_claim_id(info->claim_id);
+
+ if (generate_node_info(msg.mutable_data(), &info->data))
+ return NULL;
+
+ set_google_timestamp_from_timeval(info->updated_at, msg.mutable_updated_at());
+ msg.set_machine_guid(info->machine_guid);
+ msg.set_child(info->child);
+
+ nodeinstance::info::v1::MachineLearningInfo *ml_info = msg.mutable_ml_info();
+ ml_info->set_ml_capable(info->ml_info.ml_capable);
+ ml_info->set_ml_enabled(info->ml_info.ml_enabled);
+
+ struct capability *capa;
+ if (info->node_capabilities) {
+ capa = info->node_capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.mutable_node_info()->add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+ if (info->node_instance_capabilities) {
+ capa = info->node_instance_capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.mutable_node_instance_info()->add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *upd_node_collectors)
+{
+ nodeinstance::info::v1::UpdateNodeCollectors msg;
+
+ msg.set_node_id(upd_node_collectors->node_id);
+ msg.set_claim_id(upd_node_collectors->claim_id);
+
+ void *colls;
+ dfe_start_read(upd_node_collectors->node_collectors, colls) {
+ struct collector_info *c =(struct collector_info *)colls;
+ nodeinstance::info::v1::CollectorInfo *col = msg.add_collectors();
+ col->set_plugin(c->plugin);
+ col->set_module(c->module);
+ }
+ dfe_done(colls);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h
new file mode 100644
index 0000000..de4ade7
--- /dev/null
+++ b/aclk/schema-wrappers/node_info.h
@@ -0,0 +1,79 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_INFO_H
+#define ACLK_SCHEMA_WRAPPER_NODE_INFO_H
+
+#include <stdlib.h>
+#include <stdint.h>
+
+#include "capability.h"
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct machine_learning_info {
+ bool ml_capable;
+ bool ml_enabled;
+};
+
+struct aclk_node_info {
+ const char *name;
+
+ const char *os;
+ const char *os_name;
+ const char *os_version;
+ const char *kernel_name;
+ const char *kernel_version;
+ const char *architecture;
+ uint32_t cpus;
+ const char *cpu_frequency;
+ const char *memory;
+ const char *disk_space;
+ const char *version;
+ const char *release_channel;
+ const char *timezone;
+ const char *virtualization_type;
+ const char *container_type;
+ const char *custom_info;
+ const char *machine_guid;
+
+ DICTIONARY *host_labels_ptr;
+ struct machine_learning_info ml_info;
+};
+
+struct update_node_info {
+ char *node_id;
+ char *claim_id;
+ struct aclk_node_info data;
+ struct timeval updated_at;
+ char *machine_guid;
+ int child;
+
+ struct machine_learning_info ml_info;
+
+ struct capability *node_capabilities;
+ struct capability *node_instance_capabilities;
+};
+
+struct collector_info {
+ const char *module;
+ const char *plugin;
+};
+
+struct update_node_collectors {
+ char *claim_id;
+ char *node_id;
+ DICTIONARY *node_collectors;
+};
+
+char *generate_update_node_info_message(size_t *len, struct update_node_info *info);
+
+char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *collectors);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_INFO_H */
diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc
new file mode 100644
index 0000000..8853b2e
--- /dev/null
+++ b/aclk/schema-wrappers/proto_2_json.cc
@@ -0,0 +1,85 @@
+#include <google/protobuf/message.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "proto/alarm/v1/config.pb.h"
+#include "proto/alarm/v1/stream.pb.h"
+#include "proto/aclk/v1/lib.pb.h"
+#include "proto/agent/v1/connection.pb.h"
+#include "proto/agent/v1/disconnect.pb.h"
+#include "proto/nodeinstance/connection/v1/connection.pb.h"
+#include "proto/nodeinstance/create/v1/creation.pb.h"
+#include "proto/nodeinstance/info/v1/info.pb.h"
+#include "proto/context/v1/stream.pb.h"
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "proto_2_json.h"
+
+using namespace google::protobuf::util;
+
+static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
+{
+//tx side
+ if (!strcmp(msgname, "UpdateAgentConnection"))
+ return new agent::v1::UpdateAgentConnection;
+ if (!strcmp(msgname, "UpdateNodeInstanceConnection"))
+ return new nodeinstance::v1::UpdateNodeInstanceConnection;
+ if (!strcmp(msgname, "CreateNodeInstance"))
+ return new nodeinstance::create::v1::CreateNodeInstance;
+ if (!strcmp(msgname, "UpdateNodeInfo"))
+ return new nodeinstance::info::v1::UpdateNodeInfo;
+ if (!strcmp(msgname, "AlarmLogHealth"))
+ return new alarms::v1::AlarmLogHealth;
+ if (!strcmp(msgname, "ProvideAlarmConfiguration"))
+ return new alarms::v1::ProvideAlarmConfiguration;
+ if (!strcmp(msgname, "AlarmSnapshot"))
+ return new alarms::v1::AlarmSnapshot;
+ if (!strcmp(msgname, "AlarmLogEntry"))
+ return new alarms::v1::AlarmLogEntry;
+ if (!strcmp(msgname, "UpdateNodeCollectors"))
+ return new nodeinstance::info::v1::UpdateNodeCollectors;
+ if (!strcmp(msgname, "ContextsUpdated"))
+ return new context::v1::ContextsUpdated;
+ if (!strcmp(msgname, "ContextsSnapshot"))
+ return new context::v1::ContextsSnapshot;
+
+//rx side
+ if (!strcmp(msgname, "CreateNodeInstanceResult"))
+ return new nodeinstance::create::v1::CreateNodeInstanceResult;
+ if (!strcmp(msgname, "SendNodeInstances"))
+ return new agent::v1::SendNodeInstances;
+ if (!strcmp(msgname, "StartAlarmStreaming"))
+ return new alarms::v1::StartAlarmStreaming;
+ if (!strcmp(msgname, "SendAlarmLogHealth"))
+ return new alarms::v1::SendAlarmLogHealth;
+ if (!strcmp(msgname, "SendAlarmConfiguration"))
+ return new alarms::v1::SendAlarmConfiguration;
+ if (!strcmp(msgname, "SendAlarmSnapshot"))
+ return new alarms::v1::SendAlarmSnapshot;
+ if (!strcmp(msgname, "DisconnectReq"))
+ return new agent::v1::DisconnectReq;
+ if (!strcmp(msgname, "ContextsCheckpoint"))
+ return new context::v1::ContextsCheckpoint;
+ if (!strcmp(msgname, "StopStreamingContexts"))
+ return new context::v1::StopStreamingContexts;
+
+ return NULL;
+}
+
+char *protomsg_to_json(const void *protobin, size_t len, const char *msgname)
+{
+ google::protobuf::Message *msg = msg_name_to_protomsg(msgname);
+ if (msg == NULL)
+ return strdupz("Don't know this message type by name.");
+
+ if (!msg->ParseFromArray(protobin, len))
+ return strdupz("Can't parse this message. Malformed or wrong parser used.");
+
+ JsonPrintOptions options;
+
+ std::string output;
+ google::protobuf::util::MessageToJsonString(*msg, &output, options);
+ delete msg;
+ return strdupz(output.c_str());
+}
diff --git a/aclk/schema-wrappers/proto_2_json.h b/aclk/schema-wrappers/proto_2_json.h
new file mode 100644
index 0000000..3bd9847
--- /dev/null
+++ b/aclk/schema-wrappers/proto_2_json.h
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef PROTO_2_JSON_H
+#define PROTO_2_JSON_H
+
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+char *protomsg_to_json(const void *protobin, size_t len, const char *msgname);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PROTO_2_JSON_H */
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.cc b/aclk/schema-wrappers/schema_wrapper_utils.cc
new file mode 100644
index 0000000..6573e62
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrapper_utils.cc
@@ -0,0 +1,22 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "schema_wrapper_utils.h"
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts)
+{
+ ts->set_nanos(tv.tv_usec*1000);
+ ts->set_seconds(tv.tv_sec);
+}
+
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv)
+{
+ tv->tv_sec = ts.seconds();
+ tv->tv_usec = ts.nanos()/1000;
+}
+
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ (void)ls;
+ auto map = (google::protobuf::Map<std::string, std::string> *)data;
+ map->insert({name, value});
+ return 1;
+}
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.h b/aclk/schema-wrappers/schema_wrapper_utils.h
new file mode 100644
index 0000000..2815d0f
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrapper_utils.h
@@ -0,0 +1,24 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef SCHEMA_WRAPPER_UTILS_H
+#define SCHEMA_WRAPPER_UTILS_H
+
+#include "database/rrd.h"
+
+#include <sys/time.h>
+#include <google/protobuf/timestamp.pb.h>
+#include <google/protobuf/map.h>
+
+#if GOOGLE_PROTOBUF_VERSION < 3001000
+#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize();
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) (size_t)msg->ByteSize();
+#else
+#define PROTO_COMPAT_MSG_SIZE(msg) msg.ByteSizeLong();
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) msg->ByteSizeLong();
+#endif
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts);
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv);
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data);
+
+#endif /* SCHEMA_WRAPPER_UTILS_H */
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
new file mode 100644
index 0000000..a96f7ea
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+// utility header to include all the message wrappers at once
+
+#ifndef SCHEMA_WRAPPERS_H
+#define SCHEMA_WRAPPERS_H
+
+#include "connection.h"
+#include "node_connection.h"
+#include "node_creation.h"
+#include "alarm_config.h"
+#include "alarm_stream.h"
+#include "node_info.h"
+#include "capability.h"
+#include "context_stream.h"
+#include "context.h"
+
+#endif /* SCHEMA_WRAPPERS_H */