summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--aclk/aclk.c26
-rw-r--r--aclk/aclk.h12
-rw-r--r--aclk/aclk_api.c157
-rw-r--r--aclk/aclk_query_queue.c16
-rw-r--r--aclk/aclk_rrdhost_state.h1
-rw-r--r--aclk/aclk_rx_msgs.c468
-rw-r--r--aclk/aclk_rx_msgs.h3
-rw-r--r--aclk/aclk_stats.c18
-rw-r--r--aclk/aclk_stats.h18
-rw-r--r--aclk/aclk_tx_msgs.c17
-rw-r--r--aclk/aclk_util.h1
-rw-r--r--aclk/legacy/Makefile.am19
-rw-r--r--aclk/legacy/aclk_common.c53
-rw-r--r--aclk/legacy/aclk_common.h51
-rw-r--r--aclk/legacy/aclk_lws_https_client.c244
-rw-r--r--aclk/legacy/aclk_lws_https_client.h18
-rw-r--r--aclk/legacy/aclk_lws_wss_client.c622
-rw-r--r--aclk/legacy/aclk_lws_wss_client.h92
-rw-r--r--aclk/legacy/aclk_query.c843
-rw-r--r--aclk/legacy/aclk_query.h41
-rw-r--r--aclk/legacy/aclk_rx_msgs.c388
-rw-r--r--aclk/legacy/aclk_rx_msgs.h13
-rw-r--r--aclk/legacy/aclk_stats.c411
-rw-r--r--aclk/legacy/aclk_stats.h100
-rw-r--r--aclk/legacy/agent_cloud_link.c1502
-rw-r--r--aclk/legacy/agent_cloud_link.h85
-rw-r--r--aclk/legacy/mqtt.c370
-rw-r--r--aclk/legacy/mqtt.h25
-rw-r--r--aclk/legacy/tests/fake-charts.d.plugin24
-rw-r--r--aclk/legacy/tests/install-fake-charts.d.sh.in6
-rwxr-xr-xaclk/legacy/tests/launch-paho.sh4
-rw-r--r--aclk/legacy/tests/paho-inspection.py59
-rw-r--r--aclk/legacy/tests/paho.Dockerfile14
-rw-r--r--aclk/schema-wrappers/node_info.cc8
-rw-r--r--aclk/schema-wrappers/node_info.h9
35 files changed, 356 insertions, 5382 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index a24d258c5..936f431b6 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -121,7 +121,7 @@ static int wait_till_agent_claimed(void)
* agent claimed, cloud url set and private key available
*
* @param aclk_hostname points to location where string pointer to hostname will be set
- * @param ackl_port port to int where port will be saved
+ * @param aclk_port port to int where port will be saved
*
* @return If non 0 returned irrecoverable error happened and ACLK should be terminated
*/
@@ -212,7 +212,7 @@ static void msg_callback_old_protocol(const char *topic, const void *msg, size_t
close(logfd);
#endif
- debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg);
+ debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg);
if (strcmp(cmd_topic, topic))
error("Received message on unexpected topic %s", topic);
@@ -222,7 +222,7 @@ static void msg_callback_old_protocol(const char *topic, const void *msg, size_t
return;
}
- aclk_handle_cloud_message(cmsg);
+ aclk_handle_cloud_cmd_message(cmsg);
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
@@ -303,7 +303,7 @@ static int read_query_thread_count()
void aclk_graceful_disconnect(mqtt_wss_client client);
-/* Keeps connection alive and handles all network comms.
+/* Keeps connection alive and handles all network communications.
* Returns on error or when netdata is shutting down.
* @param client instance of mqtt_wss_client
* @returns 0 - Netdata Exits
@@ -320,7 +320,7 @@ static int handle_connection(mqtt_wss_client client)
return 1;
}
- if (disconnect_req) {
+ if (disconnect_req || aclk_kill_link) {
disconnect_req = 0;
aclk_graceful_disconnect(client);
aclk_queue_unlock();
@@ -763,6 +763,10 @@ void *aclk_main(void *ptr)
return NULL;
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ aclk_init_rx_msg_handlers();
+#endif
+
// This thread is unusual in that it cannot be cancelled by cancel_main_threads()
// as it must notify the far end that it shutdown gracefully and avoid the LWT.
netdata_thread_disable_cancelability();
@@ -859,7 +863,7 @@ exit:
// fix this in both old and new ACLK
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
-void ng_aclk_alarm_reload(void)
+void aclk_alarm_reload(void)
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
@@ -871,7 +875,7 @@ void ng_aclk_alarm_reload(void)
aclk_queue_query(aclk_query_new(METADATA_ALARMS));
}
-int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
+int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
BUFFER *local_buffer;
json_object *msg;
@@ -902,7 +906,7 @@ int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
return 0;
}
-int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
+int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
struct aclk_query *query;
@@ -926,7 +930,7 @@ int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
* Add a new collector to the list
* If it exists, update the chart count
*/
-void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
@@ -969,7 +973,7 @@ void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *m
* This function will release the memory used and schedule
* a cloud update
*/
-void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
@@ -1010,7 +1014,7 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
aclk_queue_query(query);
}
-void ng_aclk_host_state_update(RRDHOST *host, int cmd)
+void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
int ret;
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 444de86be..4d8546314 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -32,18 +32,18 @@ extern struct aclk_shared_state {
int mqtt_shutdown_msg_rcvd;
} aclk_shared_state;
-void ng_aclk_alarm_reload(void);
-int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
+void aclk_alarm_reload(void);
+int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
/* Informs ACLK about created/deleted chart
* @param create 0 - if chart was deleted, other if chart created
*/
-int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
+int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
-void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void ng_aclk_host_state_update(RRDHOST *host, int cmd);
+void aclk_host_state_update(RRDHOST *host, int cmd);
void aclk_send_node_instances(void);
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index 251f5b708..172cf2982 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -2,12 +2,9 @@
#include "libnetdata/libnetdata.h"
#include "database/rrd.h"
-#ifdef ACLK_NG
+#ifdef ENABLE_ACLK
#include "aclk.h"
#endif
-#ifdef ACLK_LEGACY
-#include "legacy/agent_cloud_link.h"
-#endif
int aclk_connected = 0;
int aclk_kill_link = 0;
@@ -20,12 +17,6 @@ int aclk_disable_single_updates = 0;
int aclk_stats_enabled;
-#ifdef ACLK_NG
-int aclk_ng = 1;
-#else
-int aclk_ng = 0;
-#endif
-
#define ACLK_IMPL_KEY_NAME "aclk implementation"
#ifdef ENABLE_ACLK
@@ -33,41 +24,13 @@ void *aclk_starter(void *ptr) {
char *aclk_impl_req = config_get(CONFIG_SECTION_CLOUD, ACLK_IMPL_KEY_NAME, "ng");
if (!strcasecmp(aclk_impl_req, "ng")) {
- aclk_ng = 1;
+ return aclk_main(ptr);
} else if (!strcasecmp(aclk_impl_req, "legacy")) {
- aclk_ng = 0;
+ error("Legacy ACLK is not supported anymore key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\" ignored. Using ACLK-NG.");
} else {
- error("Unknown value \"%s\" of key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\". Trying default ACLK %s.", aclk_impl_req, aclk_ng ? "NG" : "Legacy");
- }
-
-#ifndef ACLK_NG
- if (aclk_ng) {
- error("Configuration requests ACLK-NG but it is not available in this agent. Switching to Legacy.");
- aclk_ng = 0;
- }
-#endif
-
-#ifndef ACLK_LEGACY
- if (!aclk_ng) {
- error("Configuration requests ACLK Legacy but it is not available in this agent. Switching to NG.");
- aclk_ng = 1;
- }
-#endif
-
-#ifdef ACLK_NG
- if (aclk_ng) {
- info("Starting ACLK-NG");
- return aclk_main(ptr);
- }
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng) {
- info("Starting ACLK Legacy");
- return legacy_aclk_main(ptr);
+ error("Unknown value \"%s\" of key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\". Using ACLK-NG. This config key will be deprecated.", aclk_impl_req);
}
-#endif
- error_report("No ACLK could be started");
- return NULL;
+ return aclk_main(ptr);
}
void aclk_single_update_disable()
@@ -79,99 +42,15 @@ void aclk_single_update_enable()
{
aclk_disable_single_updates = 0;
}
-
-void aclk_alarm_reload(void)
-{
-#ifdef ACLK_NG
- if (aclk_ng)
- ng_aclk_alarm_reload();
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng)
- legacy_aclk_alarm_reload();
-#endif
-}
-
-int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
-{
-#ifdef ACLK_NG
- if (aclk_ng)
- return ng_aclk_update_chart(host, chart_name, create);
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng)
- return legacy_aclk_update_chart(host, chart_name, create);
-#endif
- error_report("No usable aclk_update_chart implementation");
- return 1;
-}
-
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
-{
-#ifdef ACLK_NG
- if (aclk_ng)
- return ng_aclk_update_alarm(host, ae);
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng)
- return legacy_aclk_update_alarm(host, ae);
-#endif
- error_report("No usable aclk_update_alarm implementation");
- return 1;
-}
-
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
-{
-#ifdef ACLK_NG
- if (aclk_ng)
- return ng_aclk_add_collector(host, plugin_name, module_name);
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng)
- return legacy_aclk_add_collector(host, plugin_name, module_name);
-#endif
- error_report("No usable aclk_add_collector implementation");
-}
-
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
-{
-#ifdef ACLK_NG
- if (aclk_ng)
- return ng_aclk_del_collector(host, plugin_name, module_name);
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng)
- return legacy_aclk_del_collector(host, plugin_name, module_name);
-#endif
- error_report("No usable aclk_del_collector implementation");
-}
-
-void aclk_host_state_update(RRDHOST *host, int connect)
-{
-#ifdef ACLK_NG
- if (aclk_ng)
- return ng_aclk_host_state_update(host, connect);
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng)
- return legacy_aclk_host_state_update(host, connect);
-#endif
- error_report("Couldn't use any version of aclk_host_state_update");
-}
-
#endif /* ENABLE_ACLK */
struct label *add_aclk_host_labels(struct label *label) {
-#ifdef ACLK_NG
+#ifdef ENABLE_ACLK
label = add_label_to_list(label, "_aclk_ng_available", "true", LABEL_SOURCE_AUTO);
#else
label = add_label_to_list(label, "_aclk_ng_available", "false", LABEL_SOURCE_AUTO);
#endif
-#ifdef ACLK_LEGACY
- label = add_label_to_list(label, "_aclk_legacy_available", "true", LABEL_SOURCE_AUTO);
-#else
label = add_label_to_list(label, "_aclk_legacy_available", "false", LABEL_SOURCE_AUTO);
-#endif
#ifdef ENABLE_ACLK
ACLK_PROXY_TYPE aclk_proxy;
char *proxy_str;
@@ -189,7 +68,7 @@ struct label *add_aclk_host_labels(struct label *label) {
break;
}
- label = add_label_to_list(label, "_aclk_impl", aclk_ng ? "Next Generation" : "Legacy", LABEL_SOURCE_AUTO);
+ label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO);
label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#endif
return label;
@@ -199,30 +78,14 @@ char *aclk_state(void) {
#ifndef ENABLE_ACLK
return strdupz("ACLK Available: No");
#else
-#ifdef ACLK_NG
- if (aclk_ng)
- return ng_aclk_state();
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng)
- return legacy_aclk_state();
+ return ng_aclk_state();
#endif
-#endif /* ENABLE_ACLK */
- return NULL;
}
char *aclk_state_json(void) {
#ifndef ENABLE_ACLK
- return strdupz("{\"aclk-available\": false}");
+ return strdupz("{\"aclk-available\":false}");
#else
-#ifdef ACLK_NG
- if (aclk_ng)
- return ng_aclk_state_json();
-#endif
-#ifdef ACLK_LEGACY
- if (!aclk_ng)
- return legacy_aclk_state_json();
+ return ng_aclk_state_json();
#endif
-#endif /* ENABLE_ACLK */
- return NULL;
}
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 18b4783ee..fe7ee123c 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -50,21 +50,21 @@ static inline int _aclk_queue_query(aclk_query_t query)
static inline volatile uint32_t *aclk_stats_qmetric_for_qtype(aclk_query_type_t qtype) {
switch (qtype) {
case HTTP_API_V2:
- return &aclk_metrics_per_sample.cloud_req_type_http;
+ return &aclk_metrics_per_sample.query_type_http;
case ALARM_STATE_UPDATE:
- return &aclk_metrics_per_sample.cloud_req_type_alarm_upd;
+ return &aclk_metrics_per_sample.query_type_alarm_upd;
case METADATA_INFO:
- return &aclk_metrics_per_sample.cloud_req_type_metadata_info;
+ return &aclk_metrics_per_sample.query_type_metadata_info;
case METADATA_ALARMS:
- return &aclk_metrics_per_sample.cloud_req_type_metadata_alarms;
+ return &aclk_metrics_per_sample.query_type_metadata_alarms;
case CHART_NEW:
- return &aclk_metrics_per_sample.cloud_req_type_chart_new;
+ return &aclk_metrics_per_sample.query_type_chart_new;
case CHART_DEL:
- return &aclk_metrics_per_sample.cloud_req_type_chart_del;
+ return &aclk_metrics_per_sample.query_type_chart_del;
case REGISTER_NODE:
- return &aclk_metrics_per_sample.cloud_req_type_register_node;
+ return &aclk_metrics_per_sample.query_type_register_node;
case NODE_STATE_UPDATE:
- return &aclk_metrics_per_sample.cloud_req_type_node_upd;
+ return &aclk_metrics_per_sample.query_type_node_upd;
default:
return NULL;
}
diff --git a/aclk/aclk_rrdhost_state.h b/aclk/aclk_rrdhost_state.h
index 73925b330..9138123df 100644
--- a/aclk/aclk_rrdhost_state.h
+++ b/aclk/aclk_rrdhost_state.h
@@ -30,6 +30,7 @@ typedef enum aclk_agent_state {
typedef struct aclk_rrdhost_state {
char *claimed_id; // Claimed ID if host has one otherwise NULL
+ char *prev_claimed_id; // Claimed ID if changed (reclaimed) during runtime
#ifdef ACLK_LEGACY
// per child popcorning
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index e7ce932ea..ecb2b4179 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -119,7 +119,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
}\
ACLK_SHARED_STATE_UNLOCK;
-static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
+static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
if (!aclk_use_new_cloud_arch) {
HTTP_CHECK_AGENT_INITIALIZED();
@@ -172,73 +172,43 @@ error:
return 1;
}
-typedef struct aclk_incoming_msg_type{
- char *name;
- int(*fnc)(struct aclk_request *, char *);
-}aclk_incoming_msg_type;
-
-aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
- { .name = "http", .fnc = aclk_handle_cloud_request_v2 },
- { .name = NULL, .fnc = NULL }
-};
-
-struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
-
-int aclk_handle_cloud_message(char *payload)
+int aclk_handle_cloud_cmd_message(char *payload)
{
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_recvd++;
- ACLK_STATS_UNLOCK;
- }
-
if (unlikely(!payload)) {
- errno = 0;
- error("ACLK incoming message is empty");
- goto err_cleanup_nojson;
+ error_report("ACLK incoming 'cmd' message is empty");
+ return 1;
}
- debug(D_ACLK, "ACLK incoming message (%s)", payload);
+ debug(D_ACLK, "ACLK incoming 'cmd' message (%s)", payload);
int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
if (unlikely(rc != JSON_OK)) {
- errno = 0;
- error("Malformed json request (%s)", payload);
+ error_report("Malformed json request (%s)", payload);
goto err_cleanup;
}
if (!cloud_to_agent.type_id) {
- errno = 0;
- error("Cloud message is missing compulsory key \"type\"");
+ error_report("Cloud message is missing compulsory key \"type\"");
goto err_cleanup;
}
-
- for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
- if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
- if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
- // in case of success handler is supposed to clean up after itself
- // or as in the case of aclk_handle_cloud_request take
- // ownership of the pointers (done to avoid copying)
- // see what `aclk_queue_query` parameter `internal` does
-
- // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
- // msg handlers (namely aclk_handle_version_responce)
- // can freely change what aclk_incoming_msg_types points to
- // so either exit or restart this for loop
- freez(cloud_to_agent.type_id);
- return 0;
- }
- goto err_cleanup;
- }
+ // Originally we were expecting to have multiple types of 'cmd' message,
+ // but after the new protocol was designed we will ever only have 'http'
+ if (strcmp(cloud_to_agent.type_id, "http")) {
+ error_report("Only 'http' cmd message is supported");
+ goto err_cleanup;
}
- errno = 0;
- error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
+ if (likely(!aclk_handle_cloud_http_request_v2(&cloud_to_agent, payload))) {
+ // aclk_handle_cloud_request takes ownership of the pointers
+ // (to avoid copying) in case of success
+ freez(cloud_to_agent.type_id);
+ return 0;
+ }
err_cleanup:
if (cloud_to_agent.payload)
@@ -250,191 +220,283 @@ err_cleanup:
if (cloud_to_agent.callback_topic)
freez(cloud_to_agent.callback_topic);
-err_cleanup_nojson:
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_err++;
- ACLK_STATS_UNLOCK;
- }
-
return 1;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+typedef uint32_t simple_hash_t;
+typedef int(*rx_msg_handler)(const char *msg, size_t msg_len);
+
+int handle_old_proto_cmd(const char *msg, size_t msg_len)
{
- // TODO do the look up table with hashes to optimize when there are more
- // than few
- if (!strcmp(message_type, "cmd")) {
- // msg is binary payload in all other cases
- // however in this message from old legacy cloud
- // we have to convert it to C string
- char *str = mallocz(msg_len+1);
- memcpy(str, msg, msg_len);
- str[msg_len] = 0;
- aclk_handle_cloud_message(str);
+ // msg is binary payload in all other cases
+ // however in this message from old legacy cloud
+ // we have to convert it to C string
+ char *str = mallocz(msg_len+1);
+ memcpy(str, msg, msg_len);
+ str[msg_len] = 0;
+ if (aclk_handle_cloud_cmd_message(str)) {
freez(str);
- return;
+ return 1;
}
- if (!strcmp(message_type, "CreateNodeInstanceResult")) {
- node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
- if (!res.machine_guid || !res.node_id) {
- error_report("Error parsing CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
+ freez(str);
+ return 0;
+}
- debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
+int create_node_instance_result(const char *msg, size_t msg_len)
+{
+ node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
+ if (!res.machine_guid || !res.node_id) {
+ error_report("Error parsing CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
+ }
- uuid_t host_id, node_id;
- if (uuid_parse(res.machine_guid, host_id)) {
- error("Error parsing machine_guid provided by CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
- if (uuid_parse(res.node_id, node_id)) {
- error("Error parsing node_id provided by CreateNodeInstanceResult");
- freez(res.machine_guid);
- freez(res.node_id);
- return;
- }
- update_node_id(&host_id, &node_id);
-
- aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
- query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
- rrdhost_aclk_state_lock(localhost);
- query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
- rrdhost_aclk_state_unlock(localhost);
-
- RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
- query->data.node_update.live = 0;
-
- if (host) {
- // not all host must have RRDHOST struct created for them
- // if they never connected during runtime of agent
- if (host == localhost) {
- query->data.node_update.live = 1;
- query->data.node_update.hops = 0;
- } else {
- netdata_mutex_lock(&host->receiver_lock);
- query->data.node_update.live = (host->receiver != NULL);
- netdata_mutex_unlock(&host->receiver_lock);
- query->data.node_update.hops = host->system_info->hops;
- }
- }
+ debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
- query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
- query->data.node_update.queryable = 1;
- query->data.node_update.session_id = aclk_session_newarch;
- aclk_queue_query(query);
+ uuid_t host_id, node_id;
+ if (uuid_parse(res.machine_guid, host_id)) {
+ error("Error parsing machine_guid provided by CreateNodeInstanceResult");
freez(res.machine_guid);
- return;
+ freez(res.node_id);
+ return 1;
}
- if (!strcmp(message_type, "SendNodeInstances")) {
- debug(D_ACLK, "Got SendNodeInstances");
- aclk_send_node_instances();
- return;
+ if (uuid_parse(res.node_id, node_id)) {
+ error("Error parsing node_id provided by CreateNodeInstanceResult");
+ freez(res.machine_guid);
+ freez(res.node_id);
+ return 1;
}
-
- if (!strcmp(message_type, "StreamChartsAndDimensions")) {
- stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
- if (!res.claim_id || !res.node_id) {
- error("Error parsing StreamChartsAndDimensions msg");
- freez(res.claim_id);
- freez(res.node_id);
- return;
+ update_node_id(&host_id, &node_id);
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+ rrdhost_aclk_state_lock(localhost);
+ query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+
+ RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
+ query->data.node_update.live = 0;
+
+ if (host) {
+ // not all host must have RRDHOST struct created for them
+ // if they never connected during runtime of agent
+ if (host == localhost) {
+ query->data.node_update.live = 1;
+ query->data.node_update.hops = 0;
+ } else {
+ netdata_mutex_lock(&host->receiver_lock);
+ query->data.node_update.live = (host->receiver != NULL);
+ netdata_mutex_unlock(&host->receiver_lock);
+ query->data.node_update.hops = host->system_info->hops;
}
- chart_batch_id = res.batch_id;
- aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+ }
+
+ query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
+ query->data.node_update.queryable = 1;
+ query->data.node_update.session_id = aclk_session_newarch;
+ aclk_queue_query(query);
+ freez(res.machine_guid);
+ return 0;
+}
+
+int send_node_instances(const char *msg, size_t msg_len)
+{
+ UNUSED(msg);
+ UNUSED(msg_len);
+ aclk_send_node_instances();
+ return 0;
+}
+
+int stream_charts_and_dimensions(const char *msg, size_t msg_len)
+{
+ stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
freez(res.claim_id);
freez(res.node_id);
- return;
+ return 1;
}
- if (!strcmp(message_type, "ChartsAndDimensionsAck")) {
- chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
- if (!res.claim_id || !res.node_id) {
- error("Error parsing StreamChartsAndDimensions msg");
- freez(res.claim_id);
- freez(res.node_id);
- return;
- }
- aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+ chart_batch_id = res.batch_id;
+ aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int charts_and_dimensions_ack(const char *msg, size_t msg_len)
+{
+ chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
+ if (!res.claim_id || !res.node_id) {
+ error("Error parsing StreamChartsAndDimensions msg");
freez(res.claim_id);
freez(res.node_id);
- return;
- }
- if (!strcmp(message_type, "UpdateChartConfigs")) {
- struct update_chart_config res = parse_update_chart_config(msg, msg_len);
- if (!res.claim_id || !res.node_id || !res.hashes)
- error("Error parsing UpdateChartConfigs msg");
- else
- aclk_get_chart_config(res.hashes);
- destroy_update_chart_config(&res);
- return;
+ return 1;
}
- if (!strcmp(message_type, "StartAlarmStreaming")) {
- struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
- if (!res.node_id || !res.batch_id) {
- error("Error parsing StartAlarmStreaming");
- freez(res.node_id);
- return;
- }
- aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+ freez(res.claim_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int update_chart_configs(const char *msg, size_t msg_len)
+{
+ struct update_chart_config res = parse_update_chart_config(msg, msg_len);
+ if (!res.claim_id || !res.node_id || !res.hashes)
+ error("Error parsing UpdateChartConfigs msg");
+ else
+ aclk_get_chart_config(res.hashes);
+ destroy_update_chart_config(&res);
+ return 0;
+}
+
+int start_alarm_streaming(const char *msg, size_t msg_len)
+{
+ struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
+ if (!res.node_id || !res.batch_id) {
+ error("Error parsing StartAlarmStreaming");
freez(res.node_id);
- return;
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmLogHealth")) {
- char *node_id = parse_send_alarm_log_health(msg, msg_len);
- if (!node_id) {
- error("Error parsing SendAlarmLogHealth");
- return;
- }
- aclk_send_alarm_health_log(node_id);
- freez(node_id);
- return;
+ aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+ freez(res.node_id);
+ return 0;
+}
+
+int send_alarm_log_health(const char *msg, size_t msg_len)
+{
+ char *node_id = parse_send_alarm_log_health(msg, msg_len);
+ if (!node_id) {
+ error("Error parsing SendAlarmLogHealth");
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmConfiguration")) {
- char *config_hash = parse_send_alarm_configuration(msg, msg_len);
- if (!config_hash || !*config_hash) {
- error("Error parsing SendAlarmConfiguration");
- freez(config_hash);
- return;
- }
- aclk_send_alarm_configuration(config_hash);
+ aclk_send_alarm_health_log(node_id);
+ freez(node_id);
+ return 0;
+}
+
+int send_alarm_configuration(const char *msg, size_t msg_len)
+{
+ char *config_hash = parse_send_alarm_configuration(msg, msg_len);
+ if (!config_hash || !*config_hash) {
+ error("Error parsing SendAlarmConfiguration");
freez(config_hash);
- return;
+ return 1;
}
- if (!strcmp(message_type, "SendAlarmSnapshot")) {
- struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
- if (!sas->node_id || !sas->claim_id) {
- error("Error parsing SendAlarmSnapshot");
- destroy_send_alarm_snapshot(sas);
- return;
- }
- aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ aclk_send_alarm_configuration(config_hash);
+ freez(config_hash);
+ return 0;
+}
+
+int send_alarm_snapshot(const char *msg, size_t msg_len)
+{
+ struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
+ if (!sas->node_id || !sas->claim_id) {
+ error("Error parsing SendAlarmSnapshot");
destroy_send_alarm_snapshot(sas);
- return;
+ return 1;
+ }
+ aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+ destroy_send_alarm_snapshot(sas);
+ return 0;
+}
+
+int handle_disconnect_req(const char *msg, size_t msg_len)
+{
+ struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
+ if (!cmd)
+ return 1;
+ if (cmd->permaban) {
+ error("Cloud Banned This Agent!");
+ aclk_disable_runtime = 1;
+ }
+ info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
+ if (cmd->reconnect_after_s > 0) {
+ aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
+ info(
+ "Cloud asks not to reconnect for %u seconds. We shall honor that request",
+ (unsigned int)cmd->reconnect_after_s);
+ }
+ disconnect_req = 1;
+ freez(cmd->error_description);
+ freez(cmd);
+ return 0;
+}
+
+typedef struct {
+ const char *name;
+ simple_hash_t name_hash;
+ rx_msg_handler fnc;
+} new_cloud_rx_msg_t;
+
+new_cloud_rx_msg_t rx_msgs[] = {
+ { .name = "cmd", .name_hash = 0, .fnc = handle_old_proto_cmd },
+ { .name = "CreateNodeInstanceResult", .name_hash = 0, .fnc = create_node_instance_result },
+ { .name = "SendNodeInstances", .name_hash = 0, .fnc = send_node_instances },
+ { .name = "StreamChartsAndDimensions", .name_hash = 0, .fnc = stream_charts_and_dimensions },
+ { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack },
+ { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs },
+ { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming },
+ { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health },
+ { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
+ { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
+ { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
+ { .name = NULL, .name_hash = 0, .fnc = NULL },
+};
+
+new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash)
+{
+ // we can afford to not compare strings after hash match
+ // because we check for collisions at initialization in
+ // aclk_init_rx_msg_handlers()
+ for (int i = 0; rx_msgs[i].fnc; i++) {
+ if (rx_msgs[i].name_hash == hash)
+ return &rx_msgs[i];
}
- if (!strcmp(message_type, "DisconnectReq")) {
- struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
- if (!cmd)
- return;
- if (cmd->permaban) {
- error ("Cloud Banned This Agent!");
- aclk_disable_runtime = 1;
+ return NULL;
+}
+
+void aclk_init_rx_msg_handlers(void)
+{
+ for (int i = 0; rx_msgs[i].fnc; i++) {
+ simple_hash_t hash = simple_hash(rx_msgs[i].name);
+ new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash);
+ if (unlikely(hdl)) {
+ // the list of message names changes only by changing
+ // the source code, therefore fatal is appropriate
+ fatal("Hash collision. Choose better hash. Added '%s' clashes with existing '%s'", rx_msgs[i].name, hdl->name);
+ }
+ rx_msgs[i].name_hash = hash;
+ }
+}
+
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+{
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_recvd++;
+ ACLK_STATS_UNLOCK;
+ }
+ new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type));
+ debug(D_ACLK, "Got message named '%s' from cloud", message_type);
+ if (unlikely(!msg_descriptor)) {
+ error("Do not know how to handle message of type '%s'. Ignoring", message_type);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
}
- info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
- if (cmd->reconnect_after_s > 0) {
- aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
- info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s);
+ return;
+ }
+ if (msg_descriptor->fnc(msg, msg_len)) {
+ error("Error processing message of type '%s'", message_type);
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_err++;
+ ACLK_STATS_UNLOCK;
}
- disconnect_req = 1;
- freez(cmd->error_description);
- freez(cmd);
return;
}
- error ("Unknown new cloud arch message type received \"%s\"", message_type);
}
#endif
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index 074dc004a..38243a4c9 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -8,9 +8,10 @@
#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
-int aclk_handle_cloud_message(char *payload);
+int aclk_handle_cloud_cmd_message(char *payload);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void aclk_init_rx_msg_handlers(void);
void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len);
#endif
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 765c6a333..a7d4a4709 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -124,7 +124,7 @@ static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample
if (unlikely(!st)) {
st = rrdset_create_localhost(
- "netdata", "aclk_cloud_req_type", NULL, "aclk", NULL, "Requests received from cloud by their type", "req/s",
+ "netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s",
"netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
rd_type_http = rrddim_add(st, "http", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
@@ -138,14 +138,14 @@ static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample
} else
rrdset_next(st);
- rrddim_set_by_pointer(st, rd_type_http, per_sample->cloud_req_type_http);
- rrddim_set_by_pointer(st, rd_type_alarm_upd, per_sample->cloud_req_type_alarm_upd);
- rrddim_set_by_pointer(st, rd_type_metadata_info, per_sample->cloud_req_type_metadata_info);
- rrddim_set_by_pointer(st, rd_type_metadata_alarms, per_sample->cloud_req_type_metadata_alarms);
- rrddim_set_by_pointer(st, rd_type_chart_new, per_sample->cloud_req_type_chart_new);
- rrddim_set_by_pointer(st, rd_type_chart_del, per_sample->cloud_req_type_chart_del);
- rrddim_set_by_pointer(st, rd_type_register_node, per_sample->cloud_req_type_register_node);
- rrddim_set_by_pointer(st, rd_type_node_upd, per_sample->cloud_req_type_node_upd);
+ rrddim_set_by_pointer(st, rd_type_http, per_sample->query_type_http);
+ rrddim_set_by_pointer(st, rd_type_alarm_upd, per_sample->query_type_alarm_upd);
+ rrddim_set_by_pointer(st, rd_type_metadata_info, per_sample->query_type_metadata_info);
+ rrddim_set_by_pointer(st, rd_type_metadata_alarms, per_sample->query_type_metadata_alarms);
+ rrddim_set_by_pointer(st, rd_type_chart_new, per_sample->query_type_chart_new);
+ rrddim_set_by_pointer(st, rd_type_chart_del, per_sample->query_type_chart_del);
+ rrddim_set_by_pointer(st, rd_type_register_node, per_sample->query_type_register_node);
+ rrddim_set_by_pointer(st, rd_type_node_upd, per_sample->query_type_node_upd);
rrdset_done(st);
}
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index 317a34ba4..3cc6a0cb0 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -48,15 +48,15 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t cloud_req_recvd;
volatile uint32_t cloud_req_err;
- // request types.
- volatile uint32_t cloud_req_type_http;
- volatile uint32_t cloud_req_type_alarm_upd;
- volatile uint32_t cloud_req_type_metadata_info;
- volatile uint32_t cloud_req_type_metadata_alarms;
- volatile uint32_t cloud_req_type_chart_new;
- volatile uint32_t cloud_req_type_chart_del;
- volatile uint32_t cloud_req_type_register_node;
- volatile uint32_t cloud_req_type_node_upd;
+ // query types.
+ volatile uint32_t query_type_http;
+ volatile uint32_t query_type_alarm_upd;
+ volatile uint32_t query_type_metadata_info;
+ volatile uint32_t query_type_metadata_alarms;
+ volatile uint32_t query_type_chart_new;
+ volatile uint32_t query_type_chart_del;
+ volatile uint32_t query_type_register_node;
+ volatile uint32_t query_type_node_upd;
// HTTP-specific request types.
volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT];
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 237c1bdd2..74fc19c72 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -147,7 +147,11 @@ static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_obje
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif */
- mqtt_wss_publish_pid(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id);
+ int rc = mqtt_wss_publish_pid_block(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
+ if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT)
+ error("Timeout sending binpacked message");
+ if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL)
+ error("Message is bigger than allowed maximum");
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
@@ -188,7 +192,7 @@ static struct json_object *create_hdr(const char *type, const char *msg_id, time
// TODO handle this somehow on older json-c
// tmp = json_object_new_uint64(ts_us);
-// probably jso->_to_json_strinf -> custom function
+// probably jso->_to_json_string -> custom function
// jso->o.c_uint64 -> map this with pointer to signed int
// commit that implements json_object_new_uint64 is 3c3b592
// between 0.14 and 0.15
@@ -420,7 +424,10 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
rrdhost_aclk_state_unlock(localhost);
return 0;
}
- conn.claim_id = localhost->aclk_state.claimed_id;
+ if (localhost->aclk_state.prev_claimed_id)
+ conn.claim_id = localhost->aclk_state.prev_claimed_id;
+ else
+ conn.claim_id = localhost->aclk_state.claimed_id;
char *msg = generate_update_agent_connection(&len, &conn);
rrdhost_aclk_state_unlock(localhost);
@@ -432,6 +439,10 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
freez(msg);
+ if (localhost->aclk_state.prev_claimed_id) {
+ freez(localhost->aclk_state.prev_claimed_id);
+ localhost->aclk_state.prev_claimed_id = NULL;
+ }
return pid;
}
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 07de5c58a..4d8744e7f 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -7,7 +7,6 @@
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK
-
extern int aclk_use_new_cloud_arch;
extern usec_t aclk_session_newarch;
diff --git a/aclk/legacy/Makefile.am b/aclk/legacy/Makefile.am
deleted file mode 100644
index 1cd876b40..000000000
--- a/aclk/legacy/Makefile.am
+++ /dev/null
@@ -1,19 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-
-CLEANFILES = \
- tests/install-fake-charts.d.sh \
- $(NULL)
-
-include $(top_srcdir)/build/subst.inc
-SUFFIXES = .in
-
-#sbin_SCRIPTS = \
-# tests/install-fake-charts.d.sh \
-# $(NULL)
-
-dist_noinst_SCRIPTS = tests/install-fake-charts.d.sh
-dist_noinst_DATA = tests/install-fake-charts.d.sh.in
-
diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c
deleted file mode 100644
index 7f8368e44..000000000
--- a/aclk/legacy/aclk_common.c
+++ /dev/null
@@ -1,53 +0,0 @@
-#include "aclk_common.h"
-
-#include "daemon/common.h"
-
-#ifdef ENABLE_ACLK
-#include <libwebsockets.h>
-#endif
-
-netdata_mutex_t legacy_aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
-
-struct legacy_aclk_shared_state legacy_aclk_shared_state = {
- .version_neg = 0,
- .version_neg_wait_till = 0
-};
-
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
-{
- int pos = 0;
- if (!strncmp("https://", url, 8)) {
- pos = 8;
- } else if (!strncmp("http://", url, 7)) {
- error("Cannot connect ACLK over %s -> unencrypted link is not supported", url);
- return 1;
- }
- int host_end = pos;
- while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':')
- host_end++;
- if (url[host_end] == 0) {
- *aclk_hostname = strdupz(url + pos);
- *aclk_port = 443;
- info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
- return 0;
- }
- if (url[host_end] == ':') {
- *aclk_hostname = callocz(host_end - pos + 1, 1);
- strncpy(*aclk_hostname, url + pos, host_end - pos);
- int port_end = host_end + 1;
- while (url[port_end] >= '0' && url[port_end] <= '9')
- port_end++;
- if (port_end - host_end > 6) {
- error("Port specified in %s is invalid", url);
- return 0;
- }
- *aclk_port = atoi(&url[host_end+1]);
- }
- if (url[host_end] == '/') {
- *aclk_port = 443;
- *aclk_hostname = callocz(1, host_end - pos + 1);
- strncpy(*aclk_hostname, url+pos, host_end - pos);
- }
- info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
- return 0;
-}
diff --git a/aclk/legacy/aclk_common.h b/aclk/legacy/aclk_common.h
deleted file mode 100644
index 080680ff1..000000000
--- a/aclk/legacy/aclk_common.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#ifndef ACLK_COMMON_H
-#define ACLK_COMMON_H
-
-#include "../aclk_rrdhost_state.h"
-#include "daemon/common.h"
-
-extern netdata_mutex_t legacy_aclk_shared_state_mutex;
-#define legacy_aclk_shared_state_LOCK netdata_mutex_lock(&legacy_aclk_shared_state_mutex)
-#define legacy_aclk_shared_state_UNLOCK netdata_mutex_unlock(&legacy_aclk_shared_state_mutex)
-
-// minimum and maximum supported version of ACLK
-// in this version of agent
-#define ACLK_VERSION_MIN 2
-#define ACLK_VERSION_MAX 3
-
-// Version negotiation messages have they own versioning
-// this is also used for LWT message as we set that up
-// before version negotiation
-#define ACLK_VERSION_NEG_VERSION 1
-
-// Maximum time to wait for version negotiation before aborting
-// and defaulting to oldest supported version
-#define VERSION_NEG_TIMEOUT 3
-
-#if ACLK_VERSION_MIN > ACLK_VERSION_MAX
-#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN"
-#endif
-
-// Define ACLK Feature Version Boundaries Here
-#define ACLK_V_COMPRESSION 2
-#define ACLK_V_CHILDRENSTATE 3
-
-#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING)
-#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update)
-
-extern struct legacy_aclk_shared_state {
- // optimization to avoid looping through hosts
- // every time Query Thread wakes up
- RRDHOST *next_popcorn_host;
-
- // read only while ACLK connected
- // protect by lock otherwise
- int version_neg;
- usec_t version_neg_wait_till;
-} legacy_aclk_shared_state;
-
-const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
-
-int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
-
-#endif //ACLK_COMMON_H
diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c
deleted file mode 100644
index 8a490c6f4..000000000
--- a/aclk/legacy/aclk_lws_https_client.c
+++ /dev/null
@@ -1,244 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#define ACLK_LWS_HTTPS_CLIENT_INTERNAL
-#include "aclk_lws_https_client.h"
-#include "aclk_common.h"
-#include "aclk_lws_wss_client.h"
-
-#define SMALL_BUFFER 16
-
-struct simple_hcc_data {
- char *data;
- size_t data_size;
- size_t written;
- char lws_work_buffer[1024 + LWS_PRE];
- char *payload;
- int response_code;
- int done;
-};
-
-static int simple_https_client_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
-{
- UNUSED(user);
- int n;
- char *ptr;
- char buffer[SMALL_BUFFER];
- struct simple_hcc_data *perconn_data = lws_get_opaque_user_data(wsi);
-
- switch (reason) {
- case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ:
- debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ");
- if (perconn_data->data_size - 1 - perconn_data->written < len)
- return 1;
- memcpy(&perconn_data->data[perconn_data->written], in, len);
- perconn_data->written += len;
- return 0;
- case LWS_CALLBACK_RECEIVE_CLIENT_HTTP:
- debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP");
- if(!perconn_data) {
- error("Missing Per Connect Data");
- return -1;
- }
- n = sizeof(perconn_data->lws_work_buffer) - LWS_PRE;
- ptr = perconn_data->lws_work_buffer + LWS_PRE;
- if (lws_http_client_read(wsi, &ptr, &n) < 0)
- return -1;
- perconn_data->data[perconn_data->written] = '\0';
- return 0;
- case LWS_CALLBACK_WSI_DESTROY:
- debug(D_ACLK, "LWS_CALLBACK_WSI_DESTROY");
- if(perconn_data)
- perconn_data->done = 1;
- return 0;
- case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
- debug(D_ACLK, "LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP");
- if(perconn_data)
- perconn_data->response_code = lws_http_client_http_response(wsi);
- return 0;
- case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
- debug(D_ACLK, "LWS_CALLBACK_CLOSED_CLIENT_HTTP");
- return 0;
- case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
- debug(D_ACLK, "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS");
- return 0;
- case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
- debug(D_ACLK, "LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER");
- if(perconn_data && perconn_data->payload) {
- unsigned char **p = (unsigned char **)in, *end = (*p) + len;
- snprintfz(buffer, SMALL_BUFFER, "%zu", strlen(perconn_data->payload));
- if (lws_add_http_header_by_token(wsi,
- WSI_TOKEN_HTTP_CONTENT_LENGTH,
- (unsigned char *)buffer, strlen(buffer), p, end))
- return -1;
- if (lws_add_http_header_by_token(wsi,
- WSI_TOKEN_HTTP_CONTENT_TYPE,
- (unsigned char *)ACLK_CONTENT_TYPE_JSON,
- strlen(ACLK_CONTENT_TYPE_JSON), p, end))
- return -1;
- lws_client_http_body_pending(wsi, 1);
- lws_callback_on_writable(wsi);
- }
- return 0;
- case LWS_CALLBACK_CLIENT_HTTP_WRITEABLE:
- debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_WRITEABLE");
- if(perconn_data && perconn_data->payload) {
- n = strlen(perconn_data->payload);
- if(perconn_data->data_size < (size_t)LWS_PRE + n + 1) {
- error("Buffer given is not big enough");
- return 1;
- }
-
- memcpy(&perconn_data->data[LWS_PRE], perconn_data->payload, n);
- if(n != lws_write(wsi, (unsigned char*)&perconn_data->data[LWS_PRE], n, LWS_WRITE_HTTP)) {
- error("lws_write error");
- perconn_data->data[0] = 0;
- return 1;
- }
- lws_client_http_body_pending(wsi, 0);
- // clean for subsequent reply read
- perconn_data->data[0] = 0;
- }
- return 0;
- case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL:
- debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL");
- return 0;
- case LWS_CALLBACK_WSI_CREATE:
- debug(D_ACLK, "LWS_CALLBACK_WSI_CREATE");
- return 0;
- case LWS_CALLBACK_PROTOCOL_INIT:
- debug(D_ACLK, "LWS_CALLBACK_PROTOCOL_INIT");
- return 0;
- case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL:
- debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL");
- return 0;
- case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
- debug(D_ACLK, "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED");
- return 0;
- case LWS_CALLBACK_GET_THREAD_ID:
- debug(D_ACLK, "LWS_CALLBACK_GET_THREAD_ID");
- return 0;
- case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
- debug(D_ACLK, "LWS_CALLBACK_EVENT_WAIT_CANCELLED");
- return 0;
- case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
- debug(D_ACLK, "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION");
- return 0;
- case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
- debug(D_ACLK, "LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH");
- return 0;
- default:
- debug(D_ACLK, "Unknown callback %d", (int)reason);
- return 0;
- }
-}
-
-static const struct lws_protocols protocols[] = {
- {
- "http",
- simple_https_client_callback,
- 0,
- 0,
- 0,
- 0,
- 0
- },
- { NULL, NULL, 0, 0, 0, 0, 0 }
-};
-
-static void simple_hcc_log_divert(int level, const char *line)
-{
- UNUSED(level);
- error("Libwebsockets: %s", line);
-}
-
-int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload)
-{
- info("%s %s", __func__, method);
-
- struct lws_context_creation_info info;
- struct lws_client_connect_info i;
- struct lws_context *context;
-
- struct simple_hcc_data *data = callocz(1, sizeof(struct simple_hcc_data));
- data->data = b;
- data->data[0] = 0;
- data->data_size = b_size;
- data->payload = payload;
-
- int n = 0;
- time_t timestamp;
-
- struct lws_vhost *vhost;
-
- memset(&info, 0, sizeof info);
-
- info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
- info.port = CONTEXT_PORT_NO_LISTEN;
- info.protocols = protocols;
-
-
- context = lws_create_context(&info);
- if (!context) {
- error("Error creating LWS context");
- freez(data);
- return 1;
- }
-
- lws_set_log_level(LLL_ERR | LLL_WARN, simple_hcc_log_divert);
-
- lws_service(context, 0);
-
- memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
- i.context = context;
-
-#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
- i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE;
- info("Disabling SSL certificate checks");
-#else
- i.ssl_connection = LCCSCF_USE_SSL;
-#endif
-#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0
-#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM.
- i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
-#endif
-
- i.port = port;
- i.address = host;
- i.path = url;
-
- i.host = i.address;
- i.origin = i.address;
- i.method = method;
- i.opaque_user_data = data;
- i.alpn = "http/1.1";
-
- i.protocol = protocols[0].name;
-
- vhost = lws_get_vhost_by_name(context, "default");
- if(!vhost)
- fatal("Could not find the default LWS vhost.");
-
- //set up proxy
- aclk_wss_set_proxy(vhost);
-
- lws_client_connect_via_info(&i);
-
- // libwebsockets handle connection timeouts already
- // this adds additional safety in case of bug in LWS
- timestamp = now_monotonic_sec();
- while( n >= 0 && !data->done && !netdata_exit) {
- n = lws_service(context, 0);
- if( now_monotonic_sec() - timestamp > SEND_HTTPS_REQUEST_TIMEOUT ) {
- data->data[0] = 0;
- data->done = 1;
- error("Servicing LWS took too long.");
- }
- }
-
- lws_context_destroy(context);
-
- n = data->response_code;
-
- freez(data);
- return (n < 200 || n >= 300);
-}
diff --git a/aclk/legacy/aclk_lws_https_client.h b/aclk/legacy/aclk_lws_https_client.h
deleted file mode 100644
index 5f30a37fd..000000000
--- a/aclk/legacy/aclk_lws_https_client.h
+++ /dev/null
@@ -1,18 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_LWS_HTTPS_CLIENT_H
-#define NETDATA_LWS_HTTPS_CLIENT_H
-
-#include "daemon/common.h"
-#include "libnetdata/libnetdata.h"
-
-#define DATAMAXLEN 1024*16
-
-#ifdef ACLK_LWS_HTTPS_CLIENT_INTERNAL
-#define ACLK_CONTENT_TYPE_JSON "application/json"
-#define SEND_HTTPS_REQUEST_TIMEOUT 30
-#endif
-
-int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload);
-
-#endif /* NETDATA_LWS_HTTPS_CLIENT_H */
diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c
deleted file mode 100644
index 012f2a8cc..000000000
--- a/aclk/legacy/aclk_lws_wss_client.c
+++ /dev/null
@@ -1,622 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "aclk_lws_wss_client.h"
-
-#include "libnetdata/libnetdata.h"
-#include "daemon/common.h"
-#include "aclk_common.h"
-#include "aclk_stats.h"
-#include "../aclk_proxy.h"
-
-extern int aclk_shutting_down;
-
-static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
-
-struct aclk_lws_wss_perconnect_data {
- int todo;
-};
-
-static struct aclk_lws_wss_engine_instance *engine_instance = NULL;
-
-void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len)
-{
- if (write_len != NULL && write_len_bytes != NULL)
- {
- *write_len = 0;
- *write_len_bytes = 0;
- if (engine_instance != NULL)
- {
- aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
-
- struct lws_wss_packet_buffer *write_b;
- size_t w,wb;
- for(w=0, wb=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next)
- {
- w++;
- wb += write_b->data_size - write_b->written;
- }
- *write_len = w;
- *write_len_bytes = wb;
- aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
- }
- }
- else if (write_len != NULL)
- {
- *write_len = 0;
- if (engine_instance != NULL)
- {
- aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
-
- struct lws_wss_packet_buffer *write_b;
- size_t w;
- for(w=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next)
- w++;
- *write_len = w;
- aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
- }
- }
- if (read_len != NULL)
- {
- *read_len = 0;
- if (engine_instance != NULL)
- {
- aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
- *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
- aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
- }
- }
-}
-
-static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size)
-{
- struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer));
- if (data) {
- new->data = mallocz(LWS_PRE + size);
- memcpy(new->data + LWS_PRE, data, size);
- new->data_size = size;
- new->written = 0;
- }
- return new;
-}
-
-static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item)
-{
- struct lws_wss_packet_buffer *tail = *list;
- if (!*list) {
- *list = item;
- return;
- }
- while (tail->next) {
- tail = tail->next;
- }
- tail->next = item;
-}
-
-static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list)
-{
- struct lws_wss_packet_buffer *ret = *list;
- if (ret != NULL)
- *list = ret->next;
-
- return ret;
-}
-
-static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item)
-{
- freez(item->data);
- freez(item);
-}
-
-static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer)
-{
- size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL);
- if (elems > 0)
- lws_ring_consume(ringbuffer, NULL, NULL, elems);
-}
-
-static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list)
-{
- struct lws_wss_packet_buffer *i;
- while ((i = lws_wss_packet_buffer_pop(list)) != NULL) {
- lws_wss_packet_buffer_free(i);
- }
- *list = NULL;
-}
-
-static inline void aclk_lws_wss_clear_io_buffers()
-{
- aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
- _aclk_lws_wss_read_buffer_clear(engine_instance->read_ringbuffer);
- aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
- aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
- _aclk_lws_wss_write_buffer_clear(&engine_instance->write_buffer_head);
- aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
-}
-
-static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback,
- sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 },
- { NULL, NULL, 0, 0, 0, 0, 0 } };
-
-static void aclk_lws_wss_log_divert(int level, const char *line)
-{
- switch (level) {
- case LLL_ERR:
- error("Libwebsockets Error: %s", line);
- break;
- case LLL_WARN:
- debug(D_ACLK, "Libwebsockets Warn: %s", line);
- break;
- default:
- error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line);
- }
-}
-
-static int aclk_lws_wss_client_init( char *target_hostname, int target_port)
-{
- static int lws_logging_initialized = 0;
-
- if (unlikely(!lws_logging_initialized)) {
- lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert);
- lws_logging_initialized = 1;
- }
-
- if (!target_hostname)
- return 1;
-
- engine_instance = callocz(1, sizeof(struct aclk_lws_wss_engine_instance));
-
- engine_instance->host = target_hostname;
- engine_instance->port = target_port;
-
-
- aclk_lws_mutex_init(&engine_instance->write_buf_mutex);
- aclk_lws_mutex_init(&engine_instance->read_buf_mutex);
-
- engine_instance->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL);
- if (!engine_instance->read_ringbuffer)
- goto failure_cleanup;
-
- return 0;
-
-failure_cleanup:
- freez(engine_instance);
- return 1;
-}
-
-void aclk_lws_wss_destroy_context()
-{
- if (!engine_instance)
- return;
- if (!engine_instance->lws_context)
- return;
- lws_context_destroy(engine_instance->lws_context);
- engine_instance->lws_context = NULL;
-}
-
-
-void aclk_lws_wss_client_destroy()
-{
- if (engine_instance == NULL)
- return;
-
- aclk_lws_wss_destroy_context();
- engine_instance->lws_wsi = NULL;
-
- aclk_lws_wss_clear_io_buffers();
-
-#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
- pthread_mutex_destroy(&engine_instance->write_buf_mutex);
- pthread_mutex_destroy(&engine_instance->read_buf_mutex);
-#endif
-}
-
-#ifdef LWS_WITH_SOCKS5
-static int aclk_wss_set_socks(struct lws_vhost *vhost, const char *socks)
-{
- char *proxy = strstr(socks, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
-
- if (!proxy)
- return -1;
-
- proxy += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
-
- if (!*proxy)
- return -1;
-
- return lws_set_socks(vhost, proxy);
-}
-#endif
-
-void aclk_wss_set_proxy(struct lws_vhost *vhost)
-{
- const char *proxy;
- ACLK_PROXY_TYPE proxy_type;
- char *log;
-
- proxy = aclk_get_proxy(&proxy_type);
-
-#ifdef LWS_WITH_SOCKS5
- lws_set_socks(vhost, ":");
-#endif
- lws_set_proxy(vhost, ":");
-
- if (proxy_type == PROXY_TYPE_UNKNOWN) {
- error("Unknown proxy type");
- return;
- }
-
- if (proxy_type == PROXY_TYPE_SOCKS5 || proxy_type == PROXY_TYPE_HTTP) {
- log = strdupz(proxy);
- safe_log_proxy_censor(log);
- info("Connecting using %s proxy:\"%s\"", aclk_proxy_type_to_s(&proxy_type), log);
- freez(log);
- }
- if (proxy_type == PROXY_TYPE_SOCKS5) {
-#ifdef LWS_WITH_SOCKS5
- if (aclk_wss_set_socks(vhost, proxy))
- error("LWS failed to accept socks proxy.");
- return;
-#else
- fatal("We have no SOCKS5 support but we made it here. Programming error!");
-#endif
- }
- if (proxy_type == PROXY_TYPE_HTTP) {
- if (lws_set_proxy(vhost, proxy))
- error("LWS failed to accept http proxy.");
- return;
- }
- if (proxy_type != PROXY_DISABLED)
- error("Unknown proxy type");
-}
-
-// Return code indicates if connection attempt has started async.
-int aclk_lws_wss_connect(char *host, int port)
-{
- struct lws_client_connect_info i;
- struct lws_vhost *vhost;
- int n;
-
- if (!engine_instance) {
- if (aclk_lws_wss_client_init(host, port))
- return 1; // Propagate failure
- }
-
- if (!engine_instance->lws_context)
- {
- // First time through (on this connection), create the context
- struct lws_context_creation_info info;
- memset(&info, 0, sizeof(struct lws_context_creation_info));
- info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
- info.port = CONTEXT_PORT_NO_LISTEN;
- info.protocols = protocols;
- engine_instance->lws_context = lws_create_context(&info);
- if (!engine_instance->lws_context)
- {
- error("Failed to create lws_context, ACLK will not function");
- return 1;
- }
- return 0;
- // PROTOCOL_INIT callback will call again.
- }
-
- for (n = 0; n < ACLK_LWS_CALLBACK_HISTORY; n++)
- engine_instance->lws_callback_history[n] = 0;
-
- if (engine_instance->lws_wsi) {
- error("Already Connected. Only one connection supported at a time.");
- return 0;
- }
-
- memset(&i, 0, sizeof(i));
- i.context = engine_instance->lws_context;
- i.port = engine_instance->port;
- i.address = engine_instance->host;
- i.path = "/mqtt";
- i.host = engine_instance->host;
- i.protocol = "mqtt";
-
- // from LWS docu:
- // If option LWS_SERVER_OPTION_EXPLICIT_VHOSTS is given, no vhost is
- // created; you're expected to create your own vhosts afterwards using
- // lws_create_vhost(). Otherwise a vhost named "default" is also created
- // using the information in the vhost-related members, for compatibility.
- vhost = lws_get_vhost_by_name(engine_instance->lws_context, "default");
- if(!vhost)
- fatal("Could not find the default LWS vhost.");
-
- aclk_wss_set_proxy(vhost);
-
-#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
- i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE;
- info("Disabling SSL certificate checks");
-#else
- i.ssl_connection = LCCSCF_USE_SSL;
-#endif
-#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0
-#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM.
- i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
-#endif
- lws_client_connect_via_info(&i);
- return 0;
-}
-
-static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, size_t len)
-{
- if (lws_ring_insert(buffer, data, len) != len) {
- error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding.");
- return 0;
- }
- return 1;
-}
-
-#ifdef ACLK_TRP_DEBUG_VERBOSE
-static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
-{
- switch (reason) {
- case LWS_CALLBACK_CLIENT_WRITEABLE:
- return "LWS_CALLBACK_CLIENT_WRITEABLE";
- case LWS_CALLBACK_CLIENT_RECEIVE:
- return "LWS_CALLBACK_CLIENT_RECEIVE";
- case LWS_CALLBACK_PROTOCOL_INIT:
- return "LWS_CALLBACK_PROTOCOL_INIT";
- case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
- return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED";
- case LWS_CALLBACK_USER:
- return "LWS_CALLBACK_USER";
- case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
- return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR";
- case LWS_CALLBACK_CLIENT_CLOSED:
- return "LWS_CALLBACK_CLIENT_CLOSED";
- case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
- return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE";
- case LWS_CALLBACK_WSI_DESTROY:
- return "LWS_CALLBACK_WSI_DESTROY";
- case LWS_CALLBACK_CLIENT_ESTABLISHED:
- return "LWS_CALLBACK_CLIENT_ESTABLISHED";
- case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
- return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION";
- case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
- return "LWS_CALLBACK_EVENT_WAIT_CANCELLED";
- default:
- // Not using an internal buffer here for thread-safety with unknown calling context.
- error("Unknown LWS callback %u", reason);
- return "unknown";
- }
-}
-#endif
-
-void aclk_lws_wss_fail_report()
-{
- int i;
- int anything_to_send = 0;
- BUFFER *buf;
-
- if (netdata_anonymous_statistics_enabled <= 0)
- return;
-
- // guess - most of the callback will be 1-99 + ',' + \0
- buf = buffer_create((ACLK_LWS_CALLBACK_HISTORY * 2) + 10);
-
- for (i = 0; i < ACLK_LWS_CALLBACK_HISTORY; i++)
- if (engine_instance->lws_callback_history[i]) {
- buffer_sprintf(buf, "%s%d", (i ? "," : ""), engine_instance->lws_callback_history[i]);
- anything_to_send = 1;
- }
-
- if (anything_to_send)
- send_statistics("ACLK_CONN_FAIL", "FAIL", buffer_tostring(buf));
-
- buffer_free(buf);
-}
-
-static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
-{
- UNUSED(user);
- struct lws_wss_packet_buffer *data;
- int retval = 0;
- static int lws_shutting_down = 0;
- int i;
-
- for (i = ACLK_LWS_CALLBACK_HISTORY - 1; i > 0; i--)
- engine_instance->lws_callback_history[i] = engine_instance->lws_callback_history[i - 1];
- engine_instance->lws_callback_history[0] = (int)reason;
-
- if (unlikely(aclk_shutting_down && !lws_shutting_down)) {
- lws_shutting_down = 1;
- retval = -1;
- engine_instance->upstream_reconnect_request = 0;
- }
-
- // Callback servicing is forced when we are closed from above.
- if (engine_instance->upstream_reconnect_request) {
- error("Closing lws connection due to libmosquitto error.");
- char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection.";
- lws_close_reason(
- wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error,
- strlen(upstream_connection_error));
- retval = -1;
- engine_instance->upstream_reconnect_request = 0;
- }
-
- // Don't log to info - volume is proportional to message flow on ACLK.
- switch (reason) {
- case LWS_CALLBACK_CLIENT_WRITEABLE:
- aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
- data = engine_instance->write_buffer_head;
- if (likely(data)) {
- size_t bytes_left = data->data_size - data->written;
- if ( bytes_left > FRAGMENT_SIZE)
- bytes_left = FRAGMENT_SIZE;
- int n = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY);
- if (n>=0) {
- data->written += n;
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.write_q_consumed += n;
- LEGACY_ACLK_STATS_UNLOCK;
- }
- }
- //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc);
- if (data->written == data->data_size)
- {
- lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head);
- lws_wss_packet_buffer_free(data);
- }
- if (engine_instance->write_buffer_head)
- lws_callback_on_writable(engine_instance->lws_wsi);
- }
- aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
- return retval;
-
- case LWS_CALLBACK_CLIENT_RECEIVE:
- aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
- if (!received_data_to_ringbuff(engine_instance->read_ringbuffer, in, len))
- retval = 1;
- aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.read_q_added += len;
- LEGACY_ACLK_STATS_UNLOCK;
- }
-
- // to future myself -> do not call this while read lock is active as it will eventually
- // want to acquire same lock later in aclk_lws_wss_client_read() function
- aclk_lws_connection_data_received();
- return retval;
-
- case LWS_CALLBACK_WSI_CREATE:
- case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
- case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
- case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
- case LWS_CALLBACK_GET_THREAD_ID: // ?
- case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
- case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
- // Expected and safe to ignore.
-#ifdef ACLK_TRP_DEBUG_VERBOSE
- debug(D_ACLK, "Ignoring expected callback from LWS: %s", aclk_lws_callback_name(reason));
-#endif
- return retval;
-
- default:
- // Pass to next switch, this case removes compiler warnings.
- break;
- }
- // Log to info - volume is proportional to connection attempts.
-#ifdef ACLK_TRP_DEBUG_VERBOSE
- info("Processing callback %s", aclk_lws_callback_name(reason));
-#endif
- switch (reason) {
- case LWS_CALLBACK_PROTOCOL_INIT:
- aclk_lws_wss_connect(engine_instance->host, engine_instance->port); // Makes the outgoing connection
- break;
- case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
- if (engine_instance->lws_wsi != NULL && engine_instance->lws_wsi != wsi)
- error("Multiple connections on same WSI? %p vs %p", engine_instance->lws_wsi, wsi);
- engine_instance->lws_wsi = wsi;
- break;
- case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
- error(
- "Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", engine_instance->host,
- engine_instance->port, (in ? (char *)in : "not given"));
- // Fall-through
- case LWS_CALLBACK_CLIENT_CLOSED:
- case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
- engine_instance->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback
- aclk_lws_connection_closed();
- return -1; // the callback response is ignored, hope the above remains true
- case LWS_CALLBACK_WSI_DESTROY:
- aclk_lws_wss_clear_io_buffers();
- if (!engine_instance->websocket_connection_up)
- aclk_lws_wss_fail_report();
- engine_instance->lws_wsi = NULL;
- engine_instance->websocket_connection_up = 0;
- aclk_lws_connection_closed();
- break;
- case LWS_CALLBACK_CLIENT_ESTABLISHED:
- engine_instance->websocket_connection_up = 1;
- aclk_lws_connection_established(engine_instance->host, engine_instance->port);
- break;
-
- default:
-#ifdef ACLK_TRP_DEBUG_VERBOSE
- error("Unexpected callback from libwebsockets %s", aclk_lws_callback_name(reason));
-#endif
- break;
- }
- return retval; //0-OK, other connection should be closed!
-}
-
-int aclk_lws_wss_client_write(void *buf, size_t count)
-{
- if (engine_instance && engine_instance->lws_wsi && engine_instance->websocket_connection_up) {
- aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
- lws_wss_packet_buffer_append(&engine_instance->write_buffer_head, lws_wss_packet_buffer_new(buf, count));
- aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
-
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.write_q_added += count;
- LEGACY_ACLK_STATS_UNLOCK;
- }
-
- lws_callback_on_writable(engine_instance->lws_wsi);
- return count;
- }
- return 0;
-}
-
-int aclk_lws_wss_client_read(void *buf, size_t count)
-{
- size_t data_to_be_read = count;
-
- aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
- size_t readable_byte_count = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
- if (unlikely(readable_byte_count == 0)) {
- errno = EAGAIN;
- data_to_be_read = -1;
- goto abort;
- }
-
- if (readable_byte_count < data_to_be_read)
- data_to_be_read = readable_byte_count;
-
- data_to_be_read = lws_ring_consume(engine_instance->read_ringbuffer, NULL, buf, data_to_be_read);
- if (data_to_be_read == readable_byte_count)
- engine_instance->data_to_read = 0;
-
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
- LEGACY_ACLK_STATS_UNLOCK;
- }
-
-abort:
- aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
- return data_to_be_read;
-}
-
-void aclk_lws_wss_service_loop()
-{
- if (engine_instance)
- {
- /*if (engine_instance->lws_wsi) {
- lws_cancel_service(engine_instance->lws_context);
- lws_callback_on_writable(engine_instance->lws_wsi);
- }*/
- lws_service(engine_instance->lws_context, 0);
- }
-}
-
-// in case the MQTT connection disconnect while lws transport is still operational
-// we should drop connection and reconnect
-// this function should be called when that happens to notify lws of that situation
-void aclk_lws_wss_mqtt_layer_disconnect_notif()
-{
- if (!engine_instance)
- return;
- if (engine_instance->lws_wsi && engine_instance->websocket_connection_up) {
- engine_instance->upstream_reconnect_request = 1;
- lws_callback_on_writable(
- engine_instance->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written.
- }
-}
diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h
deleted file mode 100644
index c68649cf3..000000000
--- a/aclk/legacy/aclk_lws_wss_client.h
+++ /dev/null
@@ -1,92 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef ACLK_LWS_WSS_CLIENT_H
-#define ACLK_LWS_WSS_CLIENT_H
-
-#include <libwebsockets.h>
-
-#include "libnetdata/libnetdata.h"
-
-// This is as define because ideally the ACLK at high level
-// can do mosquitto writes and reads only from one thread
-// which is cleaner implementation IMHO
-// in such case this mutexes are not necessary and life
-// is simpler
-#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1
-
-#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES (128 * 1024)
-
-#define ACLK_LWS_CALLBACK_HISTORY 10
-
-#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
-#define aclk_lws_mutex_init(x) netdata_mutex_init(x)
-#define aclk_lws_mutex_lock(x) netdata_mutex_lock(x)
-#define aclk_lws_mutex_unlock(x) netdata_mutex_unlock(x)
-#else
-#define aclk_lws_mutex_init(x)
-#define aclk_lws_mutex_lock(x)
-#define aclk_lws_mutex_unlock(x)
-#endif
-
-struct aclk_lws_wss_engine_callbacks {
- void (*connection_established_callback)();
- void (*data_rcvd_callback)();
- void (*data_writable_callback)();
- void (*connection_closed)();
-};
-
-struct lws_wss_packet_buffer {
- unsigned char *data;
- size_t data_size, written;
- struct lws_wss_packet_buffer *next;
-};
-
-struct aclk_lws_wss_engine_instance {
- //target host/port for connection
- char *host;
- int port;
-
- //internal data
- struct lws_context *lws_context;
- struct lws *lws_wsi;
-
-#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
- netdata_mutex_t write_buf_mutex;
- netdata_mutex_t read_buf_mutex;
-#endif
-
- struct lws_wss_packet_buffer *write_buffer_head;
- struct lws_ring *read_ringbuffer;
-
- //flags to be read by engine user
- int websocket_connection_up;
-
- // currently this is by default disabled
-
- int data_to_read;
- int upstream_reconnect_request;
-
- int lws_callback_history[ACLK_LWS_CALLBACK_HISTORY];
-};
-
-void aclk_lws_wss_client_destroy();
-void aclk_lws_wss_destroy_context();
-
-int aclk_lws_wss_connect(char *host, int port);
-
-int aclk_lws_wss_client_write(void *buf, size_t count);
-int aclk_lws_wss_client_read(void *buf, size_t count);
-void aclk_lws_wss_service_loop();
-
-void aclk_lws_wss_mqtt_layer_disconnect_notif();
-
-// Notifications inside the layer above
-void aclk_lws_connection_established();
-void aclk_lws_connection_data_received();
-void aclk_lws_connection_closed();
-void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
-
-void aclk_wss_set_proxy(struct lws_vhost *vhost);
-
-#define FRAGMENT_SIZE 4096
-#endif
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c
deleted file mode 100644
index 21eae11fd..000000000
--- a/aclk/legacy/aclk_query.c
+++ /dev/null
@@ -1,843 +0,0 @@
-#include "aclk_common.h"
-#include "aclk_query.h"
-#include "aclk_stats.h"
-#include "aclk_rx_msgs.h"
-#include "agent_cloud_link.h"
-
-#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
-
-#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
-
-pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER;
-pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
-#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait)
-#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait)
-
-#ifndef __GNUC__
-#pragma region ACLK_QUEUE
-#endif
-
-static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
-#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex)
-#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex)
-
-struct aclk_query {
- usec_t created;
- struct timeval tv_in;
- usec_t created_boot_time;
- time_t run_after; // Delay run until after this time
- ACLK_CMD cmd; // What command is this
- char *topic; // Topic to respond to
- char *data; // Internal data (NULL if request from the cloud)
- char *msg_id; // msg_id generated by the cloud (NULL if internal)
- char *query; // The actual query
- u_char deleted; // Mark deleted for garbage collect
- int idx; // index of query thread
- struct aclk_query *next;
-};
-
-struct aclk_query_queue {
- struct aclk_query *aclk_query_head;
- struct aclk_query *aclk_query_tail;
- unsigned int count;
-} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
-
-
-unsigned int aclk_query_size()
-{
- int r;
- ACLK_QUEUE_LOCK;
- r = aclk_queue.count;
- ACLK_QUEUE_UNLOCK;
- return r;
-}
-
-/*
- * Free a query structure when done
- */
-static void aclk_query_free(struct aclk_query *this_query)
-{
- if (unlikely(!this_query))
- return;
-
- freez(this_query->topic);
- if (likely(this_query->query))
- freez(this_query->query);
- if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) {
- struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data;
- freez(del->query_endpoint);
- freez(del->data);
- freez(del);
- }
- if (likely(this_query->msg_id))
- freez(this_query->msg_id);
- freez(this_query);
-}
-
-/*
- * Get the next query to process - NULL if nothing there
- * The caller needs to free memory by calling aclk_query_free()
- *
- * topic
- * query
- * The structure itself
- *
- */
-static struct aclk_query *aclk_queue_pop()
-{
- struct aclk_query *this_query;
-
- ACLK_QUEUE_LOCK;
-
- if (likely(!aclk_queue.aclk_query_head)) {
- ACLK_QUEUE_UNLOCK;
- return NULL;
- }
-
- this_query = aclk_queue.aclk_query_head;
-
- // Get rid of the deleted entries
- while (this_query && this_query->deleted) {
- aclk_queue.count--;
-
- aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
-
- if (likely(!aclk_queue.aclk_query_head)) {
- aclk_queue.aclk_query_tail = NULL;
- }
-
- aclk_query_free(this_query);
-
- this_query = aclk_queue.aclk_query_head;
- }
-
- if (likely(!this_query)) {
- ACLK_QUEUE_UNLOCK;
- return NULL;
- }
-
- if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
- info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
- ACLK_QUEUE_UNLOCK;
- return NULL;
- }
-
- aclk_queue.count--;
- aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
-
- if (likely(!aclk_queue.aclk_query_head)) {
- aclk_queue.aclk_query_tail = NULL;
- }
-
- ACLK_QUEUE_UNLOCK;
- return this_query;
-}
-
-// Returns the entry after which we need to create a new entry to run at the specified time
-// If NULL is returned we need to add to HEAD
-// Need to have a QUERY lock before calling this
-
-static struct aclk_query *aclk_query_find_position(time_t time_to_run)
-{
- struct aclk_query *tmp_query, *last_query;
-
- // Quick check if we will add to the end
- if (likely(aclk_queue.aclk_query_tail)) {
- if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
- return aclk_queue.aclk_query_tail;
- }
-
- last_query = NULL;
- tmp_query = aclk_queue.aclk_query_head;
-
- while (tmp_query) {
- if (tmp_query->run_after > time_to_run)
- return last_query;
- last_query = tmp_query;
- tmp_query = tmp_query->next;
- }
- return last_query;
-}
-
-// Need to have a QUERY lock before calling this
-static struct aclk_query *
-aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
-{
- struct aclk_query *tmp_query, *prev_query;
- UNUSED(cmd);
-
- tmp_query = aclk_queue.aclk_query_head;
- prev_query = NULL;
- while (tmp_query) {
- if (likely(!tmp_query->deleted)) {
- if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
- if ((!data || data == tmp_query->data) &&
- (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
- if (likely(last_query))
- *last_query = prev_query;
- return tmp_query;
- }
- }
- }
- prev_query = tmp_query;
- tmp_query = tmp_query->next;
- }
- return NULL;
-}
-
-/*
- * Add a query to execute, the result will be send to the specified topic
- */
-
-int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
-{
- struct aclk_query *new_query, *tmp_query;
-
- // Ignore all commands while we wait for the agent to initialize
- if (unlikely(!aclk_connected))
- return 1;
-
- run_after = now_realtime_sec() + run_after;
-
- ACLK_QUEUE_LOCK;
- struct aclk_query *last_query = NULL;
-
- tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
- if (unlikely(tmp_query)) {
- if (tmp_query->run_after == run_after) {
- ACLK_QUEUE_UNLOCK;
- LEGACY_QUERY_THREAD_WAKEUP;
- return 0;
- }
-
- if (last_query)
- last_query->next = tmp_query->next;
- else
- aclk_queue.aclk_query_head = tmp_query->next;
-
- debug(D_ACLK, "Removing double entry");
- aclk_query_free(tmp_query);
- aclk_queue.count--;
- }
-
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.queries_queued++;
- LEGACY_ACLK_STATS_UNLOCK;
- }
-
- new_query = callocz(1, sizeof(struct aclk_query));
- new_query->cmd = aclk_cmd;
- if (internal) {
- new_query->topic = strdupz(topic);
- if (likely(query))
- new_query->query = strdupz(query);
- } else {
- new_query->topic = topic;
- new_query->query = query;
- new_query->msg_id = msg_id;
- }
-
- new_query->data = data;
- new_query->next = NULL;
- now_realtime_timeval(&new_query->tv_in);
- new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec;
- new_query->created_boot_time = now_boottime_usec();
- new_query->run_after = run_after;
-
- debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
-
- tmp_query = aclk_query_find_position(run_after);
-
- if (tmp_query) {
- new_query->next = tmp_query->next;
- tmp_query->next = new_query;
- if (tmp_query == aclk_queue.aclk_query_tail)
- aclk_queue.aclk_query_tail = new_query;
- aclk_queue.count++;
- ACLK_QUEUE_UNLOCK;
- LEGACY_QUERY_THREAD_WAKEUP;
- return 0;
- }
-
- new_query->next = aclk_queue.aclk_query_head;
- aclk_queue.aclk_query_head = new_query;
- aclk_queue.count++;
-
- ACLK_QUEUE_UNLOCK;
- LEGACY_QUERY_THREAD_WAKEUP;
- return 0;
-}
-
-#ifndef __GNUC__
-#pragma endregion
-#endif
-
-#ifndef __GNUC__
-#pragma region Helper Functions
-#endif
-
-/*
- * Take a buffer, encode it and rewrite it
- *
- */
-
-static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines)
-{
- char *tmp_buffer = mallocz(content_size * 2);
- char *dst = tmp_buffer;
- while (content_size > 0) {
- switch (*src) {
- case '\n':
- if (keep_newlines)
- {
- *dst++ = '\\';
- *dst++ = 'n';
- }
- break;
- case '\t':
- break;
- case 0x01 ... 0x08:
- case 0x0b ... 0x1F:
- *dst++ = '\\';
- *dst++ = 'u';
- *dst++ = '0';
- *dst++ = '0';
- *dst++ = (*src < 0x0F) ? '0' : '1';
- *dst++ = to_hex(*src);
- break;
- case '\"':
- *dst++ = '\\';
- *dst++ = *src;
- break;
- default:
- *dst++ = *src;
- }
- src++;
- content_size--;
- }
- *dst = '\0';
-
- return tmp_buffer;
-}
-
-#ifndef __GNUC__
-#pragma endregion
-#endif
-
-#ifndef __GNUC__
-#pragma region ACLK_QUERY
-#endif
-
-
-static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
-{
- usec_t t = now_boottime_usec();
- legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
-
- w->response.code = web_client_api_request_v1(host, w, url);
- t = now_boottime_usec() - t;
-
- legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t);
-
- return t;
-}
-
-static int aclk_execute_query(struct aclk_query *this_query)
-{
- if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
- struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
- w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
- w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
- w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
- w->acl = 0x1f;
-
- char *mysep = strchr(this_query->query, '?');
- if (mysep) {
- strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
- *mysep = '\0';
- } else
- strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
-
- mysep = strrchr(this_query->query, '/');
-
- // TODO: handle bad response perhaps in a different way. For now it does to the payload
- w->tv_in = this_query->tv_in;
- now_realtime_timeval(&w->tv_ready);
- aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
- size_t size = w->response.data->len;
- size_t sent = size;
- w->response.data->date = w->tv_ready.tv_sec;
- web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
- BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
- buffer_strcat(local_buffer, ",\n\t\"payload\": ");
- char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
- char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
-
- buffer_sprintf(
- local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}",
- w->response.code, encoded_response, encoded_header);
-
- buffer_sprintf(local_buffer, "\n}");
-
- debug(D_ACLK, "Response:%s", encoded_header);
-
- aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
-
- struct timeval tv;
- now_realtime_timeval(&tv);
-
- log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
- w->id
- , gettid()
- , this_query->idx
- , "DATA"
- , sent
- , size
- , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
- , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
- , dt_usec(&tv, &w->tv_ready) / 1000.0
- , dt_usec(&tv, &w->tv_in) / 1000.0
- , w->response.code
- , strip_control_characters(this_query->query)
- );
-
- buffer_free(w->response.data);
- buffer_free(w->response.header);
- buffer_free(w->response.header_output);
- freez(w);
- buffer_free(local_buffer);
- freez(encoded_response);
- freez(encoded_header);
- return 0;
- }
- return 1;
-}
-
-static int aclk_execute_query_v2(struct aclk_query *this_query)
-{
- int retval = 0;
- usec_t t;
- BUFFER *local_buffer = NULL;
- struct aclk_cloud_req_v2 *cloud_req = (struct aclk_cloud_req_v2 *)this_query->data;
-
-#ifdef NETDATA_WITH_ZLIB
- int z_ret;
- BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- char *start, *end;
-#endif
-
- struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
- w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
- w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
- w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
- w->acl = 0x1f;
-
- char *mysep = strchr(this_query->query, '?');
- if (mysep) {
- url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
- *mysep = '\0';
- } else
- url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
-
- mysep = strrchr(this_query->query, '/');
-
- // execute the query
- w->tv_in = this_query->tv_in;
- now_realtime_timeval(&w->tv_ready);
- t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
- size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
- size_t sent = size;
-
-#ifdef NETDATA_WITH_ZLIB
- // check if gzip encoding can and should be used
- if ((start = strstr(cloud_req->data, WEB_HDR_ACCEPT_ENC))) {
- start += strlen(WEB_HDR_ACCEPT_ENC);
- end = strstr(start, "\x0D\x0A");
- start = strstr(start, "gzip");
-
- if (start && start < end) {
- w->response.zstream.zalloc = Z_NULL;
- w->response.zstream.zfree = Z_NULL;
- w->response.zstream.opaque = Z_NULL;
- if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
- w->response.zinitialized = 1;
- w->response.zoutput = 1;
- } else
- error("Failed to initialize zlib. Proceeding without compression.");
- }
- }
-
- if (w->response.data->len && w->response.zinitialized) {
- w->response.zstream.next_in = (Bytef *)w->response.data->buffer;
- w->response.zstream.avail_in = w->response.data->len;
- do {
- w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE;
- w->response.zstream.next_out = w->response.zbuffer;
- z_ret = deflate(&w->response.zstream, Z_FINISH);
- if(z_ret < 0) {
- if(w->response.zstream.msg)
- error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg);
- else
- error("Unknown error during zlib compression.");
- retval = 1;
- goto cleanup;
- }
- int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
- buffer_need_bytes(z_buffer, bytes_to_cpy);
- memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy);
- z_buffer->len += bytes_to_cpy;
- } while(z_ret != Z_STREAM_END);
- // so that web_client_build_http_header
- // puts correct content length into header
- buffer_free(w->response.data);
- w->response.data = z_buffer;
- z_buffer = NULL;
- }
-#endif
-
- w->response.data->date = w->tv_ready.tv_sec;
- web_client_build_http_header(w);
- local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
- buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code);
- buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A");
- buffer_strcat(local_buffer, w->response.header_output->buffer);
-
- if (w->response.data->len) {
-#ifdef NETDATA_WITH_ZLIB
- if (w->response.zinitialized) {
- buffer_need_bytes(local_buffer, w->response.data->len);
- memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
- local_buffer->len += w->response.data->len;
- sent = sent - size + w->response.data->len;
- } else {
-#endif
- buffer_strcat(local_buffer, w->response.data->buffer);
-#ifdef NETDATA_WITH_ZLIB
- }
-#endif
- }
-
- aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id);
-
- struct timeval tv;
- now_realtime_timeval(&tv);
-
- log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
- w->id
- , gettid()
- , this_query->idx
- , "DATA"
- , sent
- , size
- , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
- , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
- , dt_usec(&tv, &w->tv_ready) / 1000.0
- , dt_usec(&tv, &w->tv_in) / 1000.0
- , w->response.code
- , strip_control_characters(this_query->query)
- );
-cleanup:
-#ifdef NETDATA_WITH_ZLIB
- if(w->response.zinitialized)
- deflateEnd(&w->response.zstream);
- buffer_free(z_buffer);
-#endif
- buffer_free(w->response.data);
- buffer_free(w->response.header);
- buffer_free(w->response.header_output);
- freez(w);
- buffer_free(local_buffer);
- return retval;
-}
-
-#define ACLK_HOST_PTR_COMPULSORY(x) \
- if (unlikely(!host)) { \
- errno = 0; \
- error(x " needs host pointer"); \
- break; \
- }
-
-/*
- * This function will fetch the next pending command and process it
- *
- */
-static int aclk_process_query(struct aclk_query_thread *t_info)
-{
- struct aclk_query *this_query;
- static long int query_count = 0;
- ACLK_METADATA_STATE meta_state;
- RRDHOST *host;
-
- if (!aclk_connected)
- return 0;
-
- this_query = aclk_queue_pop();
- if (likely(!this_query)) {
- return 0;
- }
-
- if (unlikely(this_query->deleted)) {
- debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
- aclk_query_free(this_query);
- return 1;
- }
- query_count++;
-
- host = (RRDHOST*)this_query->data;
- this_query->idx = t_info->idx;
-
- debug(
- D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic,
- this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS);
-
- switch (this_query->cmd) {
- case ACLK_CMD_ONCONNECT:
- ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
-#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
- if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
- error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
- break;
- }
-#else
-#warning "This check became unnecessary. Remove"
-#endif
-
- debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"",
- host->hostname,
- host->machine_guid);
-
- rrdhost_aclk_state_lock(host);
- meta_state = host->aclk_state.metadata;
- host->aclk_state.metadata = ACLK_METADATA_SENT;
- rrdhost_aclk_state_unlock(host);
- aclk_send_metadata(meta_state, host);
- break;
-
- case ACLK_CMD_CHART:
- ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART");
-
- debug(D_ACLK, "EXECUTING a chart update command");
- aclk_send_single_chart(host, this_query->query);
- break;
-
- case ACLK_CMD_CHARTDEL:
- ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL");
-
- debug(D_ACLK, "EXECUTING a chart delete command");
- //TODO: This send the info metadata for now
- legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host);
- break;
-
- case ACLK_CMD_ALARM:
- debug(D_ACLK, "EXECUTING an alarm update command");
- aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
- break;
-
- case ACLK_CMD_CLOUD:
- debug(D_ACLK, "EXECUTING a cloud command");
- aclk_execute_query(this_query);
- break;
- case ACLK_CMD_CLOUD_QUERY_2:
- debug(D_ACLK, "EXECUTING Cloud Query v2");
- aclk_execute_query_v2(this_query);
- break;
-
- case ACLK_CMD_CHILD_CONNECT:
- case ACLK_CMD_CHILD_DISCONNECT:
- ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT");
-
- debug(
- D_ACLK, "Execution Child %s command",
- this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect");
- aclk_send_info_child_connection(host, this_query->cmd);
- break;
-
- default:
- errno = 0;
- error("Unknown ACLK Query Command");
- break;
- }
- debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
-
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.queries_dispatched++;
- legacy_aclk_queries_per_thread[t_info->idx]++;
- LEGACY_ACLK_STATS_UNLOCK;
-
- if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) {
- getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]);
- getrusage_called_this_tick[t_info->idx]++;
- }
-
- }
-
- aclk_query_free(this_query);
-
- return 1;
-}
-
-void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
-{
- if (query_threads && query_threads->thread_list) {
- for (int i = 0; i < query_threads->count; i++) {
- netdata_thread_join(query_threads->thread_list[i].thread, NULL);
- }
- freez(query_threads->thread_list);
- }
-
- struct aclk_query *this_query;
-
- do {
- this_query = aclk_queue_pop();
- aclk_query_free(this_query);
- } while (this_query);
-}
-
-#define TASK_LEN_MAX 22
-void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads)
-{
- info("Starting %d query threads.", query_threads->count);
-
- char thread_name[TASK_LEN_MAX];
- query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
- for (int i = 0; i < query_threads->count; i++) {
- query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
-
- if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
- error("snprintf encoding error");
- netdata_thread_create(
- &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread,
- &query_threads->thread_list[i]);
- }
-}
-
-/**
- * Checks and updates popcorning state of rrdhost
- * returns actual/updated popcorning state
- */
-
-ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host)
-{
- rrdhost_aclk_state_lock(host);
- ACLK_AGENT_STATE ret = host->aclk_state.state;
- if (host->aclk_state.state != ACLK_HOST_INITIALIZING){
- rrdhost_aclk_state_unlock(host);
- return ret;
- }
-
- if (!host->aclk_state.t_last_popcorn_update){
- rrdhost_aclk_state_unlock(host);
- return ret;
- }
-
- time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update;
-
- if (t_diff >= ACLK_STABLE_TIMEOUT) {
- host->aclk_state.state = ACLK_HOST_STABLE;
- host->aclk_state.t_last_popcorn_update = 0;
- rrdhost_aclk_state_unlock(host);
- info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff);
- return ACLK_HOST_STABLE;
- }
-
- rrdhost_aclk_state_unlock(host);
- return ret;
-}
-
-/**
- * Main query processing thread
- *
- * On startup wait for the agent collectors to initialize
- * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
- * of no new collectors coming in in order to mark the agent
- * as stable (set agent_state = AGENT_STABLE)
- */
-void *legacy_aclk_query_main_thread(void *ptr)
-{
- struct aclk_query_thread *info = ptr;
-
- while (!netdata_exit) {
- if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) {
-#ifdef ACLK_DEBUG
- _dump_collector_list();
-#endif
- break;
- }
- sleep_usec(USEC_PER_SEC * 1);
- }
-
- while (!netdata_exit) {
- if(aclk_disable_runtime) {
- sleep(1);
- continue;
- }
- legacy_aclk_shared_state_LOCK;
- if (unlikely(!legacy_aclk_shared_state.version_neg)) {
- if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
- legacy_aclk_shared_state_UNLOCK;
- info("Waiting for ACLK Version Negotiation message from Cloud");
- sleep(1);
- continue;
- }
- info("ACLK version negotiation failed (This is expected). No reply to \"hello\" with \"version\" from cloud in time of %ds."
- " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
- legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN;
- aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg);
- }
- legacy_aclk_shared_state_UNLOCK;
-
- rrdhost_aclk_state_lock(localhost);
- if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
- if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
- rrdhost_aclk_state_unlock(localhost);
- errno = 0;
- error("ACLK failed to queue on_connect command");
- sleep(1);
- continue;
- }
- localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED;
- }
- rrdhost_aclk_state_unlock(localhost);
-
- legacy_aclk_shared_state_LOCK;
- if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
- legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
- legacy_aclk_shared_state.next_popcorn_host = NULL;
- aclk_update_next_child_to_popcorn();
- }
- legacy_aclk_shared_state_UNLOCK;
-
- while (aclk_process_query(info)) {
- // Process all commands
- };
-
- LEGACY_QUERY_THREAD_LOCK;
-
- // TODO: Need to check if there are queries awaiting already
- if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait)))
- sleep_usec(USEC_PER_SEC * 1);
-
- LEGACY_QUERY_THREAD_UNLOCK;
- }
-
- return NULL;
-}
-
-#ifndef __GNUC__
-#pragma endregion
-#endif
diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h
deleted file mode 100644
index 622b66e2c..000000000
--- a/aclk/legacy/aclk_query.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_ACLK_QUERY_H
-#define NETDATA_ACLK_QUERY_H
-
-#include "libnetdata/libnetdata.h"
-#include "web/server/web_client.h"
-
-#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
-
-#define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread.
-
-extern pthread_cond_t legacy_query_cond_wait;
-extern pthread_mutex_t legacy_query_lock_wait;
-extern uint8_t *getrusage_called_this_tick;
-#define LEGACY_QUERY_THREAD_WAKEUP pthread_cond_signal(&legacy_query_cond_wait)
-#define LEGACY_QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&legacy_query_cond_wait)
-struct aclk_query_thread {
- netdata_thread_t thread;
- int idx;
-};
-
-struct aclk_query_threads {
- struct aclk_query_thread *thread_list;
- int count;
-};
-
-struct aclk_cloud_req_v2 {
- char *data;
- RRDHOST *host;
- char *query_endpoint;
-};
-
-void *legacy_aclk_query_main_thread(void *ptr);
-int legacy_aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
-
-void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads);
-void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
-unsigned int aclk_query_size();
-
-#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c
deleted file mode 100644
index d4778bbcf..000000000
--- a/aclk/legacy/aclk_rx_msgs.c
+++ /dev/null
@@ -1,388 +0,0 @@
-
-#include "aclk_rx_msgs.h"
-
-#include "aclk_common.h"
-#include "aclk_stats.h"
-#include "aclk_query.h"
-#include "agent_cloud_link.h"
-
-#ifndef UUID_STR_LEN
-#define UUID_STR_LEN 37
-#endif
-
-static inline int aclk_extract_v2_data(char *payload, char **data)
-{
- char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR);
- if(!ptr)
- return 1;
- ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR);
- *data = strdupz(ptr);
- return 0;
-}
-
-#define ACLK_GET_REQ "GET "
-#define ACLK_CHILD_REQ "/host/"
-#define ACLK_CLOUD_REQ_V2_PREFIX "/api/v1/"
-#define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref))
-static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req)
-{
- const char *start, *end, *ptr, *query_type;
- char uuid_str[UUID_STR_LEN];
- uuid_t uuid;
-
- errno = 0;
-
- if(STRNCMP_CONSTANT_PREFIX(cloud_req->data, ACLK_GET_REQ)) {
- error("Only accepting GET HTTP requests from CLOUD");
- return 1;
- }
- start = ptr = cloud_req->data + strlen(ACLK_GET_REQ);
-
- if(!STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CHILD_REQ)) {
- ptr += strlen(ACLK_CHILD_REQ);
- if(strlen(ptr) < UUID_STR_LEN) {
- error("the child id in URL too short \"%s\"", start);
- return 1;
- }
-
- strncpyz(uuid_str, ptr, UUID_STR_LEN - 1);
-
- for(int i = 0; i < UUID_STR_LEN && uuid_str[i]; i++)
- uuid_str[i] = tolower(uuid_str[i]);
-
- if(ptr[0] && uuid_parse(uuid_str, uuid)) {
- error("Got Child query (/host/XXX/...) host id \"%s\" doesn't look like valid GUID", uuid_str);
- return 1;
- }
- ptr += UUID_STR_LEN - 1;
-
- cloud_req->host = rrdhost_find_by_guid(uuid_str, 0);
- if(!cloud_req->host) {
- error("Cannot find host with GUID \"%s\"", uuid_str);
- return 1;
- }
- }
-
- if(STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CLOUD_REQ_V2_PREFIX)) {
- error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
- return 1;
- }
- ptr += strlen(ACLK_CLOUD_REQ_V2_PREFIX);
- query_type = ptr;
-
- if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) {
- errno = 0;
- error("Doesn't look like HTTP GET request.");
- return 1;
- }
-
- if(!(ptr = strchr(ptr, '?')) || ptr > end)
- ptr = end;
- cloud_req->query_endpoint = mallocz((ptr - query_type) + 1);
- strncpyz(cloud_req->query_endpoint, query_type, ptr - query_type);
-
- req->payload = mallocz((end - start) + 1);
- strncpyz(req->payload, start, end - start);
-
- return 0;
-}
-
-#define HTTP_CHECK_AGENT_INITIALIZED() rrdhost_aclk_state_lock(localhost);\
- if (unlikely(localhost->aclk_state.state == ACLK_HOST_INITIALIZING)) {\
- debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
- rrdhost_aclk_state_unlock(localhost);\
- return 1;\
- }\
- rrdhost_aclk_state_unlock(localhost);
-
-/*
- * Parse the incoming payload and queue a command if valid
- */
-static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, char *raw_payload)
-{
- UNUSED(raw_payload);
- HTTP_CHECK_AGENT_INITIALIZED();
-
- errno = 0;
- if (unlikely(cloud_to_agent->version != 1)) {
- error(
- "Received \"http\" message from Cloud with version %d, but ACLK version %d is used",
- cloud_to_agent->version,
- legacy_aclk_shared_state.version_neg);
- return 1;
- }
-
- if (unlikely(!cloud_to_agent->payload)) {
- error("payload missing");
- return 1;
- }
-
- if (unlikely(!cloud_to_agent->callback_topic)) {
- error("callback_topic missing");
- return 1;
- }
-
- if (unlikely(!cloud_to_agent->msg_id)) {
- error("msg_id missing");
- return 1;
- }
-
- if (unlikely(legacy_aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
- debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
-
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.cloud_req_v1++;
- legacy_aclk_metrics_per_sample.cloud_req_ok++;
- LEGACY_ACLK_STATS_UNLOCK;
- }
-
- return 0;
-}
-
-static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
-{
- HTTP_CHECK_AGENT_INITIALIZED();
-
- struct aclk_cloud_req_v2 *cloud_req;
- char *data;
- int stat_idx;
-
- errno = 0;
- if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
- error(
- "This handler cannot reply to request with version older than %d, received %d.",
- ACLK_V_COMPRESSION,
- cloud_to_agent->version);
- return 1;
- }
-
- if (unlikely(aclk_extract_v2_data(raw_payload, &data))) {
- error("Error extracting payload expected after the JSON dictionary.");
- return 1;
- }
-
- cloud_req = mallocz(sizeof(struct aclk_cloud_req_v2));
- cloud_req->data = data;
- cloud_req->host = localhost;
-
- if (unlikely(aclk_v2_payload_get_query(cloud_req, cloud_to_agent))) {
- error("Could not extract payload from query");
- goto cleanup;
- }
-
- if (unlikely(!cloud_to_agent->callback_topic)) {
- error("Missing callback_topic");
- goto cleanup;
- }
-
- if (unlikely(!cloud_to_agent->msg_id)) {
- error("Missing msg_id");
- goto cleanup;
- }
-
- // we do this here due to cloud_req being taken over by query thread
- // which if crazy quick can free it after legacy_aclk_queue_query
- stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint);
-
- // legacy_aclk_queue_query takes ownership of data pointer
- if (unlikely(legacy_aclk_queue_query(
- cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
- ACLK_CMD_CLOUD_QUERY_2))) {
- error("ACLK failed to queue incoming \"http\" v2 message");
- goto cleanup;
- }
-
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.cloud_req_v2++;
- legacy_aclk_metrics_per_sample.cloud_req_ok++;
- legacy_aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
- LEGACY_ACLK_STATS_UNLOCK;
- }
-
- return 0;
-cleanup:
- freez(cloud_req->query_endpoint);
- freez(cloud_req->data);
- freez(cloud_req);
- return 1;
-}
-
-// This handles `version` message from cloud used to negotiate
-// protocol version we will use
-static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload)
-{
- UNUSED(raw_payload);
- int version = -1;
- errno = 0;
-
- if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
- error(
- "Unsupported version of \"version\" message from cloud. Expected %d, Got %d",
- ACLK_VERSION_NEG_VERSION,
- cloud_to_agent->version);
- return 1;
- }
- if (unlikely(!cloud_to_agent->min_version)) {
- error("Min version missing or 0");
- return 1;
- }
- if (unlikely(!cloud_to_agent->max_version)) {
- error("Max version missing or 0");
- return 1;
- }
- if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
- error(
- "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version,
- cloud_to_agent->min_version);
- return 1;
- }
-
- if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
- error(
- "Agent too old for this cloud. Minimum version required by cloud %d."
- " Maximum version supported by this agent %d.",
- cloud_to_agent->min_version, ACLK_VERSION_MAX);
- aclk_kill_link = 1;
- aclk_disable_runtime = 1;
- return 1;
- }
- if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
- error(
- "Cloud version is too old for this agent. Maximum version supported by cloud %d."
- " Minimum (oldest) version supported by this agent %d.",
- cloud_to_agent->max_version, ACLK_VERSION_MIN);
- aclk_kill_link = 1;
- return 1;
- }
-
- version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
-
- legacy_aclk_shared_state_LOCK;
- if (unlikely(now_monotonic_usec() > legacy_aclk_shared_state.version_neg_wait_till)) {
- errno = 0;
- error("The \"version\" message came too late ignoring.");
- goto err_cleanup;
- }
- if (unlikely(legacy_aclk_shared_state.version_neg)) {
- errno = 0;
- error("Version has already been set to %d", legacy_aclk_shared_state.version_neg);
- goto err_cleanup;
- }
- legacy_aclk_shared_state.version_neg = version;
- legacy_aclk_shared_state_UNLOCK;
-
- info("Choosing version %d of ACLK", version);
-
- aclk_set_rx_handlers(version);
-
- return 0;
-
-err_cleanup:
- legacy_aclk_shared_state_UNLOCK;
- return 1;
-}
-
-typedef struct aclk_incoming_msg_type{
- char *name;
- int(*fnc)(struct aclk_request *, char *);
-}aclk_incoming_msg_type;
-
-aclk_incoming_msg_type legacy_aclk_incoming_msg_types_v1[] = {
- { .name = "http", .fnc = aclk_handle_cloud_request_v1 },
- { .name = "version", .fnc = aclk_handle_version_response },
- { .name = NULL, .fnc = NULL }
-};
-
-aclk_incoming_msg_type legacy_aclk_incoming_msg_types_compression[] = {
- { .name = "http", .fnc = aclk_handle_cloud_request_v2 },
- { .name = "version", .fnc = aclk_handle_version_response },
- { .name = NULL, .fnc = NULL }
-};
-
-struct aclk_incoming_msg_type *legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1;
-
-void aclk_set_rx_handlers(int version)
-{
- if(version >= ACLK_V_COMPRESSION) {
- legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_compression;
- return;
- }
-
- legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1;
-}
-
-int legacy_aclk_handle_cloud_message(char *payload)
-{
- struct aclk_request cloud_to_agent;
- memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
-
- if (unlikely(!payload)) {
- errno = 0;
- error("ACLK incoming message is empty");
- goto err_cleanup_nojson;
- }
-
- debug(D_ACLK, "ACLK incoming message (%s)", payload);
-
- int rc = json_parse(payload, &cloud_to_agent, legacy_cloud_to_agent_parse);
-
- if (unlikely(rc != JSON_OK)) {
- errno = 0;
- error("Malformed json request (%s)", payload);
- goto err_cleanup;
- }
-
- if (!cloud_to_agent.type_id) {
- errno = 0;
- error("Cloud message is missing compulsory key \"type\"");
- goto err_cleanup;
- }
-
- if (!legacy_aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
- error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
- goto err_cleanup;
- }
-
- for (int i = 0; legacy_aclk_incoming_msg_types[i].name; i++) {
- if (strcmp(cloud_to_agent.type_id, legacy_aclk_incoming_msg_types[i].name) == 0) {
- if (likely(!legacy_aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
- // in case of success handler is supposed to clean up after itself
- // or as in the case of aclk_handle_cloud_request take
- // ownership of the pointers (done to avoid copying)
- // see what `legacy_aclk_queue_query` parameter `internal` does
-
- // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
- // msg handlers (namely aclk_handle_version_response)
- // can freely change what legacy_aclk_incoming_msg_types points to
- // so either exit or restart this for loop
- freez(cloud_to_agent.type_id);
- return 0;
- }
- goto err_cleanup;
- }
- }
-
- errno = 0;
- error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
-
-err_cleanup:
- if (cloud_to_agent.payload)
- freez(cloud_to_agent.payload);
- if (cloud_to_agent.type_id)
- freez(cloud_to_agent.type_id);
- if (cloud_to_agent.msg_id)
- freez(cloud_to_agent.msg_id);
- if (cloud_to_agent.callback_topic)
- freez(cloud_to_agent.callback_topic);
-
-err_cleanup_nojson:
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics_per_sample.cloud_req_err++;
- LEGACY_ACLK_STATS_UNLOCK;
- }
-
- return 1;
-}
diff --git a/aclk/legacy/aclk_rx_msgs.h b/aclk/legacy/aclk_rx_msgs.h
deleted file mode 100644
index f1f99114f..000000000
--- a/aclk/legacy/aclk_rx_msgs.h
+++ /dev/null
@@ -1,13 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_ACLK_RX_MSGS_H
-#define NETDATA_ACLK_RX_MSGS_H
-
-#include "daemon/common.h"
-#include "libnetdata/libnetdata.h"
-
-int legacy_aclk_handle_cloud_message(char *payload);
-void aclk_set_rx_handlers(int version);
-
-
-#endif /* NETDATA_ACLK_RX_MSGS_H */
diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c
deleted file mode 100644
index fbbb322a1..000000000
--- a/aclk/legacy/aclk_stats.c
+++ /dev/null
@@ -1,411 +0,0 @@
-#include "aclk_stats.h"
-
-netdata_mutex_t legacy_aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
-
-int legacy_query_thread_count;
-
-// data ACLK stats need per query thread
-struct legacy_aclk_qt_data {
- RRDDIM *dim;
-} *legacy_aclk_qt_data = NULL;
-
-// ACLK per query thread cpu stats
-struct legacy_aclk_cpu_data {
- RRDDIM *user;
- RRDDIM *system;
- RRDSET *st;
-} *legacy_aclk_cpu_data = NULL;
-
-uint32_t *legacy_aclk_queries_per_thread = NULL;
-uint32_t *legacy_aclk_queries_per_thread_sample = NULL;
-struct rusage *rusage_per_thread;
-uint8_t *getrusage_called_this_tick = NULL;
-
-static struct legacy_aclk_metrics legacy_aclk_metrics = {
- .online = 0,
-};
-
-struct legacy_aclk_metrics_per_sample legacy_aclk_metrics_per_sample;
-
-struct aclk_mat_metrics aclk_mat_metrics = {
-#ifdef NETDATA_INTERNAL_CHECKS
- .latency = { .name = "aclk_latency_mqtt",
- .prio = 200002,
- .st = NULL,
- .rd_avg = NULL,
- .rd_max = NULL,
- .rd_total = NULL,
- .unit = "ms",
- .title = "ACLK Message Publish Latency" },
-#endif
-
- .cloud_q_db_query_time = { .name = "aclk_db_query_time",
- .prio = 200006,
- .st = NULL,
- .rd_avg = NULL,
- .rd_max = NULL,
- .rd_total = NULL,
- .unit = "us",
- .title = "Time it took to process cloud requested DB queries" },
-
- .cloud_q_recvd_to_processed = { .name = "aclk_cloud_q_recvd_to_processed",
- .prio = 200007,
- .st = NULL,
- .rd_avg = NULL,
- .rd_max = NULL,
- .rd_total = NULL,
- .unit = "us",
- .title = "Time from receiving the Cloud Query until it was picked up "
- "by query thread (just before passing to the database)." }
-};
-
-void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement)
-{
- if (aclk_stats_enabled) {
- LEGACY_ACLK_STATS_LOCK;
- if (metric->max < measurement)
- metric->max = measurement;
-
- metric->total += measurement;
- metric->count++;
- LEGACY_ACLK_STATS_UNLOCK;
- }
-}
-
-static void aclk_stats_collect(struct legacy_aclk_metrics_per_sample *per_sample, struct legacy_aclk_metrics *permanent)
-{
- static RRDSET *st_aclkstats = NULL;
- static RRDDIM *rd_online_status = NULL;
-
- if (unlikely(!st_aclkstats)) {
- st_aclkstats = rrdset_create_localhost(
- "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status",
- "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE);
-
- rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st_aclkstats);
-
- rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online);
-
- rrdset_done(st_aclkstats);
-}
-
-static void aclk_stats_query_queue(struct legacy_aclk_metrics_per_sample *per_sample)
-{
- static RRDSET *st_query_thread = NULL;
- static RRDDIM *rd_queued = NULL;
- static RRDDIM *rd_dispatched = NULL;
-
- if (unlikely(!st_query_thread)) {
- st_query_thread = rrdset_create_localhost(
- "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s",
- "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA);
-
- rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st_query_thread);
-
- rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued);
- rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched);
-
- rrdset_done(st_query_thread);
-}
-
-static void aclk_stats_write_q(struct legacy_aclk_metrics_per_sample *per_sample)
-{
- static RRDSET *st = NULL;
- static RRDDIM *rd_wq_add = NULL;
- static RRDDIM *rd_wq_consumed = NULL;
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "KiB/s",
- "netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA);
-
- rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_wq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
-
- rrddim_set_by_pointer(st, rd_wq_add, per_sample->write_q_added);
- rrddim_set_by_pointer(st, rd_wq_consumed, per_sample->write_q_consumed);
-
- rrdset_done(st);
-}
-
-static void aclk_stats_read_q(struct legacy_aclk_metrics_per_sample *per_sample)
-{
- static RRDSET *st = NULL;
- static RRDDIM *rd_rq_add = NULL;
- static RRDDIM *rd_rq_consumed = NULL;
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "KiB/s",
- "netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA);
-
- rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_rq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
-
- rrddim_set_by_pointer(st, rd_rq_add, per_sample->read_q_added);
- rrddim_set_by_pointer(st, rd_rq_consumed, per_sample->read_q_consumed);
-
- rrdset_done(st);
-}
-
-static void aclk_stats_cloud_req(struct legacy_aclk_metrics_per_sample *per_sample)
-{
- static RRDSET *st = NULL;
- static RRDDIM *rd_rq_ok = NULL;
- static RRDDIM *rd_rq_err = NULL;
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s",
- "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
-
- rd_rq_ok = rrddim_add(st, "accepted", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_rq_err = rrddim_add(st, "rejected", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
-
- rrddim_set_by_pointer(st, rd_rq_ok, per_sample->cloud_req_ok);
- rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err);
-
- rrdset_done(st);
-}
-
-static void aclk_stats_cloud_req_version(struct legacy_aclk_metrics_per_sample *per_sample)
-{
- static RRDSET *st = NULL;
- static RRDDIM *rd_rq_v1 = NULL;
- static RRDDIM *rd_rq_v2 = NULL;
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata", "aclk_cloud_req_version", NULL, "aclk", NULL, "Requests received from cloud by their version", "req/s",
- "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
-
- rd_rq_v1 = rrddim_add(st, "v1", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_rq_v2 = rrddim_add(st, "v2+", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
-
- rrddim_set_by_pointer(st, rd_rq_v1, per_sample->cloud_req_v1);
- rrddim_set_by_pointer(st, rd_rq_v2, per_sample->cloud_req_v2);
-
- rrdset_done(st);
-}
-
-static char *cloud_req_type_names[ACLK_STATS_CLOUD_REQ_TYPE_CNT] = {
- "other",
- "info",
- "data",
- "alarms",
- "alarm_log",
- "chart",
- "charts"
- // if you change update:
- // #define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
-};
-
-int aclk_cloud_req_type_to_idx(const char *name)
-{
- for (int i = 1; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
- if (!strcmp(cloud_req_type_names[i], name))
- return i;
- return 0;
-}
-
-static void aclk_stats_cloud_req_cmd(struct legacy_aclk_metrics_per_sample *per_sample)
-{
- static RRDSET *st;
- static int initialized = 0;
- static RRDDIM *rd_rq_types[ACLK_STATS_CLOUD_REQ_TYPE_CNT];
-
- if (unlikely(!initialized)) {
- initialized = 1;
- st = rrdset_create_localhost(
- "netdata", "aclk_cloud_req_cmd", NULL, "aclk", NULL, "Requests received from cloud by their type (api endpoint queried)", "req/s",
- "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
-
- for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
- rd_rq_types[i] = rrddim_add(st, cloud_req_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
-
- for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
- rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_by_type[i]);
-
- rrdset_done(st);
-}
-
-#define MAX_DIM_NAME 22
-static void aclk_stats_query_threads(uint32_t *queries_per_thread)
-{
- static RRDSET *st = NULL;
-
- char dim_name[MAX_DIM_NAME];
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
- "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
-
- for (int i = 0; i < legacy_query_thread_count; i++) {
- if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
- error("snprintf encoding error");
- legacy_aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- }
- } else
- rrdset_next(st);
-
- for (int i = 0; i < legacy_query_thread_count; i++) {
- rrddim_set_by_pointer(st, legacy_aclk_qt_data[i].dim, queries_per_thread[i]);
- }
-
- rrdset_done(st);
-}
-
-static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct aclk_metric_mat_data *data)
-{
- if(unlikely(!metric->st)) {
- metric->st = rrdset_create_localhost(
- "netdata", metric->name, NULL, "aclk", NULL, metric->title, metric->unit, "netdata", "stats", metric->prio,
- localhost->rrd_update_every, RRDSET_TYPE_LINE);
-
- metric->rd_avg = rrddim_add(metric->st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- metric->rd_max = rrddim_add(metric->st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- metric->rd_total = rrddim_add(metric->st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(metric->st);
-
- if(data->count)
- rrddim_set_by_pointer(metric->st, metric->rd_avg, roundf((float)data->total / data->count));
- else
- rrddim_set_by_pointer(metric->st, metric->rd_avg, 0);
- rrddim_set_by_pointer(metric->st, metric->rd_max, data->max);
- rrddim_set_by_pointer(metric->st, metric->rd_total, data->total);
-
- rrdset_done(metric->st);
-}
-
-static void aclk_stats_cpu_threads(void)
-{
- char id[100 + 1];
- char title[100 + 1];
-
- for (int i = 0; i < legacy_query_thread_count; i++) {
- if (unlikely(!legacy_aclk_cpu_data[i].st)) {
-
- snprintfz(id, 100, "aclk_thread%d_cpu", i);
- snprintfz(title, 100, "Cpu Usage For Thread No %d", i);
-
- legacy_aclk_cpu_data[i].st = rrdset_create_localhost(
- "netdata", id, NULL, "aclk", NULL, title, "milliseconds/s",
- "netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
-
- legacy_aclk_cpu_data[i].user = rrddim_add(legacy_aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- legacy_aclk_cpu_data[i].system = rrddim_add(legacy_aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
-
- } else
- rrdset_next(legacy_aclk_cpu_data[i].st);
- }
-
- for (int i = 0; i < legacy_query_thread_count; i++) {
- rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec);
- rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec);
- rrdset_done(legacy_aclk_cpu_data[i].st);
- }
-}
-
-void legacy_aclk_stats_thread_cleanup()
-{
- freez(legacy_aclk_qt_data);
- freez(legacy_aclk_queries_per_thread);
- freez(legacy_aclk_queries_per_thread_sample);
- freez(legacy_aclk_cpu_data);
- freez(rusage_per_thread);
-}
-
-void *legacy_aclk_stats_main_thread(void *ptr)
-{
- struct aclk_stats_thread *args = ptr;
-
- legacy_query_thread_count = args->query_thread_count;
- legacy_aclk_qt_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_qt_data));
- legacy_aclk_cpu_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_cpu_data));
- legacy_aclk_queries_per_thread = callocz(legacy_query_thread_count, sizeof(uint32_t));
- legacy_aclk_queries_per_thread_sample = callocz(legacy_query_thread_count, sizeof(uint32_t));
- rusage_per_thread = callocz(legacy_query_thread_count, sizeof(struct rusage));
- getrusage_called_this_tick = callocz(legacy_query_thread_count, sizeof(uint8_t));
-
- heartbeat_t hb;
- heartbeat_init(&hb);
- usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
-
- memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample));
-
- struct legacy_aclk_metrics_per_sample per_sample;
- struct legacy_aclk_metrics permanent;
-
- while (!netdata_exit) {
- netdata_thread_testcancel();
- // ------------------------------------------------------------------------
- // Wait for the next iteration point.
-
- heartbeat_next(&hb, step_ut);
- if (netdata_exit) break;
-
- LEGACY_ACLK_STATS_LOCK;
- // to not hold lock longer than necessary, especially not to hold it
- // during database rrd* operations
- memcpy(&per_sample, &legacy_aclk_metrics_per_sample, sizeof(struct legacy_aclk_metrics_per_sample));
- memcpy(&permanent, &legacy_aclk_metrics, sizeof(struct legacy_aclk_metrics));
- memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample));
-
- memcpy(legacy_aclk_queries_per_thread_sample, legacy_aclk_queries_per_thread, sizeof(uint32_t) * legacy_query_thread_count);
- memset(legacy_aclk_queries_per_thread, 0, sizeof(uint32_t) * legacy_query_thread_count);
- memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * legacy_query_thread_count);
- LEGACY_ACLK_STATS_UNLOCK;
-
- aclk_stats_collect(&per_sample, &permanent);
- aclk_stats_query_queue(&per_sample);
-
- aclk_stats_write_q(&per_sample);
- aclk_stats_read_q(&per_sample);
-
- aclk_stats_cloud_req(&per_sample);
- aclk_stats_cloud_req_version(&per_sample);
-
- aclk_stats_cloud_req_cmd(&per_sample);
-
- aclk_stats_query_threads(legacy_aclk_queries_per_thread_sample);
-
- aclk_stats_cpu_threads();
-
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency);
-#endif
- aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_db_query_time, &per_sample.cloud_q_db_query_time);
- aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_recvd_to_processed, &per_sample.cloud_q_recvd_to_processed);
- }
-
- return 0;
-}
-
-void legacy_aclk_stats_upd_online(int online) {
- if(!aclk_stats_enabled)
- return;
-
- LEGACY_ACLK_STATS_LOCK;
- legacy_aclk_metrics.online = online;
-
- if(!online)
- legacy_aclk_metrics_per_sample.offline_during_sample = 1;
- LEGACY_ACLK_STATS_UNLOCK;
-}
diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h
deleted file mode 100644
index 560de3b5e..000000000
--- a/aclk/legacy/aclk_stats.h
+++ /dev/null
@@ -1,100 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_ACLK_STATS_H
-#define NETDATA_ACLK_STATS_H
-
-#include "daemon/common.h"
-#include "libnetdata/libnetdata.h"
-#include "aclk_common.h"
-
-#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
-
-extern netdata_mutex_t legacy_aclk_stats_mutex;
-
-#define LEGACY_ACLK_STATS_LOCK netdata_mutex_lock(&legacy_aclk_stats_mutex)
-#define LEGACY_ACLK_STATS_UNLOCK netdata_mutex_unlock(&legacy_aclk_stats_mutex)
-
-struct aclk_stats_thread {
- netdata_thread_t *thread;
- int query_thread_count;
-};
-
-// preserve between samples
-struct legacy_aclk_metrics {
- volatile uint8_t online;
-};
-
-//mat = max average total
-struct aclk_metric_mat_data {
- volatile uint32_t total;
- volatile uint32_t count;
- volatile uint32_t max;
-};
-
-//mat = max average total
-struct aclk_metric_mat {
- char *name;
- char *title;
- RRDSET *st;
- RRDDIM *rd_avg;
- RRDDIM *rd_max;
- RRDDIM *rd_total;
- long prio;
- char *unit;
-};
-
-extern struct aclk_mat_metrics {
-#ifdef NETDATA_INTERNAL_CHECKS
- struct aclk_metric_mat latency;
-#endif
- struct aclk_metric_mat cloud_q_db_query_time;
- struct aclk_metric_mat cloud_q_recvd_to_processed;
-} aclk_mat_metrics;
-
-void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
-
-#define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
-// if you change update cloud_req_type_names
-
-int aclk_cloud_req_type_to_idx(const char *name);
-
-// reset to 0 on every sample
-extern struct legacy_aclk_metrics_per_sample {
- /* in the unlikely event of ACLK disconnecting
- and reconnecting under 1 sampling rate
- we want to make sure we record the disconnection
- despite it being then seemingly longer in graph */
- volatile uint8_t offline_during_sample;
-
- volatile uint32_t queries_queued;
- volatile uint32_t queries_dispatched;
-
- volatile uint32_t write_q_added;
- volatile uint32_t write_q_consumed;
-
- volatile uint32_t read_q_added;
- volatile uint32_t read_q_consumed;
-
- volatile uint32_t cloud_req_ok;
- volatile uint32_t cloud_req_err;
-
- volatile uint16_t cloud_req_v1;
- volatile uint16_t cloud_req_v2;
-
- volatile uint16_t cloud_req_by_type[ACLK_STATS_CLOUD_REQ_TYPE_CNT];
-
-#ifdef NETDATA_INTERNAL_CHECKS
- struct aclk_metric_mat_data latency;
-#endif
- struct aclk_metric_mat_data cloud_q_db_query_time;
- struct aclk_metric_mat_data cloud_q_recvd_to_processed;
-} legacy_aclk_metrics_per_sample;
-
-extern uint32_t *legacy_aclk_queries_per_thread;
-extern struct rusage *rusage_per_thread;
-
-void *legacy_aclk_stats_main_thread(void *ptr);
-void legacy_aclk_stats_thread_cleanup();
-void legacy_aclk_stats_upd_online(int online);
-
-#endif /* NETDATA_ACLK_STATS_H */
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c
deleted file mode 100644
index 80ca23971..000000000
--- a/aclk/legacy/agent_cloud_link.c
+++ /dev/null
@@ -1,1502 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "libnetdata/libnetdata.h"
-#include "agent_cloud_link.h"
-#include "aclk_lws_https_client.h"
-#include "aclk_query.h"
-#include "aclk_common.h"
-#include "aclk_stats.h"
-#include "../aclk_collector_list.h"
-
-#ifdef ENABLE_ACLK
-#include <libwebsockets.h>
-#endif
-
-int aclk_shutting_down = 0;
-
-// Other global state
-static int aclk_subscribed = 0;
-static char *aclk_username = NULL;
-static char *aclk_password = NULL;
-
-static char *global_base_topic = NULL;
-static int aclk_connecting = 0;
-int aclk_force_reconnect = 0; // Indication from lower layers
-
-static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
-
-#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
-#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
-
-void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
-void aclk_lws_wss_destroy_context();
-
-char *create_uuid()
-{
- uuid_t uuid;
- char *uuid_str = mallocz(36 + 1);
-
- uuid_generate(uuid);
- uuid_unparse(uuid, uuid_str);
-
- return uuid_str;
-}
-
-int legacy_cloud_to_agent_parse(JSON_ENTRY *e)
-{
- struct aclk_request *data = e->callback_data;
-
- switch (e->type) {
- case JSON_OBJECT:
- case JSON_ARRAY:
- break;
- case JSON_STRING:
- if (!strcmp(e->name, "msg-id")) {
- data->msg_id = strdupz(e->data.string);
- break;
- }
- if (!strcmp(e->name, "type")) {
- data->type_id = strdupz(e->data.string);
- break;
- }
- if (!strcmp(e->name, "callback-topic")) {
- data->callback_topic = strdupz(e->data.string);
- break;
- }
- if (!strcmp(e->name, "payload")) {
- if (likely(e->data.string)) {
- size_t len = strlen(e->data.string);
- data->payload = mallocz(len+1);
- if (!url_decode_r(data->payload, e->data.string, len + 1))
- strcpy(data->payload, e->data.string);
- }
- break;
- }
- break;
- case JSON_NUMBER:
- if (!strcmp(e->name, "version")) {
- data->version = e->data.number;
- break;
- }
- if (!strcmp(e->name, "min-version")) {
- data->min_version = e->data.number;
- break;
- }
- if (!strcmp(e->name, "max-version")) {
- data->max_version = e->data.number;
- break;
- }
-
- break;
-
- case JSON_BOOLEAN:
- break;
-
- case JSON_NULL:
- break;
- }
- return 0;
-}
-
-
-static RSA *aclk_private_key = NULL;
-static int create_private_key()
-{
- if (aclk_private_key != NULL)
- RSA_free(aclk_private_key);
- aclk_private_key = NULL;
- char filename[FILENAME_MAX + 1];
- snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
-
- long bytes_read;
- char *private_key = read_by_filename(filename, &bytes_read);
- if (!private_key) {
- error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
- return 1;
- }
- debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
-
- BIO *key_bio = BIO_new_mem_buf(private_key, -1);
- if (key_bio==NULL) {
- error("Claimed agent cannot establish ACLK - failed to create BIO for key");
- goto biofailed;
- }
-
- aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
- BIO_free(key_bio);
- if (aclk_private_key!=NULL)
- {
- freez(private_key);
- return 0;
- }
- char err[512];
- ERR_error_string_n(ERR_get_error(), err, sizeof(err));
- error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
-
-biofailed:
- freez(private_key);
- return 1;
-}
-
-/*
- * After a connection failure -- delay in milliseconds
- * When a connection is established, the delay function
- * should be called with
- *
- * mode 0 to reset the delay
- * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
- *
- */
-unsigned long int aclk_reconnect_delay(int mode)
-{
- static int fail = -1;
- unsigned long int delay;
-
- if (!mode || fail == -1) {
- srandom(time(NULL));
- fail = mode - 1;
- return 0;
- }
-
- delay = (1 << fail);
-
- if (delay >= ACLK_MAX_BACKOFF_DELAY) {
- delay = ACLK_MAX_BACKOFF_DELAY * 1000;
- } else {
- fail++;
- delay *= 1000;
- delay += (random() % (MAX(1000, delay/2)));
- }
-
- return delay;
-}
-
-// This will give the base topic that the agent will publish messages.
-// subtopics will be sent under the base topic e.g. base_topic/subtopic
-// This is called during the connection, we delete any previous topic
-// in-case the user has changed the agent id and reclaimed.
-
-char *create_publish_base_topic()
-{
- char *agent_id = is_agent_claimed();
- if (unlikely(!agent_id))
- return NULL;
-
- ACLK_LOCK;
-
- if (global_base_topic)
- freez(global_base_topic);
- char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
-
- snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
- tmp = strchr(tmp_topic, '\n');
- if (unlikely(tmp))
- *tmp = '\0';
- global_base_topic = strdupz(tmp_topic);
-
- ACLK_UNLOCK;
- freez(agent_id);
- return global_base_topic;
-}
-
-/*
- * Build a topic based on sub_topic and final_topic
- * if the sub topic starts with / assume that is an absolute topic
- *
- */
-
-char *get_topic(char *sub_topic, char *final_topic, int max_size)
-{
- int rc;
-
- if (likely(sub_topic && sub_topic[0] == '/'))
- return sub_topic;
-
- if (unlikely(!global_base_topic))
- return sub_topic;
-
- rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
- if (unlikely(rc >= max_size))
- debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
-
- return final_topic;
-}
-
-/* Avoids the need to scan through all RRDHOSTS
- * every time any Query Thread Wakes Up
- * (every time we need to check child popcorn expiry)
- * call with legacy_aclk_shared_state_LOCK held
- */
-void aclk_update_next_child_to_popcorn(void)
-{
- RRDHOST *host;
- int any = 0;
-
- rrd_rdlock();
- rrdhost_foreach_read(host) {
- if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)))
- continue;
-
- rrdhost_aclk_state_lock(host);
- if (!ACLK_IS_HOST_POPCORNING(host)) {
- rrdhost_aclk_state_unlock(host);
- continue;
- }
-
- any = 1;
-
- if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) {
- legacy_aclk_shared_state.next_popcorn_host = host;
- rrdhost_aclk_state_unlock(host);
- continue;
- }
-
- if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
- legacy_aclk_shared_state.next_popcorn_host = host;
-
- rrdhost_aclk_state_unlock(host);
- }
- if(!any)
- legacy_aclk_shared_state.next_popcorn_host = NULL;
-
- rrd_unlock();
-}
-
-/* If popcorning bump timer.
- * If popcorning or initializing (host not stable) return 1
- * Otherwise return 0
- */
-static int aclk_popcorn_check_bump(RRDHOST *host)
-{
- time_t now = now_monotonic_sec();
- int updated = 0, ret;
- legacy_aclk_shared_state_LOCK;
- rrdhost_aclk_state_lock(host);
-
- ret = ACLK_IS_HOST_INITIALIZING(host);
- if (unlikely(ACLK_IS_HOST_POPCORNING(host))) {
- if(now != host->aclk_state.t_last_popcorn_update) {
- updated = 1;
- info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
- }
- host->aclk_state.t_last_popcorn_update = now;
- rrdhost_aclk_state_unlock(host);
-
- if (host != localhost && updated)
- aclk_update_next_child_to_popcorn();
-
- legacy_aclk_shared_state_UNLOCK;
- return ret;
- }
-
- rrdhost_aclk_state_unlock(host);
- legacy_aclk_shared_state_UNLOCK;
- return ret;
-}
-
-inline static int aclk_host_initializing(RRDHOST *host)
-{
- rrdhost_aclk_state_lock(host);
- int ret = ACLK_IS_HOST_INITIALIZING(host);
- rrdhost_aclk_state_unlock(host);
- return ret;
-}
-
-static void aclk_start_host_popcorning(RRDHOST *host)
-{
- usec_t now = now_monotonic_sec();
- info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
- legacy_aclk_shared_state_LOCK;
- rrdhost_aclk_state_lock(host);
- if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
- errno = 0;
- error("Localhost is allowed to do popcorning only once after startup!");
- rrdhost_aclk_state_unlock(host);
- legacy_aclk_shared_state_UNLOCK;
- return;
- }
-
- host->aclk_state.state = ACLK_HOST_INITIALIZING;
- host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
- host->aclk_state.t_last_popcorn_update = now;
- rrdhost_aclk_state_unlock(host);
- if (host != localhost)
- aclk_update_next_child_to_popcorn();
- legacy_aclk_shared_state_UNLOCK;
-}
-
-static void aclk_stop_host_popcorning(RRDHOST *host)
-{
- legacy_aclk_shared_state_LOCK;
- rrdhost_aclk_state_lock(host);
- if (!ACLK_IS_HOST_POPCORNING(host)) {
- rrdhost_aclk_state_unlock(host);
- legacy_aclk_shared_state_UNLOCK;
- return;
- }
-
- info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid);
- host->aclk_state.t_last_popcorn_update = 0;
- host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
- rrdhost_aclk_state_unlock(host);
-
- if(host == legacy_aclk_shared_state.next_popcorn_host) {
- legacy_aclk_shared_state.next_popcorn_host = NULL;
- aclk_update_next_child_to_popcorn();
- }
- legacy_aclk_shared_state_UNLOCK;
-}
-
-/*
- * Add a new collector to the list
- * If it exists, update the chart count
- */
-void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
-{
- struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
- return;
- }
-
- COLLECTOR_LOCK;
-
- tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
-
- if (unlikely(tmp_collector->count != 1)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- COLLECTOR_UNLOCK;
-
- if(aclk_popcorn_check_bump(host))
- return;
-
- if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
- debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
-}
-
-/*
- * Delete a collector from the list
- * If the chart count reaches zero the collector will be removed
- * from the list by calling del_collector.
- *
- * This function will release the memory used and schedule
- * a cloud update
- */
-void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
-{
- struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
- return;
- }
-
- COLLECTOR_LOCK;
-
- tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
-
- if (unlikely(!tmp_collector || tmp_collector->count)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- debug(
- D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
- tmp_collector->count);
-
- COLLECTOR_UNLOCK;
-
- _free_collector(tmp_collector);
-
- if (aclk_popcorn_check_bump(host))
- return;
-
- if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
- debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
-}
-
-static void aclk_graceful_disconnect()
-{
- size_t write_q, write_q_bytes, read_q;
- time_t event_loop_timeout;
-
- // Send a graceful disconnect message
- BUFFER *b = buffer_create(512);
- aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg);
- buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}");
- aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
- buffer_free(b);
-
- event_loop_timeout = now_realtime_sec() + 5;
- write_q = 1;
- while (write_q && event_loop_timeout > now_realtime_sec()) {
- _link_event_loop();
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
- }
-
- aclk_shutting_down = 1;
- _link_shutdown();
- aclk_lws_wss_mqtt_layer_disconnect_notif();
-
- write_q = 1;
- event_loop_timeout = now_realtime_sec() + 5;
- while (write_q && event_loop_timeout > now_realtime_sec()) {
- _link_event_loop();
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
- }
- aclk_shutting_down = 0;
-}
-
-#ifndef __GNUC__
-#pragma region Incoming Msg Parsing
-#endif
-
-struct dictionary_singleton {
- char *key;
- char *result;
-};
-
-int json_extract_singleton(JSON_ENTRY *e)
-{
- struct dictionary_singleton *data = e->callback_data;
-
- switch (e->type) {
- case JSON_OBJECT:
- case JSON_ARRAY:
- break;
- case JSON_STRING:
- if (!strcmp(e->name, data->key)) {
- data->result = strdupz(e->data.string);
- break;
- }
- break;
- case JSON_NUMBER:
- case JSON_BOOLEAN:
- case JSON_NULL:
- break;
- }
- return 0;
-}
-
-#ifndef __GNUC__
-#pragma endregion
-#endif
-
-
-#ifndef __GNUC__
-#pragma region Challenge Response
-#endif
-
-// Base-64 decoder.
-// Note: This is non-validating, invalid input will be decoded without an error.
-// Challenges are packed into json strings so we don't skip newlines.
-// Size errors (i.e. invalid input size or insufficient output space) are caught.
-size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
-{
- static char lookup[256];
- static int first_time=1;
- if (first_time)
- {
- first_time = 0;
- for(int i=0; i<256; i++)
- lookup[i] = -1;
- for(int i='A'; i<='Z'; i++)
- lookup[i] = i-'A';
- for(int i='a'; i<='z'; i++)
- lookup[i] = i-'a' + 26;
- for(int i='0'; i<='9'; i++)
- lookup[i] = i-'0' + 52;
- lookup['+'] = 62;
- lookup['/'] = 63;
- }
- if ((input_size & 3) != 0)
- {
- error("Can't decode base-64 input length %zu", input_size);
- return 0;
- }
- size_t unpadded_size = (input_size/4) * 3;
- if ( unpadded_size > output_size )
- {
- error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
- return 0;
- }
- // Don't check padding within full quantums
- for (size_t i = 0 ; i < input_size-4 ; i+=4 )
- {
- uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
- output[0] = value >> 16;
- output[1] = value >> 8;
- output[2] = value;
- //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
- output += 3;
- input += 4;
- }
- // Handle padding only in last quantum
- if (input[2] == '=') {
- uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
- output[0] = value >> 4;
- //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
- return unpadded_size-2;
- }
- else if (input[3] == '=') {
- uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
- output[0] = value >> 10;
- output[1] = value >> 2;
- //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
- return unpadded_size-1;
- }
- else
- {
- uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
- output[0] = value >> 16;
- output[1] = value >> 8;
- output[2] = value;
- //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
- return unpadded_size;
- }
-}
-
-size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
-{
- uint32_t value;
- static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz"
- "0123456789+/";
- if ((input_size/3+1)*4 >= output_size)
- {
- error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
- return 0;
- }
- size_t count = 0;
- while (input_size>3)
- {
- value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
- output[0] = lookup[value >> 18];
- output[1] = lookup[(value >> 12) & 0x3f];
- output[2] = lookup[(value >> 6) & 0x3f];
- output[3] = lookup[value & 0x3f];
- //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
- output += 4;
- input += 3;
- input_size -= 3;
- count += 4;
- }
- switch (input_size)
- {
- case 2:
- value = (input[0] << 10) + (input[1] << 2);
- output[0] = lookup[(value >> 12) & 0x3f];
- output[1] = lookup[(value >> 6) & 0x3f];
- output[2] = lookup[value & 0x3f];
- output[3] = '=';
- //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]);
- count += 4;
- break;
- case 1:
- value = input[0] << 4;
- output[0] = lookup[(value >> 6) & 0x3f];
- output[1] = lookup[value & 0x3f];
- output[2] = '=';
- output[3] = '=';
- //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
- count += 4;
- break;
- case 0:
- break;
- }
- return count;
-}
-
-
-
-int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
-{
- int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
- if (result == -1) {
- char err[512];
- ERR_error_string_n(ERR_get_error(), err, sizeof(err));
- error("Decryption of the challenge failed: %s", err);
- }
- return result;
-}
-
-void aclk_get_challenge(char *aclk_hostname, int port)
-{
- char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- debug(D_ACLK, "Performing challenge-response sequence");
- if (aclk_password != NULL)
- {
- freez(aclk_password);
- aclk_password = NULL;
- }
- // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
- // TODO - target host?
- char *agent_id = is_agent_claimed();
- if (agent_id == NULL)
- {
- error("Agent was not claimed - cannot perform challenge/response");
- goto CLEANUP;
- }
- char url[1024];
- sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
- info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url);
- if(aclk_send_https_request("GET", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
- {
- error("Challenge failed: %s", data_buffer);
- goto CLEANUP;
- }
- struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
-
- debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
- if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
- {
- freez(challenge.result);
- error("Could not parse the json response with the challenge: %s", data_buffer);
- goto CLEANUP;
- }
- if (challenge.result == NULL ) {
- error("Could not retrieve challenge from auth response: %s", data_buffer);
- goto CLEANUP;
- }
-
-
- size_t challenge_len = strlen(challenge.result);
- unsigned char decoded[512];
- size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
-
- unsigned char plaintext[4096]={};
- int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
- freez(challenge.result);
- char encoded[512];
- size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
- encoded[encoded_len] = 0;
- debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
-
- char response_json[4096]={};
- sprintf(response_json, "{\"response\":\"%s\"}", encoded);
- debug(D_ACLK, "Password phase: %s",response_json);
- // TODO - host
- sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
- if(aclk_send_https_request("POST", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
- {
- error("Challenge-response failed: %s", data_buffer);
- goto CLEANUP;
- }
-
- debug(D_ACLK, "Password response from cloud: %s", data_buffer);
-
- struct dictionary_singleton password = { .key = "password", .result = NULL };
- if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
- {
- freez(password.result);
- error("Could not parse the json response with the password: %s", data_buffer);
- goto CLEANUP;
- }
-
- if (password.result == NULL ) {
- error("Could not retrieve password from auth response");
- goto CLEANUP;
- }
- if (aclk_password != NULL )
- freez(aclk_password);
- aclk_password = password.result;
- if (aclk_username != NULL)
- freez(aclk_username);
- aclk_username = agent_id;
- agent_id = NULL;
-
-CLEANUP:
- if (agent_id != NULL)
- freez(agent_id);
- freez(data_buffer);
- return;
-}
-
-#ifndef __GNUC__
-#pragma endregion
-#endif
-
-static void aclk_try_to_connect(char *hostname, int port)
-{
- int rc;
-
-// this is useful for developers working on ACLK
-// allows connecting agent to any MQTT broker
-// for debugging, development and testing purposes
-#ifndef ACLK_DISABLE_CHALLENGE
- if (!aclk_private_key) {
- error("Cannot try to establish the agent cloud link - no private key available!");
- return;
- }
-#endif
-
- info("Attempting to establish the agent cloud link");
-#ifdef ACLK_DISABLE_CHALLENGE
- error("Agent built with ACLK_DISABLE_CHALLENGE. This is for testing "
- "and development purposes only. Warranty void. Won't be able "
- "to connect to Netdata Cloud.");
- if (aclk_password == NULL)
- aclk_password = strdupz("anon");
-#else
- aclk_get_challenge(hostname, port);
- if (aclk_password == NULL)
- return;
-#endif
-
- aclk_connecting = 1;
- create_publish_base_topic();
-
- legacy_aclk_shared_state_LOCK;
- legacy_aclk_shared_state.version_neg = 0;
- legacy_aclk_shared_state.version_neg_wait_till = 0;
- legacy_aclk_shared_state_UNLOCK;
-
- rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password);
- if (unlikely(rc)) {
- error("Failed to initialize the agent cloud link library");
- }
-}
-
-// Sends "hello" message to negotiate ACLK version with cloud
-static inline void aclk_hello_msg()
-{
- BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
-
- char *msg_id = create_uuid();
-
- legacy_aclk_shared_state_LOCK;
- legacy_aclk_shared_state.version_neg = 0;
- legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
- legacy_aclk_shared_state_UNLOCK;
-
- //Hello message is versioned separately from the rest of the protocol
- aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
- buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX);
- aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id);
- freez(msg_id);
- buffer_free(buf);
-}
-
-/**
- * Main agent cloud link thread
- *
- * This thread will simply call the main event loop that handles
- * pending requests - both inbound and outbound
- *
- * @param ptr is a pointer to the netdata_static_thread structure.
- *
- * @return It always returns NULL
- */
-void *legacy_aclk_main(void *ptr)
-{
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- struct aclk_query_threads query_threads;
- struct aclk_stats_thread *stats_thread = NULL;
- time_t last_periodic_query_wakeup = 0;
-
- query_threads.thread_list = NULL;
-
- // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
- // as it must notify the far end that it shutdown gracefully and avoid the LWT.
- netdata_thread_disable_cancelability();
-
-#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
- info("Killing ACLK thread -> cloud functionality has been disabled");
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- return NULL;
-#endif
-
-#ifndef LWS_WITH_SOCKS5
- ACLK_PROXY_TYPE proxy_type;
- aclk_get_proxy(&proxy_type);
- if(proxy_type == PROXY_TYPE_SOCKS5) {
- error("Disabling ACLK due to requested SOCKS5 proxy.");
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- return NULL;
- }
-#endif
-
- info("Waiting for netdata to be ready");
- while (!netdata_ready) {
- sleep_usec(USEC_PER_MS * 300);
- }
-
- info("Waiting for Cloud to be enabled");
- while (!netdata_cloud_setting) {
- sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit) {
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- return NULL;
- }
- }
-
- query_threads.count = MIN(processors/2, 6);
- query_threads.count = MAX(query_threads.count, 2);
- query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
- if(query_threads.count < 1) {
- error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count);
- query_threads.count = 1;
- config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
- }
-
- //start localhost popcorning
- aclk_start_host_popcorning(localhost);
-
- aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
- if (aclk_stats_enabled) {
- stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
- stats_thread->thread = mallocz(sizeof(netdata_thread_t));
- stats_thread->query_thread_count = query_threads.count;
- netdata_thread_create(
- stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread,
- stats_thread);
- }
-
- char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
- int port_num = 0;
- info("Waiting for netdata to be claimed");
- while(1) {
- char *agent_id = is_agent_claimed();
- while (likely(!agent_id)) {
- sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
- goto exited;
- agent_id = is_agent_claimed();
- }
- freez(agent_id);
- // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
- // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
- char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
- if (cloud_base_url == NULL) {
- error("Do not move the cloud base url out of post_conf_load!!");
- goto exited;
- }
- if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &port_num))
- error("Agent is claimed but the configuration is invalid, please fix");
- else if (!create_private_key() && !_mqtt_lib_init())
- break;
-
- for (int i=0; i<60; i++) {
- if (netdata_exit)
- goto exited;
-
- sleep_usec(USEC_PER_SEC * 1);
- }
- }
-
- usec_t reconnect_expiry = 0; // In usecs
-
- while (!netdata_exit) {
- static int first_init = 0;
- /* size_t write_q, write_q_bytes, read_q;
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
-
- if (aclk_disable_runtime && !aclk_connected) {
- sleep(1);
- continue;
- }
-
- if (aclk_kill_link) { // User has reloaded the claiming state
- aclk_kill_link = 0;
- aclk_graceful_disconnect();
- create_private_key();
- continue;
- }
-
- if (aclk_force_reconnect) {
- aclk_lws_wss_destroy_context();
- aclk_force_reconnect = 0;
- }
- if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
- if (unlikely(!first_init)) {
- aclk_try_to_connect(aclk_hostname, port_num);
- first_init = 1;
- } else {
- if (aclk_connecting == 0) {
- if (reconnect_expiry == 0) {
- unsigned long int delay = aclk_reconnect_delay(1);
- reconnect_expiry = now_realtime_usec() + delay * 1000;
- info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
- }
- if (now_realtime_usec() >= reconnect_expiry) {
- reconnect_expiry = 0;
- aclk_try_to_connect(aclk_hostname, port_num);
- }
- sleep_usec(USEC_PER_MS * 100);
- }
- }
- if (aclk_connecting) {
- _link_event_loop();
- sleep_usec(USEC_PER_MS * 100);
- }
- continue;
- }
-
- _link_event_loop();
- if (unlikely(!aclk_connected || aclk_force_reconnect))
- continue;
- /*static int stress_counter = 0;
- if (write_q_bytes==0 && stress_counter ++ >5)
- {
- aclk_send_stress_test(8000000);
- stress_counter = 0;
- }*/
-
- if (unlikely(!aclk_subscribed)) {
- aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
- aclk_hello_msg();
- }
-
- if (unlikely(!query_threads.thread_list)) {
- legacy_aclk_query_threads_start(&query_threads);
- }
-
- time_t now = now_monotonic_sec();
- if(aclk_connected && last_periodic_query_wakeup < now) {
- // to make `legacy_aclk_queue_query()` param `run_after` work
- // also makes per child popcorning work
- last_periodic_query_wakeup = now;
- LEGACY_QUERY_THREAD_WAKEUP;
- }
- } // forever
-exited:
- // Wakeup query thread to cleanup
- LEGACY_QUERY_THREAD_WAKEUP_ALL;
-
- freez(aclk_username);
- freez(aclk_password);
- freez(aclk_hostname);
- if (aclk_private_key != NULL)
- RSA_free(aclk_private_key);
-
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
-
- char *agent_id = is_agent_claimed();
- if (agent_id && aclk_connected) {
- freez(agent_id);
- // Wakeup thread to cleanup
- LEGACY_QUERY_THREAD_WAKEUP;
- aclk_graceful_disconnect();
- }
-
- legacy_aclk_query_threads_cleanup(&query_threads);
-
- _reset_collector_list();
- freez(collector_list);
-
- if(aclk_stats_enabled) {
- netdata_thread_join(*stats_thread->thread, NULL);
- legacy_aclk_stats_thread_cleanup();
- freez(stats_thread->thread);
- freez(stats_thread);
- }
-
- /*
- * this must be last -> if all static threads signal
- * THREAD_EXITED rrdengine will dealloc the RRDSETs
- * and RRDDIMs that are used by still running stat thread.
- * see netdata_cleanup_and_exit() for reference
- */
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- return NULL;
-}
-
-/*
- * Send a message to the cloud, using a base topic and sib_topic
- * The final topic will be in the form <base_topic>/<sub_topic>
- * If base_topic is missing then the global_base_topic will be used (if available)
- *
- */
-int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id)
-{
- int rc;
- int mid;
- char topic[ACLK_MAX_TOPIC + 1];
- char *final_topic;
-
- UNUSED(msg_id);
-
- if (!aclk_connected)
- return 0;
-
- if (unlikely(!message))
- return 0;
-
- final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
-
- if (unlikely(!final_topic)) {
- errno = 0;
- error("Unable to build outgoing topic; truncated?");
- return 1;
- }
-
- ACLK_LOCK;
- rc = _link_send_message(final_topic, message, len, &mid);
- // TODO: link the msg_id with the mid so we can trace it
- ACLK_UNLOCK;
-
- if (unlikely(rc)) {
- errno = 0;
- error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
- }
-
- return rc;
-}
-
-int aclk_send_message(char *sub_topic, char *message, char *msg_id)
-{
- return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id);
-}
-
-/*
- * Subscribe to a topic in the cloud
- * The final subscription will be in the form
- * /agent/claim_id/<sub_topic>
- */
-int aclk_subscribe(char *sub_topic, int qos)
-{
- int rc;
- char topic[ACLK_MAX_TOPIC + 1];
- char *final_topic;
-
- final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
- if (unlikely(!final_topic)) {
- errno = 0;
- error("Unable to build outgoing topic; truncated?");
- return 1;
- }
-
- if (!aclk_connected) {
- error("Cannot subscribe to %s - not connected!", topic);
- return 1;
- }
-
- ACLK_LOCK;
- rc = _link_subscribe(final_topic, qos);
- ACLK_UNLOCK;
-
- // TODO: Add better handling -- error will flood the logfile here
- if (unlikely(rc)) {
- errno = 0;
- error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
- }
-
- return rc;
-}
-
-// This is called from a callback when the link goes up
-void aclk_connect()
-{
- info("Connection detected (%u queued queries)", aclk_query_size());
-
- legacy_aclk_stats_upd_online(1);
-
- aclk_connected = 1;
- aclk_reconnect_delay(0);
-
- LEGACY_QUERY_THREAD_WAKEUP;
- return;
-}
-
-// This is called from a callback when the link goes down
-void aclk_disconnect()
-{
- if (likely(aclk_connected))
- info("Disconnect detected (%u queued queries)", aclk_query_size());
-
- legacy_aclk_stats_upd_online(0);
-
- aclk_subscribed = 0;
- rrdhost_aclk_state_lock(localhost);
- localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED;
- rrdhost_aclk_state_unlock(localhost);
- aclk_connected = 0;
- aclk_connecting = 0;
- aclk_force_reconnect = 1;
-}
-
-inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version)
-{
- uuid_t uuid;
- char uuid_str[36 + 1];
-
- if (unlikely(!msg_id)) {
- uuid_generate(uuid);
- uuid_unparse(uuid, uuid_str);
- msg_id = uuid_str;
- }
-
- if (ts_secs == 0) {
- ts_us = now_realtime_usec();
- ts_secs = ts_us / USEC_PER_SEC;
- ts_us = ts_us % USEC_PER_SEC;
- }
-
- buffer_sprintf(
- dest,
- "{\t\"type\": \"%s\",\n"
- "\t\"msg-id\": \"%s\",\n"
- "\t\"timestamp\": %ld,\n"
- "\t\"timestamp-offset-usec\": %llu,\n"
- "\t\"connect\": %ld,\n"
- "\t\"connect-offset-usec\": %llu,\n"
- "\t\"version\": %d",
- type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version);
-
- debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs);
-}
-
-
-/*
- * This will send alarm information which includes
- * configured alarms
- * alarm_log
- * active alarms
- */
-void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
-
-void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
-{
- BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
-
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- debug(D_ACLK, "Metadata alarms start");
-
- // on_connect messages are sent on a health reload, if the on_connect message is real then we
- // use the session time as the fake timestamp to indicate that it starts the session. If it is
- // a fake on_connect message then use the real timestamp to indicate it is within the existing
- // session.
-
- if (metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
- else
- aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
- buffer_strcat(local_buffer, ",\n\t\"payload\": ");
-
-
- buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
- health_alarms2json(localhost, local_buffer, 1);
- debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
- // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
- // health_alarm_log2json(localhost, local_buffer, 0);
- // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
- buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
- health_active_log_alarms_2json(localhost, local_buffer);
- //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);
-
-
-
- buffer_sprintf(local_buffer, "\n}\n}");
- aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
-
- freez(msg_id);
- buffer_free(local_buffer);
-}
-
-/*
- * This will send the agent metadata
- * /api/v1/info
- * charts
- */
-int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
-{
- BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
-
- debug(D_ACLK, "Metadata /info start");
-
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- // on_connect messages are sent on a health reload, if the on_connect message is real then we
- // use the session time as the fake timestamp to indicate that it starts the session. If it is
- // a fake on_connect message then use the real timestamp to indicate it is within the existing
- // session.
- if (metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
- else
- aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
- buffer_strcat(local_buffer, ",\n\t\"payload\": ");
-
- buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
- web_client_api_request_v1_info_fill_buffer(host, local_buffer);
- debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
-
- buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
- charts2json(host, local_buffer, 1, 0);
- buffer_sprintf(local_buffer, "\n}\n}");
- debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
-
- aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
-
- freez(msg_id);
- buffer_free(local_buffer);
- return 0;
-}
-
-int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
-{
- BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
- fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg);
-
- debug(D_ACLK, "Sending Child Disconnect");
-
- char *msg_id = create_uuid();
-
- aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
-
- buffer_strcat(local_buffer, ",\"payload\":");
-
- buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid);
- rrdhost_aclk_state_lock(host);
- if(host->aclk_state.claimed_id)
- buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id);
- else
- buffer_strcat(local_buffer, "null}}");
-
- rrdhost_aclk_state_unlock(host);
-
- aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
-
- freez(msg_id);
- buffer_free(local_buffer);
- return 0;
-}
-
-void legacy_aclk_host_state_update(RRDHOST *host, int connect)
-{
-#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
- if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
- return;
-#else
-#warning "This check became unnecessary. Remove"
-#endif
-
- if (unlikely(aclk_host_initializing(localhost)))
- return;
-
- if (connect) {
- debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
- aclk_start_host_popcorning(host);
- legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
- } else {
- debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
- aclk_stop_host_popcorning(host);
- legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
- }
-}
-
-void aclk_send_stress_test(size_t size)
-{
- char *buffer = mallocz(size);
- if (buffer != NULL)
- {
- for(size_t i=0; i<size; i++)
- buffer[i] = 'x';
- buffer[size-1] = 0;
- time_t time_created = now_realtime_sec();
- sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
- buffer[strlen(buffer)] = '"';
- buffer[size-2] = '}';
- buffer[size-3] = '"';
- aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
- error("Sending stress of size %zu at time %ld", size, time_created);
- }
- free(buffer);
-}
-
-// Send info metadata message to the cloud if the link is established
-// or on request
-int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
-{
- legacy_aclk_send_info_metadata(state, host);
-
- if(host == localhost)
- legacy_aclk_send_alarm_metadata(state);
-
- return 0;
-}
-
-// Triggered by a health reload, sends the alarm metadata
-void legacy_aclk_alarm_reload()
-{
- if (unlikely(aclk_host_initializing(localhost)))
- return;
-
- if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
- if (likely(aclk_connected)) {
- errno = 0;
- error("ACLK failed to queue on_connect command on alarm reload");
- }
- }
-}
-//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
-
-int aclk_send_single_chart(RRDHOST *host, char *chart)
-{
- RRDSET *st = rrdset_find(host, chart);
- if (!st)
- st = rrdset_find_byname(host, chart);
- if (!st) {
- info("FAILED to find chart %s", chart);
- return 1;
- }
-
- BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
- buffer_strcat(local_buffer, ",\n\t\"payload\": ");
-
- rrdset2json(st, local_buffer, NULL, NULL, 1);
- buffer_sprintf(local_buffer, "\t\n}");
-
- aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
-
- freez(msg_id);
- buffer_free(local_buffer);
- return 0;
-}
-
-int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
-{
-#ifndef ENABLE_ACLK
- UNUSED(host);
- UNUSED(chart_name);
- return 0;
-#else
- if (unlikely(!netdata_ready))
- return 0;
-
- if (!netdata_cloud_setting)
- return 0;
-
- if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
- return 0;
-
- if (aclk_host_initializing(localhost))
- return 0;
-
- if (unlikely(aclk_disable_single_updates))
- return 0;
-
- if (aclk_popcorn_check_bump(host))
- return 0;
-
- if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) {
- if (likely(aclk_connected)) {
- errno = 0;
- error("ACLK failed to queue chart_update command");
- }
- }
-
- return 0;
-#endif
-}
-
-int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
-{
- BUFFER *local_buffer = NULL;
-
- if (unlikely(!netdata_ready))
- return 0;
-
- if (host != localhost)
- return 0;
-
- if(unlikely(aclk_host_initializing(localhost)))
- return 0;
-
- /*
- * Check if individual updates have been disabled
- * This will be the case when we do health reload
- * and all the alarms will be dropped and recreated.
- * At the end of the health reload the complete alarm metadata
- * info will be sent
- */
- if (unlikely(aclk_disable_single_updates))
- return 0;
-
- local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- char *msg_id = create_uuid();
-
- buffer_flush(local_buffer);
- aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
- buffer_strcat(local_buffer, ",\n\t\"payload\": ");
-
- netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
- health_alarm_entry2json_nolock(local_buffer, ae, host);
- netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
-
- buffer_sprintf(local_buffer, "\n}");
-
- if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
- if (likely(aclk_connected)) {
- errno = 0;
- error("ACLK failed to queue alarm_command on alarm_update");
- }
- }
-
- freez(msg_id);
- buffer_free(local_buffer);
-
- return 0;
-}
-
-char *legacy_aclk_state(void)
-{
- BUFFER *wb = buffer_create(1024);
- char *ret;
-
- buffer_strcat(wb,
- "ACLK Available: Yes\n"
- "ACLK Implementation: Legacy\n"
- "Claimed: "
- );
-
- char *agent_id = is_agent_claimed();
- if (agent_id == NULL)
- buffer_strcat(wb, "No\n");
- else {
- buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id);
- freez(agent_id);
- }
-
- buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No");
-
- ret = strdupz(buffer_tostring(wb));
- buffer_free(wb);
- return ret;
-}
-
-char *legacy_aclk_state_json(void)
-{
- BUFFER *wb = buffer_create(1024);
- char *agent_id = is_agent_claimed();
-
- buffer_sprintf(wb,
- "{\"aclk-available\":true,"
- "\"aclk-implementation\":\"Legacy\","
- "\"agent-claimed\":%s,"
- "\"claimed-id\":",
- agent_id ? "true" : "false"
- );
-
- if (agent_id) {
- buffer_sprintf(wb, "\"%s\"", agent_id);
- freez(agent_id);
- } else
- buffer_strcat(wb, "null");
-
- buffer_sprintf(wb, ",\"online\":%s}", aclk_connected ? "true" : "false");
-
- return strdupz(buffer_tostring(wb));
-}
diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h
deleted file mode 100644
index 8954a337a..000000000
--- a/aclk/legacy/agent_cloud_link.h
+++ /dev/null
@@ -1,85 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_AGENT_CLOUD_LINK_H
-#define NETDATA_AGENT_CLOUD_LINK_H
-
-#include "daemon/common.h"
-#include "mqtt.h"
-#include "aclk_common.h"
-
-#define ACLK_CHART_TOPIC "outbound/meta"
-#define ACLK_ALARMS_TOPIC "outbound/alarms"
-#define ACLK_METADATA_TOPIC "outbound/meta"
-#define ACLK_COMMAND_TOPIC "inbound/cmd"
-#define ACLK_TOPIC_STRUCTURE "/agent/%s"
-
-#define ACLK_MAX_BACKOFF_DELAY 1024 // maximum backoff delay in seconds
-
-#define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg)
-#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds
-#define ACLK_PING_INTERVAL 60
-#define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop
-
-#define ACLK_MAX_TOPIC 255
-
-#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff strategy for now
-#define ACLK_DEFAULT_PORT 9002
-#define ACLK_DEFAULT_HOST "localhost"
-
-#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
-
-struct aclk_request {
- char *type_id;
- char *msg_id;
- char *callback_topic;
- char *payload;
- int version;
- int min_version;
- int max_version;
-};
-
-typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION;
-
-void *legacy_aclk_main(void *ptr);
-
-extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
-extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id);
-
-extern char *is_agent_claimed(void);
-extern void aclk_lws_wss_mqtt_layer_disconnect_notif();
-char *create_uuid();
-
-// callbacks for agent cloud link
-int aclk_subscribe(char *topic, int qos);
-int legacy_cloud_to_agent_parse(JSON_ENTRY *e);
-void aclk_disconnect();
-void aclk_connect();
-
-#ifdef ENABLE_ACLK
-int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host);
-int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
-void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
-
-int aclk_wait_for_initialization();
-char *create_publish_base_topic();
-
-int aclk_send_single_chart(RRDHOST *host, char *chart);
-int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
-int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
-void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version);
-int legacy_aclk_handle_cloud_message(char *payload);
-void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void legacy_aclk_alarm_reload(void);
-unsigned long int aclk_reconnect_delay(int mode);
-extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
-
-void legacy_aclk_host_state_update(RRDHOST *host, int connect);
-int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd);
-void aclk_update_next_child_to_popcorn(void);
-
-char *legacy_aclk_state(void);
-char *legacy_aclk_state_json(void);
-#endif
-
-#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c
deleted file mode 100644
index 0e4bb2ec9..000000000
--- a/aclk/legacy/mqtt.c
+++ /dev/null
@@ -1,370 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include <libnetdata/json/json.h>
-#include "daemon/common.h"
-#include "mqtt.h"
-#include "aclk_lws_wss_client.h"
-#include "aclk_stats.h"
-#include "aclk_rx_msgs.h"
-
-#include "agent_cloud_link.h"
-
-#define ACLK_QOS 1
-
-extern usec_t aclk_session_us;
-extern time_t aclk_session_sec;
-
-inline const char *_link_strerror(int rc)
-{
- return mosquitto_strerror(rc);
-}
-
-#ifdef NETDATA_INTERNAL_CHECKS
-static struct timeval sendTimes[1024];
-#endif
-
-static struct mosquitto *mosq = NULL;
-
-
-void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
-{
- UNUSED(mosq);
- UNUSED(obj);
-
- legacy_aclk_handle_cloud_message(msg->payload);
-}
-
-void publish_callback(struct mosquitto *mosq, void *obj, int rc)
-{
- UNUSED(mosq);
- UNUSED(obj);
- UNUSED(rc);
-#ifdef NETDATA_INTERNAL_CHECKS
- struct timeval now, *orig;
- now_realtime_timeval(&now);
- orig = &sendTimes[ rc & 0x3ff ];
- int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec);
- diff /= 1000;
-
- info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff);
-
- legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.latency, diff);
-#endif
- return;
-}
-
-void connect_callback(struct mosquitto *mosq, void *obj, int rc)
-{
- UNUSED(mosq);
- UNUSED(obj);
- UNUSED(rc);
-
- info("Connection to cloud established");
- aclk_connect();
-
- return;
-}
-
-void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
-{
- UNUSED(mosq);
- UNUSED(obj);
- UNUSED(rc);
-
- if (netdata_exit)
- info("Connection to cloud terminated due to agent shutdown");
- else {
- errno = 0;
- error("Connection to cloud failed");
- }
- aclk_disconnect();
-
- aclk_lws_wss_mqtt_layer_disconnect_notif();
-
- return;
-}
-
-void _show_mqtt_info()
-{
- int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
- libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
-
- info(
- "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
- libmosq_revision);
-}
-
-size_t _mqtt_external_write_hook(void *buf, size_t count)
-{
- return aclk_lws_wss_client_write(buf, count);
-}
-
-size_t _mqtt_external_read_hook(void *buf, size_t count)
-{
- return aclk_lws_wss_client_read(buf, count);
-}
-
-int _mqtt_lib_init()
-{
- int rc;
- //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
- /* Commenting out now as it is unused - do not delete, this is needed for the on-prem version.
- char *ca_crt;
- char *server_crt;
- char *server_key;
-
- // show library info so can have it in the logfile
- //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
- ca_crt = config_get(CONFIG_SECTION_CLOUD, "link cert", "*");
- server_crt = config_get(CONFIG_SECTION_CLOUD, "link server cert", "*");
- server_key = config_get(CONFIG_SECTION_CLOUD, "link server key", "*");
-
- if (ca_crt[0] == '*') {
- freez(ca_crt);
- ca_crt = NULL;
- }
-
- if (server_crt[0] == '*') {
- freez(server_crt);
- server_crt = NULL;
- }
-
- if (server_key[0] == '*') {
- freez(server_key);
- server_key = NULL;
- }
- */
-
- // info(
- // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
- // libmosq_revision);
-
- rc = mosquitto_lib_init();
- if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
- error("Failed to initialize MQTT (libmosquitto library)");
- return 1;
- }
- return 0;
-}
-
-static int _mqtt_create_connection(char *username, char *password)
-{
- if (mosq != NULL)
- mosquitto_destroy(mosq);
- mosq = mosquitto_new(username, true, NULL);
- if (unlikely(!mosq)) {
- mosquitto_lib_cleanup();
- error("MQTT new structure -- %s", mosquitto_strerror(errno));
- return MOSQ_ERR_UNKNOWN;
- }
-
- // Record the session start time to allow a nominal LWT timestamp
- usec_t now = now_realtime_usec();
- aclk_session_sec = now / USEC_PER_SEC;
- aclk_session_us = now % USEC_PER_SEC;
-
- _link_set_lwt("outbound/meta", 2);
-
- mosquitto_connect_callback_set(mosq, connect_callback);
- mosquitto_disconnect_callback_set(mosq, disconnect_callback);
- mosquitto_publish_callback_set(mosq, publish_callback);
-
- info("Using challenge-response: %s / %s", username, password);
- mosquitto_username_pw_set(mosq, username, password);
-
- int rc = mosquitto_threaded_set(mosq, 1);
- if (unlikely(rc != MOSQ_ERR_SUCCESS))
- error("Failed to tune the thread model for libmosquitto (%s)", mosquitto_strerror(rc));
-
-#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000
- rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0);
- if (unlikely(rc != MOSQ_ERR_SUCCESS))
- error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc));
-
- rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1);
- info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc));
-#endif
-
- return rc;
-}
-
-static int _link_mqtt_connect(char *aclk_hostname, int aclk_port)
-{
- int rc;
-
- rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
-
- if (unlikely(rc != MOSQ_ERR_SUCCESS))
- error(
- "Failed to establish link to [%s:%d] MQTT status = %d (%s)", aclk_hostname, aclk_port, rc,
- mosquitto_strerror(rc));
- else
- info("Establishing MQTT link to [%s:%d]", aclk_hostname, aclk_port);
-
- return rc;
-}
-
-static inline void _link_mosquitto_write()
-{
- int rc;
-
- if (unlikely(!mosq)) {
- return;
- }
-
- rc = mosquitto_loop_misc(mosq);
- if (unlikely(rc != MOSQ_ERR_SUCCESS))
- debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
-
- if (likely(mosquitto_want_write(mosq))) {
- rc = mosquitto_loop_write(mosq, 1);
- if (rc != MOSQ_ERR_SUCCESS)
- debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc));
- }
-}
-
-void aclk_lws_connection_established(char *hostname, int port)
-{
- _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected.
- _link_mosquitto_write();
-}
-
-void aclk_lws_connection_data_received()
-{
- int rc = mosquitto_loop_read(mosq, 1);
- if (rc != MOSQ_ERR_SUCCESS)
- debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc));
-}
-
-void aclk_lws_connection_closed()
-{
- aclk_disconnect();
-
-}
-
-
-int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password)
-{
- if(aclk_lws_wss_connect(aclk_hostname, aclk_port))
- return MOSQ_ERR_UNKNOWN;
- aclk_lws_wss_service_loop();
-
- int rc = _mqtt_create_connection(username, password);
- if (rc!= MOSQ_ERR_SUCCESS)
- return rc;
-
- mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook);
- return rc;
-}
-
-inline int _link_event_loop()
-{
-
- // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1).
- _link_mosquitto_write();
- aclk_lws_wss_service_loop();
-
- // this is because if use LWS we don't want
- // mqtt to reconnect by itself
- return MOSQ_ERR_SUCCESS;
-}
-
-void _link_shutdown()
-{
- int rc;
-
- if (likely(!mosq))
- return;
-
- rc = mosquitto_disconnect(mosq);
- switch (rc) {
- case MOSQ_ERR_SUCCESS:
- info("MQTT disconnected from broker");
- break;
- default:
- info("MQTT invalid structure");
- break;
- };
-}
-
-
-int _link_set_lwt(char *sub_topic, int qos)
-{
- int rc;
- char topic[ACLK_MAX_TOPIC + 1];
- char *final_topic;
-
- final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
- if (unlikely(!final_topic)) {
- errno = 0;
- error("Unable to build outgoing topic; truncated?");
- return 1;
- }
-
- usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1;
- BUFFER *b = buffer_create(512);
- aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION);
- buffer_strcat(b, ", \"payload\": \"unexpected\" }");
- rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0);
- buffer_free(b);
-
- return rc;
-}
-
-int _link_subscribe(char *topic, int qos)
-{
- int rc;
-
- if (unlikely(!mosq))
- return 1;
-
- mosquitto_message_callback_set(mosq, mqtt_message_callback);
-
- rc = mosquitto_subscribe(mosq, NULL, topic, qos);
- if (unlikely(rc)) {
- errno = 0;
- error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc));
- return 1;
- }
-
- _link_mosquitto_write();
- return 0;
-}
-
-/*
- * Send a message to the cloud to specific topic
- *
- */
-
-int _link_send_message(char *topic, const void *message, size_t len, int *mid)
-{
- int rc;
- size_t write_q, write_q_bytes, read_q;
-
- rc = mosquitto_pub_topic_check(topic);
-
- if (unlikely(rc != MOSQ_ERR_SUCCESS))
- return rc;
-
- lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
- rc = mosquitto_publish(mosq, mid, topic, len, message, ACLK_QOS, 0);
-
-#ifdef NETDATA_INTERNAL_CHECKS
- char msg_head[64];
- memset(msg_head, 0, sizeof(msg_head));
- strncpy(msg_head, (char*)message, 60);
- for (size_t i = 0; i < sizeof(msg_head); i++)
- if(msg_head[i] == '\n') msg_head[i] = ' ';
- info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", (int)len,
- *mid, write_q, write_q_bytes, read_q, msg_head);
- now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]);
-#endif
-
- // TODO: Add better handling -- error will flood the logfile here
- if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
- errno = 0;
- error("MQTT message failed : %s", mosquitto_strerror(rc));
- }
- _link_mosquitto_write();
- return rc;
-}
diff --git a/aclk/legacy/mqtt.h b/aclk/legacy/mqtt.h
deleted file mode 100644
index 98d599f51..000000000
--- a/aclk/legacy/mqtt.h
+++ /dev/null
@@ -1,25 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_MQTT_H
-#define NETDATA_MQTT_H
-
-#ifdef ENABLE_ACLK
-#include "externaldeps/mosquitto/mosquitto.h"
-#endif
-
-void _show_mqtt_info();
-int _link_event_loop();
-void _link_shutdown();
-int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password);
-//int _link_lib_init();
-int _mqtt_lib_init();
-int _link_subscribe(char *topic, int qos);
-int _link_send_message(char *topic, const void *message, size_t len, int *mid);
-const char *_link_strerror(int rc);
-int _link_set_lwt(char *topic, int qos);
-
-
-int legacy_aclk_handle_cloud_message(char *);
-extern char *get_topic(char *sub_topic, char *final_topic, int max_size);
-
-#endif //NETDATA_MQTT_H
diff --git a/aclk/legacy/tests/fake-charts.d.plugin b/aclk/legacy/tests/fake-charts.d.plugin
deleted file mode 100644
index a13c6bab8..000000000
--- a/aclk/legacy/tests/fake-charts.d.plugin
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-
-sleep 45 # Wait until popcorning finishes
-
-echo "CHART aclk_test.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1"
-sleep 5
-echo "DIMENSION aclk1 '' percentage-of-absolute 1 1"
-sleep 5
-echo "BEGIN aclk_test.newcol 1000000"
-echo "SET aclk1 = 3"
-echo "END"
-sleep 5
-echo "DIMENSION aclk2 '' percentage-of-absolute 1 1"
-sleep 5
-echo "BEGIN aclk_test.newcol 1000000"
-echo "SET aclk1 = 3"
-echo "SET aclk2 = 3"
-echo "END"
-sleep 5
-echo "CHART aclk_test2.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1"
-echo "DIMENSION aclk1 '' percentage-of-absolute 1 1"
-
-sleep 5
-exit 0 # Signal that we are done
diff --git a/aclk/legacy/tests/install-fake-charts.d.sh.in b/aclk/legacy/tests/install-fake-charts.d.sh.in
deleted file mode 100644
index ac002a2bd..000000000
--- a/aclk/legacy/tests/install-fake-charts.d.sh.in
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/usr/bin/env bash
-
-TARGET="@pluginsdir_POST@"
-BASE="$(cd "$(dirname "$0")" && pwd)"
-
-cp "$BASE/fake-charts.d.plugin" "$TARGET/charts.d.plugin"
diff --git a/aclk/legacy/tests/launch-paho.sh b/aclk/legacy/tests/launch-paho.sh
deleted file mode 100755
index 1c2cb5f2c..000000000
--- a/aclk/legacy/tests/launch-paho.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/usr/bin/env bash
-
-docker build -f paho.Dockerfile . --build-arg "HOST_HOSTNAME=$(ping -c1 "$(hostname).local" | head -n1 | grep -o '[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*')" -t paho-client
-docker run -it paho-client
diff --git a/aclk/legacy/tests/paho-inspection.py b/aclk/legacy/tests/paho-inspection.py
deleted file mode 100644
index 14e99b65b..000000000
--- a/aclk/legacy/tests/paho-inspection.py
+++ /dev/null
@@ -1,59 +0,0 @@
-import ssl
-import paho.mqtt.client as mqtt
-import json
-import time
-import sys
-
-def on_connect(mqttc, obj, flags, rc):
- if rc==0:
- print("Successful connection", flush=True)
- else :
- print(f"Connection error rc={rc}", flush=True)
- mqttc.subscribe("/agent/#",0)
-
-def on_disconnect(mqttc, obj, flags, rc):
- print("disconnected rc: "+str(rc), flush=True)
-
-def on_message(mqttc, obj, msg):
- print(f"{msg.topic} {len(msg.payload)}-bytes qos={msg.qos}", flush=True)
- try:
- print(f"Trying decode of {msg.payload[:60]}",flush=True)
- api_msg = json.loads(msg.payload)
- except Exception as e:
- print(e,flush=True)
- return
- ts = api_msg["timestamp"]
- mtype = api_msg["type"]
- print(f"Message {mtype} time={ts} size {len(api_msg)}", flush=True)
- now = time.time()
- print(f"Current {now} -> Delay {now-ts}", flush=True)
- if mtype=="disconnect":
- print(f"Message dump: {api_msg}", flush=True)
-
-def on_publish(mqttc, obj, mid):
- print("mid: "+str(mid), flush=True)
-
-def on_subscribe(mqttc, obj, mid, granted_qos):
- print("Subscribed: "+str(mid)+" "+str(granted_qos), flush=True)
-
-def on_log(mqttc, obj, level, string):
- print(string)
-
-print(f"Starting paho-inspection on {sys.argv[1]}", flush=True)
-mqttc = mqtt.Client(transport='websockets',client_id="paho")
-#mqttc.tls_set(certfile="server.crt", keyfile="server.key", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
-#mqttc.tls_set(ca_certs="server.crt", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
-mqttc.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
-mqttc.tls_insecure_set(True)
-mqttc.on_message = on_message
-mqttc.on_connect = on_connect
-mqttc.on_disconnect = on_disconnect
-mqttc.on_publish = on_publish
-mqttc.on_subscribe = on_subscribe
-mqttc.username_pw_set("paho","paho")
-mqttc.connect(sys.argv[1], 8443, 60)
-
-#mqttc.publish("/agent/mine","Test1")
-#mqttc.subscribe("$SYS/#", 0)
-print("Connected successfully, monitoring /agent/#", flush=True)
-mqttc.loop_forever()
diff --git a/aclk/legacy/tests/paho.Dockerfile b/aclk/legacy/tests/paho.Dockerfile
deleted file mode 100644
index d67cc4cb0..000000000
--- a/aclk/legacy/tests/paho.Dockerfile
+++ /dev/null
@@ -1,14 +0,0 @@
-FROM archlinux/base:latest
-
-RUN pacman -Syyu --noconfirm
-RUN pacman --noconfirm --needed -S python-pip
-
-RUN pip install paho-mqtt
-
-RUN mkdir -p /opt/paho
-COPY paho-inspection.py /opt/paho/
-
-WORKDIR /opt/paho
-ARG HOST_HOSTNAME
-RUN echo $HOST_HOSTNAME >host
-CMD ["/bin/bash", "-c", "/usr/sbin/python paho-inspection.py $(cat host)"]
diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc
index f6638aa5f..f6f15ffb2 100644
--- a/aclk/schema-wrappers/node_info.cc
+++ b/aclk/schema-wrappers/node_info.cc
@@ -62,6 +62,10 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct acl
if (data->machine_guid)
info->set_machine_guid(data->machine_guid);
+ nodeinstance::info::v1::MachineLearningInfo *ml_info = info->mutable_ml_info();
+ ml_info->set_ml_capable(data->ml_info.ml_capable);
+ ml_info->set_ml_enabled(data->ml_info.ml_enabled);
+
map = info->mutable_host_labels();
label = data->host_labels_head;
while (label) {
@@ -86,6 +90,10 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in
msg.set_machine_guid(info->machine_guid);
msg.set_child(info->child);
+ nodeinstance::info::v1::MachineLearningInfo *ml_info = msg.mutable_ml_info();
+ ml_info->set_ml_capable(info->ml_info.ml_capable);
+ ml_info->set_ml_enabled(info->ml_info.ml_enabled);
+
*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)malloc(*len);
if (bin)
diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h
index 4acb671a5..41daf94c8 100644
--- a/aclk/schema-wrappers/node_info.h
+++ b/aclk/schema-wrappers/node_info.h
@@ -11,6 +11,11 @@
extern "C" {
#endif
+struct machine_learning_info {
+ bool ml_capable;
+ bool ml_enabled;
+};
+
struct aclk_node_info {
char *name;
@@ -49,6 +54,8 @@ struct aclk_node_info {
char *machine_guid;
struct label *host_labels_head;
+
+ struct machine_learning_info ml_info;
};
struct update_node_info {
@@ -58,6 +65,8 @@ struct update_node_info {
struct timeval updated_at;
char *machine_guid;
int child;
+
+ struct machine_learning_info ml_info;
};
char *generate_update_node_info_message(size_t *len, struct update_node_info *info);