summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-12-01 06:15:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-12-01 06:15:04 +0000
commite970e0b37b8bd7f246feb3f70c4136418225e434 (patch)
tree0b67c0ca45f56f2f9d9c5c2e725279ecdf52d2eb /aclk/aclk.c
parentAdding upstream version 1.31.0. (diff)
downloadnetdata-e970e0b37b8bd7f246feb3f70c4136418225e434.tar.xz
netdata-e970e0b37b8bd7f246feb3f70c4136418225e434.zip
Adding upstream version 1.32.0.upstream/1.32.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c459
1 files changed, 368 insertions, 91 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 35549cfea..a24d258c5 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -13,6 +13,8 @@
#include "aclk_collector_list.h"
#include "https_client.h"
+#include "aclk_proxy.h"
+
#ifdef ACLK_LOG_CONVERSATION_DIR
#include <sys/types.h>
#include <sys/stat.h>
@@ -21,20 +23,12 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
-//TODO remove most (as in 99.999999999%) of this crap
-int aclk_connected = 0;
-int aclk_disable_runtime = 0;
-int aclk_disable_single_updates = 0;
-int aclk_kill_link = 0;
-
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+int disconnect_req = 0;
-time_t aclk_block_until = 0;
+int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
-usec_t aclk_session_us = 0; // Used by the mqtt layer
-time_t aclk_session_sec = 0; // Used by the mqtt layer
-
-aclk_env_t *aclk_env = NULL;
+time_t aclk_block_until = 0;
mqtt_wss_client mqttwss_client;
@@ -43,22 +37,12 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
struct aclk_shared_state aclk_shared_state = {
- .agent_state = AGENT_INITIALIZING,
+ .agent_state = ACLK_HOST_INITIALIZING,
.last_popcorn_interrupt = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
-void aclk_single_update_disable()
-{
- aclk_disable_single_updates = 1;
-}
-
-void aclk_single_update_enable()
-{
- aclk_disable_single_updates = 0;
-}
-
//ENDTODO
static RSA *aclk_private_key = NULL;
@@ -197,8 +181,9 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
//TODO prevent big buffer on stack
#define RX_MSGLEN_MAX 4096
-static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
+static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos)
{
+ UNUSED(qos);
char cmsg[RX_MSGLEN_MAX];
size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
@@ -233,13 +218,61 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
error("Received message on unexpected topic %s", topic);
if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
- error("Link is shutting down. Ignoring message.");
+ error("Link is shutting down. Ignoring incoming message.");
return;
}
aclk_handle_cloud_message(cmsg);
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos)
+{
+ UNUSED(qos);
+ if (msglen > RX_MSGLEN_MAX)
+ error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
+
+ debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
+
+ if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
+ error("Link is shutting down. Ignoring incoming message.");
+ return;
+ }
+
+ const char *msgtype = strrchr(topic, '/');
+ if (unlikely(!msgtype)) {
+ error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
+ return;
+ }
+ msgtype++;
+ if (unlikely(!*msgtype)) {
+ error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
+ return;
+ }
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 512
+ char filename[FN_MAX_LEN];
+ int logfd;
+ snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
+ logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
+ if(logfd < 0)
+ error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+ write(logfd, msg, msglen);
+ close(logfd);
+#endif
+
+ aclk_handle_new_cloud_msg(msgtype, msg, msglen);
+}
+
+static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
+ if (aclk_use_new_cloud_arch)
+ msg_callback_new_protocol(topic, msg, msglen, qos);
+ else
+ msg_callback_old_protocol(topic, msg, msglen, qos);
+}
+#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
+
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
@@ -250,7 +283,7 @@ static void puback_callback(uint16_t packet_id)
#endif
if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
- error("Got PUBACK for shutdown message. Can exit gracefully.");
+ info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
}
}
@@ -268,6 +301,8 @@ static int read_query_thread_count()
return threads;
}
+void aclk_graceful_disconnect(mqtt_wss_client client);
+
/* Keeps connection alive and handles all network comms.
* Returns on error or when netdata is shutting down.
* @param client instance of mqtt_wss_client
@@ -281,7 +316,16 @@ static int handle_connection(mqtt_wss_client client)
// timeout 1000 to check at least once a second
// for netdata_exit
if (mqtt_wss_service(client, 1000) < 0){
- error("Connection Error or Dropped");
+ error_report("Connection Error or Dropped");
+ return 1;
+ }
+
+ if (disconnect_req) {
+ disconnect_req = 0;
+ aclk_graceful_disconnect(client);
+ aclk_queue_unlock();
+ aclk_shared_state.mqtt_shutdown_msg_id = -1;
+ aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
return 1;
}
@@ -298,10 +342,21 @@ static int handle_connection(mqtt_wss_client client)
return 0;
}
+inline static int aclk_popcorn_check()
+{
+ ACLK_SHARED_STATE_LOCK;
+ if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
+ ACLK_SHARED_STATE_UNLOCK;
+ return 1;
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+ return 0;
+}
+
inline static int aclk_popcorn_check_bump()
{
ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
ACLK_SHARED_STATE_UNLOCK;
return 1;
@@ -323,11 +378,6 @@ static inline void queue_connect_payloads(void)
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
- // TODO global vars?
- usec_t now = now_realtime_usec();
- aclk_session_sec = now / USEC_PER_SEC;
- aclk_session_us = now % USEC_PER_SEC;
-
const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!topic)
@@ -335,16 +385,34 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ if (aclk_use_new_cloud_arch) {
+ topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ if (!topic)
+ error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
+ }
+#endif
+
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
- error("Sending `connect` payload immediately as popcorning was finished already.");
- queue_connect_payloads();
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ if (!aclk_use_new_cloud_arch) {
+#endif
+ ACLK_SHARED_STATE_LOCK;
+ if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
+ error("Sending `connect` payload immediately as popcorning was finished already.");
+ queue_connect_payloads();
+ }
+ ACLK_SHARED_STATE_UNLOCK;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ } else {
+ aclk_send_agent_connection_update(client, 1);
}
- ACLK_SHARED_STATE_UNLOCK;
+#endif
}
/* Waits until agent is ready or needs to exit
@@ -354,29 +422,29 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
* @return 0 - Popcorning Finished - Agent STABLE,
* !0 - netdata_exit
*/
-static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads)
+static int wait_popcorning_finishes()
{
time_t elapsed;
int need_wait;
+ if (aclk_use_new_cloud_arch)
+ return 0;
+
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
- if (likely(aclk_shared_state.agent_state != AGENT_INITIALIZING)) {
+ if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
if (elapsed >= ACLK_STABLE_TIMEOUT) {
- aclk_shared_state.agent_state = AGENT_STABLE;
+ aclk_shared_state.agent_state = ACLK_HOST_STABLE;
ACLK_SHARED_STATE_UNLOCK;
- error("ACLK localhost popocorn finished");
- if (unlikely(!query_threads->thread_list))
- aclk_query_threads_start(query_threads, client);
- queue_connect_payloads();
+ error("ACLK localhost popcorn timer finished");
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
need_wait = ACLK_STABLE_TIMEOUT - elapsed;
- error("ACLK localhost popocorn wait %d seconds longer", need_wait);
+ error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait);
sleep(need_wait);
}
return 1;
@@ -384,10 +452,16 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th
void aclk_graceful_disconnect(mqtt_wss_client client)
{
- error("Preparing to Gracefully Shutdown the ACLK");
+ info("Preparing to gracefully shutdown ACLK connection");
aclk_queue_lock();
aclk_queue_flush();
- aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ if (aclk_use_new_cloud_arch)
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
+ else
+#endif
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
if (now_monotonic_sec() - t >= 2) {
@@ -395,14 +469,16 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
break;
}
if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
- error("MQTT App Layer `disconnect` message sent successfully");
+ info("MQTT App Layer `disconnect` message sent successfully");
break;
}
}
+ info("ACLK link is down");
+ log_access("ACLK DISCONNECTED");
aclk_stats_upd_online(0);
aclk_connected = 0;
- error("Attempting to Gracefully Shutdown MQTT/WSS connection");
+ info("Attempting to gracefully shutdown the MQTT/WSS connection");
mqtt_wss_disconnect(client, 1000);
}
@@ -433,7 +509,7 @@ static unsigned long aclk_reconnect_delay() {
return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
}
-/* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled
+/* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
* @return 0 - Go ahead and connect (delay expired)
* 1 - netdata_exit
*/
@@ -455,7 +531,7 @@ static int aclk_block_till_recon_allowed() {
sleep_usec(recon_delay * USEC_PER_MS);
recon_delay = 0;
}
- return 0;
+ return netdata_exit;
}
#ifndef ACLK_DISABLE_CHALLENGE
@@ -477,7 +553,7 @@ static int aclk_get_transport_idx(aclk_env_t *env) {
/* Attempts to make a connection to MQTT broker over WSS
* @param client instance of mqtt_wss_client
- * @return 0 - Successfull Connection,
+ * @return 0 - Successful Connection,
* <0 - Irrecoverable Error -> Kill ACLK,
* >0 - netdata_exit
*/
@@ -498,7 +574,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- json_object *lwt;
+ json_object *lwt = NULL;
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
@@ -529,9 +605,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.will_topic = "lwt",
.will_msg = NULL,
.will_flags = MQTT_WSS_PUB_QOS2,
- .keep_alive = 60
+ .keep_alive = 60,
+ .drop_on_publish_fail = 1
};
+#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE)
+ aclk_use_new_cloud_arch = 1;
+ info("Switching ACLK to new protobuf protocol. Due to #define ACLK_NEWARCH_DEVMODE.");
+#else
+ aclk_use_new_cloud_arch = 0;
+#endif
+
#ifndef ACLK_DISABLE_CHALLENGE
if (aclk_env) {
aclk_env_t_destroy(aclk_env);
@@ -547,6 +631,24 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
continue;
}
+ if (netdata_exit)
+ return 1;
+
+#ifndef ACLK_NEWARCH_DEVMODE
+ if (aclk_env->encoding == ACLK_ENC_PROTO) {
+#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+ error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
+ continue;
+#endif
+ if (!aclk_env_has_capa("proto")) {
+ error ("Can't encoding=proto without at least \"proto\" capability.");
+ continue;
+ }
+ info("Switching ACLK to new protobuf protocol. Due to /env response.");
+ aclk_use_new_cloud_arch = 1;
+ }
+#endif
+
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
@@ -563,7 +665,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// aclk_get_topic moved here as during OTP we
// generate the topic cache
- mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+ if (aclk_use_new_cloud_arch)
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
+ else
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+
if (!mqtt_conn_params.will_topic) {
error("Couldn't get LWT topic. Will not send LWT.");
continue;
@@ -584,9 +690,21 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
#endif
- lwt = aclk_generate_disconnect(NULL);
- mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
- mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+ aclk_session_newarch = now_realtime_usec();
+ aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
+ aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ if (aclk_use_new_cloud_arch) {
+ mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
+ } else {
+#endif
+ lwt = aclk_generate_disconnect(NULL);
+ mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
+ mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ }
+#endif
#ifdef ACLK_DISABLE_CHALLENGE
ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@@ -600,15 +718,19 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
- json_object_put(lwt);
+ if (aclk_use_new_cloud_arch)
+ freez((char *)mqtt_conn_params.will_msg);
+ else
+ json_object_put(lwt);
if (!ret) {
- info("MQTTWSS connection succeeded");
+ info("ACLK connection successfully established");
+ log_access("ACLK CONNECTED");
mqtt_connected_actions(client);
return 0;
}
- error("Connect failed\n");
+ error_report("Connect failed");
}
return 1;
@@ -659,11 +781,20 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+#else
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) {
+#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
+ // Enable MQTT buffer growth if necessary
+ // e.g. old cloud architecture clients with huge nodes
+ // that send JSON payloads of 10 MB as single messages
+ mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
+
aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
if (aclk_stats_enabled) {
stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
@@ -683,12 +814,19 @@ void *aclk_main(void *ptr)
// warning this assumes the popcorning is relative short (3s)
// if that changes call mqtt_wss_service from within
// to keep OpenSSL, WSS and MQTT connection alive
- if (wait_popcorning_finishes(mqttwss_client, &query_threads))
+ if (wait_popcorning_finishes())
goto exit_full;
+
+ if (unlikely(!query_threads.thread_list))
+ aclk_query_threads_start(&query_threads, mqttwss_client);
+
+ if (!aclk_use_new_cloud_arch)
+ queue_connect_payloads();
- if (!handle_connection(mqttwss_client)) {
+ if (handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
aclk_connected = 0;
+ log_access("ACLK DISCONNECTED");
}
} while (!netdata_exit);
@@ -721,10 +859,10 @@ exit:
// fix this in both old and new ACLK
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
-void aclk_alarm_reload(void)
+void ng_aclk_alarm_reload(void)
{
ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return;
}
@@ -733,7 +871,7 @@ void aclk_alarm_reload(void)
aclk_queue_query(aclk_query_new(METADATA_ALARMS));
}
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
+int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
BUFFER *local_buffer;
json_object *msg;
@@ -742,7 +880,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
return 0;
ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
+ if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
@@ -764,11 +902,11 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
return 0;
}
-int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
+int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
struct aclk_query *query;
- if (aclk_popcorn_check_bump())
+ if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check())
return 0;
query = aclk_query_new(create ? CHART_NEW : CHART_DEL);
@@ -788,11 +926,11 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
* Add a new collector to the list
* If it exists, update the chart count
*/
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
+ if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
@@ -831,11 +969,11 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu
* This function will release the memory used and schedule
* a cloud update
*/
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
- if (unlikely(!netdata_ready)) {
+ if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
@@ -872,26 +1010,165 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
aclk_queue_query(query);
}
-struct label *add_aclk_host_labels(struct label *label) {
-#ifdef ENABLE_ACLK
- ACLK_PROXY_TYPE aclk_proxy;
- char *proxy_str;
- aclk_get_proxy(&aclk_proxy);
+void ng_aclk_host_state_update(RRDHOST *host, int cmd)
+{
+ uuid_t node_id;
+ int ret;
- switch(aclk_proxy) {
- case PROXY_TYPE_SOCKS5:
- proxy_str = "SOCKS5";
- break;
- case PROXY_TYPE_HTTP:
- proxy_str = "HTTP";
- break;
- default:
- proxy_str = "none";
- break;
+ if (!aclk_connected || !aclk_use_new_cloud_arch)
+ return;
+
+ ret = get_node_id(&host->host_uuid, &node_id);
+ if (ret > 0) {
+ // this means we were not able to check if node_id already present
+ error("Unable to check for node_id. Ignoring the host state update.");
+ return;
+ }
+ if (ret < 0) {
+ // node_id not found
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.node_creation.hops = (uint32_t) host->system_info->hops;
+ create_query->data.node_creation.hostname = strdupz(host->hostname);
+ create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
+ info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
+ aclk_queue_query(create_query);
+ return;
}
- label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO);
- return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
+
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ query->data.node_update.hops = (uint32_t) host->system_info->hops;
+ rrdhost_aclk_state_lock(localhost);
+ query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ query->data.node_update.live = cmd;
+ query->data.node_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id);
+ query->data.node_update.queryable = 1;
+ query->data.node_update.session_id = aclk_session_newarch;
+ info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd,
+ host->system_info->hops);
+ aclk_queue_query(query);
+}
+
+void aclk_send_node_instances()
+{
+ struct node_instance_list *list_head = get_node_list();
+ struct node_instance_list *list = list_head;
+ if (unlikely(!list)) {
+ error_report("Failure to get_node_list from DB!");
+ return;
+ }
+ while (!uuid_is_null(list->host_id)) {
+ if (!uuid_is_null(list->node_id)) {
+ aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+ rrdhost_aclk_state_lock(localhost);
+ query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ query->data.node_update.live = list->live;
+ query->data.node_update.hops = list->hops;
+ query->data.node_update.node_id = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
+ query->data.node_update.queryable = 1;
+ query->data.node_update.session_id = aclk_session_newarch;
+ info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id,
+ list->live,
+ list->hops);
+ aclk_queue_query(query);
+ } else {
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.node_creation.hops = list->hops;
+ create_query->data.node_creation.hostname = list->hostname;
+ create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
+ uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
+ info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid,
+ list->hops);
+ aclk_queue_query(create_query);
+ }
+
+ list++;
+ }
+ freez(list_head);
+}
+
+void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+{
+ aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
+}
+
+char *ng_aclk_state(void)
+{
+ BUFFER *wb = buffer_create(1024);
+ char *ret;
+
+ buffer_strcat(wb,
+ "ACLK Available: Yes\n"
+ "ACLK Implementation: Next Generation\n"
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ "New Cloud Protocol Support: Yes\n"
#else
- return label;
+ "New Cloud Protocol Support: No\n"
#endif
+ "Claimed: "
+ );
+
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL)
+ buffer_strcat(wb, "No\n");
+ else {
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id);
+ freez(agent_id);
+ }
+
+ buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy");
+
+ ret = strdupz(buffer_tostring(wb));
+ buffer_free(wb);
+ return ret;
+}
+
+char *ng_aclk_state_json(void)
+{
+ json_object *tmp, *msg = json_object_new_object();
+
+ tmp = json_object_new_boolean(1);
+ json_object_object_add(msg, "aclk-available", tmp);
+
+ tmp = json_object_new_string("Next Generation");
+ json_object_object_add(msg, "aclk-implementation", tmp);
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ tmp = json_object_new_boolean(1);
+#else
+ tmp = json_object_new_boolean(0);
+#endif
+ json_object_object_add(msg, "new-cloud-protocol-supported", tmp);
+
+ char *agent_id = is_agent_claimed();
+ tmp = json_object_new_boolean(agent_id != NULL);
+ json_object_object_add(msg, "agent-claimed", tmp);
+
+ if (agent_id) {
+ tmp = json_object_new_string(agent_id);
+ freez(agent_id);
+ } else
+ tmp = NULL;
+ json_object_object_add(msg, "claimed-id", tmp);
+
+ tmp = json_object_new_boolean(aclk_connected);
+ json_object_object_add(msg, "online", tmp);
+
+ tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy");
+ json_object_object_add(msg, "used-cloud-protocol", tmp);
+
+ char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
+ json_object_put(msg);
+ return str;
}