summaryrefslogtreecommitdiffstats
path: root/src/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk/aclk.c')
-rw-r--r--src/aclk/aclk.c345
1 files changed, 128 insertions, 217 deletions
diff --git a/src/aclk/aclk.c b/src/aclk/aclk.c
index 389d7455f..7bc620a61 100644
--- a/src/aclk/aclk.c
+++ b/src/aclk/aclk.c
@@ -2,8 +2,6 @@
#include "aclk.h"
-#ifdef ENABLE_ACLK
-#include "aclk_stats.h"
#include "mqtt_websockets/mqtt_wss_client.h"
#include "aclk_otp.h"
#include "aclk_tx_msgs.h"
@@ -14,7 +12,6 @@
#include "https_client.h"
#include "schema-wrappers/schema_wrappers.h"
#include "aclk_capas.h"
-
#include "aclk_proxy.h"
#ifdef ACLK_LOG_CONVERSATION_DIR
@@ -23,20 +20,38 @@
#include <fcntl.h>
#endif
-#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
-
-#endif /* ENABLE_ACLK */
-
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
int aclk_rcvd_cloud_msgs = 0;
int aclk_connection_counter = 0;
-int disconnect_req = 0;
-int aclk_connected = 0;
+static bool aclk_connected = false;
+static inline void aclk_set_connected(void) {
+ __atomic_store_n(&aclk_connected, true, __ATOMIC_RELAXED);
+}
+static inline void aclk_set_disconnected(void) {
+ __atomic_store_n(&aclk_connected, false, __ATOMIC_RELAXED);
+}
+
+inline bool aclk_online(void) {
+ return __atomic_load_n(&aclk_connected, __ATOMIC_RELAXED);
+}
+
+bool aclk_online_for_contexts(void) {
+ return aclk_online() && aclk_query_scope_has(HTTP_ACL_METRICS);
+}
+
+bool aclk_online_for_alerts(void) {
+ return aclk_online() && aclk_query_scope_has(HTTP_ACL_ALERTS);
+}
+
+bool aclk_online_for_nodes(void) {
+ return aclk_online() && aclk_query_scope_has(HTTP_ACL_NODES);
+}
+
int aclk_ctx_based = 0;
int aclk_disable_runtime = 0;
-int aclk_stats_enabled;
-int aclk_kill_link = 0;
+
+ACLK_DISCONNECT_ACTION disconnect_req = ACLK_NO_DISCONNECT;
usec_t aclk_session_us = 0;
time_t aclk_session_sec = 0;
@@ -49,13 +64,8 @@ float last_backoff_value = 0;
time_t aclk_block_until = 0;
-#ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client;
-//netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
-//#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
-//#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
-
struct aclk_shared_state aclk_shared_state = {
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
@@ -152,19 +162,6 @@ biofailed:
return 1;
}
-static int wait_till_cloud_enabled()
-{
- nd_log(NDLS_DAEMON, NDLP_INFO,
- "Waiting for Cloud to be enabled");
-
- while (!netdata_cloud_enabled) {
- sleep_usec(USEC_PER_SEC * 1);
- if (!service_running(SERVICE_ACLK))
- return 1;
- }
- return 0;
-}
-
/**
* Will block until agent is claimed. Returns only if agent claimed
* or if agent needs to shutdown.
@@ -174,15 +171,13 @@ static int wait_till_cloud_enabled()
*/
static int wait_till_agent_claimed(void)
{
- //TODO prevent malloc and freez
- char *agent_id = get_agent_claimid();
- while (likely(!agent_id)) {
+ ND_UUID uuid = claim_id_get_uuid();
+ while (likely(UUIDiszero(uuid))) {
sleep_usec(USEC_PER_SEC * 1);
if (!service_running(SERVICE_ACLK))
return 1;
- agent_id = get_agent_claimid();
+ uuid = claim_id_get_uuid();
}
- freez(agent_id);
return 0;
}
@@ -204,9 +199,9 @@ static int wait_till_agent_claim_ready()
// The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
// We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
- char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ const char *cloud_base_url = cloud_config_url_get();
if (cloud_base_url == NULL) {
- netdata_log_error("Do not move the cloud base url out of post_conf_load!!");
+ netdata_log_error("Do not move the \"url\" out of post_conf_load!!");
return 1;
}
@@ -214,7 +209,7 @@ static int wait_till_agent_claim_ready()
// TODO make it without malloc/free
memset(&url, 0, sizeof(url_t));
if (url_parse(cloud_base_url, &url)) {
- netdata_log_error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
+ netdata_log_error("Agent is claimed but the URL in configuration key \"url\" is invalid, please fix");
url_t_destroy(&url);
sleep(5);
continue;
@@ -230,30 +225,6 @@ static int wait_till_agent_claim_ready()
return 1;
}
-void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
-{
- switch(log_type) {
- case MQTT_WSS_LOG_ERROR:
- case MQTT_WSS_LOG_FATAL:
- nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str);
- return;
-
- case MQTT_WSS_LOG_WARN:
- nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str);
- return;
-
- case MQTT_WSS_LOG_INFO:
- nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str);
- return;
-
- case MQTT_WSS_LOG_DEBUG:
- return;
-
- default:
- nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss");
- }
-}
-
static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
@@ -299,9 +270,9 @@ static void puback_callback(uint16_t packet_id)
aclk_tbeb_reset();
}
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_puback(packet_id);
-#endif
+//#ifdef NETDATA_INTERNAL_CHECKS
+// aclk_stats_msg_puback(packet_id);
+//#endif
if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
nd_log(NDLS_DAEMON, NDLP_DEBUG,
@@ -311,21 +282,9 @@ static void puback_callback(uint16_t packet_id)
}
}
-static int read_query_thread_count()
-{
- int threads = MIN(get_netdata_cpus()/2, 6);
- threads = MAX(threads, 2);
- threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
- if(threads < 1) {
- netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
- threads = 1;
- config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
- }
- return threads;
-}
-
void aclk_graceful_disconnect(mqtt_wss_client client);
+bool schedule_node_update = false;
/* Keeps connection alive and handles all network communications.
* Returns on error or when netdata is shutting down.
* @param client instance of mqtt_wss_client
@@ -334,7 +293,6 @@ void aclk_graceful_disconnect(mqtt_wss_client client);
*/
static int handle_connection(mqtt_wss_client client)
{
- time_t last_periodic_query_wakeup = now_monotonic_sec();
while (service_running(SERVICE_ACLK)) {
// timeout 1000 to check at least once a second
// for netdata_exit
@@ -343,30 +301,32 @@ static int handle_connection(mqtt_wss_client client)
return 1;
}
- if (disconnect_req || aclk_kill_link) {
- nd_log(NDLS_DAEMON, NDLP_NOTICE,
- "Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
- disconnect_req ? "true" : "false",
- aclk_kill_link ? "true" : "false");
+ if (disconnect_req != ACLK_NO_DISCONNECT) {
+ const char *reason;
+ switch (disconnect_req) {
+ case ACLK_CLOUD_DISCONNECT:
+ reason = "cloud request";
+ break;
+ case ACLK_PING_TIMEOUT:
+ reason = "ping timeout";
+ schedule_node_update = true;
+ break;
+ case ACLK_RELOAD_CONF:
+ reason = "reclaim";
+ break;
+ default:
+ reason = "unknown";
+ break;
+ }
+
+ nd_log(NDLS_DAEMON, NDLP_NOTICE, "Going to restart connection due to \"%s\"", reason);
- disconnect_req = 0;
- aclk_kill_link = 0;
+ disconnect_req = ACLK_NO_DISCONNECT;
aclk_graceful_disconnect(client);
- aclk_queue_unlock();
aclk_shared_state.mqtt_shutdown_msg_id = -1;
aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
return 1;
}
-
- // mqtt_wss_service will return faster than in one second
- // if there is enough work to do
- time_t now = now_monotonic_sec();
- if (last_periodic_query_wakeup < now) {
- // wake up at least one Query Thread at least
- // once per second
- last_periodic_query_wakeup = now;
- QUERY_THREAD_WAKEUP;
- }
}
return 0;
}
@@ -386,13 +346,12 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
- aclk_stats_upd_online(1);
- aclk_connected = 1;
+ aclk_set_connected();
aclk_pubacks_per_conn = 0;
aclk_rcvd_cloud_msgs = 0;
aclk_connection_counter++;
- aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
+ size_t iter = 0;
while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
mqtt_wss_set_topic_alias(client, topic);
@@ -404,9 +363,6 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Preparing to gracefully shutdown ACLK connection");
- aclk_queue_lock();
- aclk_queue_flush();
-
aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
time_t t = now_monotonic_sec();
@@ -425,9 +381,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
nd_log(NDLS_DAEMON, NDLP_WARNING, "ACLK link is down");
nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
- aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
- aclk_connected = 0;
+ aclk_set_disconnected();
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Attempting to gracefully shutdown the MQTT/WSS connection");
@@ -602,9 +557,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
bool fallback_ipv4 = false;
while (service_running(SERVICE_ACLK)) {
- aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ aclk_cloud_base_url = cloud_config_url_get();
if (aclk_cloud_base_url == NULL) {
- error_report("Do not move the cloud base url out of post_conf_load!!");
+ error_report("Do not move the \"url\" out of post_conf_load!!");
aclk_status = ACLK_STATUS_NO_CLOUD_URL;
return -1;
}
@@ -802,12 +757,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
*/
void *aclk_main(void *ptr)
{
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
-
- struct aclk_stats_thread *stats_thread = NULL;
-
- struct aclk_query_threads query_threads;
- query_threads.thread_list = NULL;
+ struct netdata_static_thread *static_thread = ptr;
ACLK_PROXY_TYPE proxy_type;
aclk_get_proxy(&proxy_type);
@@ -817,24 +767,12 @@ void *aclk_main(void *ptr)
return NULL;
}
- unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
-
-#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
- nd_log(NDLS_DAEMON, NDLP_INFO,
- "Killing ACLK thread -> cloud functionality has been disabled");
-
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- return NULL;
-#endif
- query_threads.count = read_query_thread_count();
-
- if (wait_till_cloud_enabled())
- goto exit;
+ aclk_init_rx_msg_handlers();
if (wait_till_agent_claim_ready())
goto exit;
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+ if (!((mqttwss_client = mqtt_wss_new(msg_callback, puback_callback)))) {
netdata_log_error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -856,28 +794,22 @@ void *aclk_main(void *ptr)
// that send JSON payloads of 10 MB as single messages
mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
- aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
- if (aclk_stats_enabled) {
- stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
- stats_thread->query_thread_count = query_threads.count;
- stats_thread->client = mqttwss_client;
- aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
- stats_thread->thread = nd_thread_create("ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, stats_thread);
- }
-
// Keep reconnecting and talking until our time has come
// and the Grim Reaper (netdata_exit) calls
+ netdata_log_info("Starting ACLK query event loop");
+ aclk_query_init(mqttwss_client);
do {
if (aclk_attempt_to_connect(mqttwss_client))
goto exit_full;
- if (unlikely(!query_threads.thread_list))
- aclk_query_threads_start(&query_threads, mqttwss_client);
+ if (schedule_node_update) {
+ schedule_node_state_update(localhost, 0);
+ schedule_node_update = false;
+ }
if (handle_connection(mqttwss_client)) {
- aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
- aclk_connected = 0;
+ aclk_set_disconnected();
nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
}
} while (service_running(SERVICE_ACLK));
@@ -890,16 +822,6 @@ void *aclk_main(void *ptr)
#endif
exit_full:
-// Tear Down
- QUERY_THREAD_WAKEUP_ALL;
-
- aclk_query_threads_cleanup(&query_threads);
-
- if (aclk_stats_enabled) {
- nd_thread_join(stats_thread->thread);
- aclk_stats_thread_cleanup();
- freez(stats_thread);
- }
free_topic_cache();
mqtt_wss_destroy(mqttwss_client);
exit:
@@ -913,17 +835,16 @@ exit:
void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
{
- nd_uuid_t node_id;
- int ret = 0;
+ ND_UUID node_id;
- if (!aclk_connected)
+ if (!aclk_online())
return;
- if (host->node_id && !uuid_is_null(*host->node_id)) {
- uuid_copy(node_id, *host->node_id);
+ if (!UUIDiszero(host->node_id)) {
+ node_id = host->node_id;
}
else {
- ret = get_node_id(&host->host_uuid, &node_id);
+ int ret = get_node_id(&host->host_id.uuid, &node_id.uuid);
if (ret > 0) {
// this means we were not able to check if node_id already present
netdata_log_error("Unable to check for node_id. Ignoring the host state update.");
@@ -933,21 +854,23 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
// node_id not found
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
- rrdhost_aclk_state_lock(localhost);
+ CLAIM_ID claim_id = claim_id_get();
+
node_instance_creation_t node_instance_creation = {
- .claim_id = localhost->aclk_state.claimed_id,
+ .claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL,
.hops = host->system_info->hops,
.hostname = rrdhost_hostname(host),
.machine_guid = host->machine_guid};
+
create_query->data.bin_payload.payload =
generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
- rrdhost_aclk_state_unlock(localhost);
+
create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
create_query->data.bin_payload.msg_name = "CreateNodeInstance";
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
- aclk_queue_query(create_query);
+ aclk_execute_query(create_query);
return;
}
}
@@ -960,14 +883,13 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
.session_id = aclk_session_newarch
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
- uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
+ uuid_unparse_lower(node_id.uuid, (char*)node_state_update.node_id);
node_state_update.capabilities = aclk_get_agent_capas();
- rrdhost_aclk_state_lock(localhost);
- node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ CLAIM_ID claim_id = claim_id_get();
+ node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
- rrdhost_aclk_state_unlock(localhost);
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Queuing status update for node=%s, live=%d, hops=%u, queryable=%d",
@@ -975,7 +897,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
- aclk_queue_query(query);
+ aclk_execute_query(query);
}
void aclk_send_node_instances()
@@ -1009,10 +931,9 @@ void aclk_send_node_instances()
}
node_state_update.capabilities = aclk_get_node_instance_capas(host);
- rrdhost_aclk_state_lock(localhost);
- node_state_update.claim_id = localhost->aclk_state.claimed_id;
+ CLAIM_ID claim_id = claim_id_get();
+ node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
- rrdhost_aclk_state_unlock(localhost);
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Queuing status update for node=%s, live=%d, hops=%d, queryable=1",
@@ -1022,7 +943,7 @@ void aclk_send_node_instances()
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
- aclk_queue_query(query);
+ aclk_execute_query(query);
} else {
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
@@ -1034,17 +955,17 @@ void aclk_send_node_instances()
uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
create_query->data.bin_payload.msg_name = "CreateNodeInstance";
- rrdhost_aclk_state_lock(localhost);
- node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
+
+ CLAIM_ID claim_id = claim_id_get();
+ node_instance_creation.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL,
create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
- rrdhost_aclk_state_unlock(localhost);
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Queuing registration for host=%s, hops=%d",
(char*)node_instance_creation.machine_guid, list->hops);
freez((void *)node_instance_creation.machine_guid);
- aclk_queue_query(create_query);
+ aclk_execute_query(create_query);
}
freez(list->hostname);
@@ -1089,38 +1010,37 @@ char *aclk_state(void)
);
buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);
- char *agent_id = get_agent_claimid();
- if (agent_id == NULL)
+ CLAIM_ID claim_id = claim_id_get();
+ if (!claim_id_is_set(claim_id))
buffer_strcat(wb, "No\n");
else {
- char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
- buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
- freez(agent_id);
+ const char *cloud_base_url = cloud_config_url_get();
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", claim_id.str, cloud_base_url ? cloud_base_url : "null");
}
- buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
- if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
+ buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_online() ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
+ if (last_conn_time_mqtt && ((tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
}
- if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
+ if (last_conn_time_appl && ((tmptr = localtime_r(&last_conn_time_appl, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
}
- if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
+ if (last_disconnect_time && ((tmptr = localtime_r(&last_disconnect_time, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
}
- if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
+ if (!aclk_connected && next_connection_attempt && ((tmptr = localtime_r(&next_connection_attempt, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
}
- if (aclk_connected) {
+ if (aclk_online()) {
buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
RRDHOST *host;
@@ -1129,20 +1049,18 @@ char *aclk_state(void)
buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host));
buffer_strcat(wb, "\tClaimed ID: ");
- rrdhost_aclk_state_lock(host);
- if (host->aclk_state.claimed_id)
- buffer_strcat(wb, host->aclk_state.claimed_id);
+ claim_id = rrdhost_claim_id_get(host);
+ if(claim_id_is_set(claim_id))
+ buffer_strcat(wb, claim_id.str);
else
buffer_strcat(wb, "null");
- rrdhost_aclk_state_unlock(host);
-
- if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ if (UUIDiszero(host->node_id))
buffer_strcat(wb, "\n\tNode ID: null\n");
- } else {
- char node_id[GUID_LEN + 1];
- uuid_unparse_lower(*host->node_id, node_id);
- buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
+ else {
+ char node_id_str[UUID_STR_LEN];
+ uuid_unparse_lower(host->node_id.uuid, node_id_str);
+ buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id_str);
}
buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
@@ -1183,7 +1101,7 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
static json_object *timestamp_to_json(const time_t *t)
{
struct tm *tmptr, tmbuf;
- if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
+ if (*t && ((tmptr = gmtime_r(t, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
return json_object_new_string(timebuf);
@@ -1206,22 +1124,21 @@ char *aclk_state_json(void)
json_object_array_add(grp, tmp);
json_object_object_add(msg, "protocols-supported", grp);
- char *agent_id = get_agent_claimid();
- tmp = json_object_new_boolean(agent_id != NULL);
+ CLAIM_ID claim_id = claim_id_get();
+ tmp = json_object_new_boolean(claim_id_is_set(claim_id));
json_object_object_add(msg, "agent-claimed", tmp);
- if (agent_id) {
- tmp = json_object_new_string(agent_id);
- freez(agent_id);
- } else
+ if (claim_id_is_set(claim_id))
+ tmp = json_object_new_string(claim_id.str);
+ else
tmp = NULL;
json_object_object_add(msg, "claimed-id", tmp);
- char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ const char *cloud_base_url = cloud_config_url_get();
tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
json_object_object_add(msg, "cloud-url", tmp);
- tmp = json_object_new_boolean(aclk_connected);
+ tmp = json_object_new_boolean(aclk_online());
json_object_object_add(msg, "online", tmp);
tmp = json_object_new_string("Protobuf");
@@ -1242,9 +1159,9 @@ char *aclk_state_json(void)
json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
- json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);
+ json_object_object_add(msg, "next-connection-attempt-utc", !aclk_online() ? timestamp_to_json(&next_connection_attempt) : NULL);
tmp = NULL;
- if (!aclk_connected && last_backoff_value)
+ if (!aclk_online() && last_backoff_value)
tmp = json_object_new_double(last_backoff_value);
json_object_object_add(msg, "last-backoff-value", tmp);
@@ -1264,20 +1181,19 @@ char *aclk_state_json(void)
tmp = json_object_new_string(host->machine_guid);
json_object_object_add(nodeinstance, "mguid", tmp);
- rrdhost_aclk_state_lock(host);
- if (host->aclk_state.claimed_id) {
- tmp = json_object_new_string(host->aclk_state.claimed_id);
+ claim_id = rrdhost_claim_id_get(host);
+ if(claim_id_is_set(claim_id)) {
+ tmp = json_object_new_string(claim_id.str);
json_object_object_add(nodeinstance, "claimed_id", tmp);
} else
json_object_object_add(nodeinstance, "claimed_id", NULL);
- rrdhost_aclk_state_unlock(host);
- if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ if (UUIDiszero(host->node_id)) {
json_object_object_add(nodeinstance, "node-id", NULL);
} else {
- char node_id[GUID_LEN + 1];
- uuid_unparse_lower(*host->node_id, node_id);
- tmp = json_object_new_string(node_id);
+ char node_id_str[UUID_STR_LEN];
+ uuid_unparse_lower(host->node_id.uuid, node_id_str);
+ tmp = json_object_new_string(node_id_str);
json_object_object_add(nodeinstance, "node-id", tmp);
}
@@ -1303,12 +1219,10 @@ char *aclk_state_json(void)
json_object_put(msg);
return str;
}
-#endif /* ENABLE_ACLK */
void add_aclk_host_labels(void) {
RRDLABELS *labels = localhost->rrdlabels;
-#ifdef ENABLE_ACLK
rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
ACLK_PROXY_TYPE aclk_proxy;
char *proxy_str;
@@ -1329,9 +1243,6 @@ void add_aclk_host_labels(void) {
rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
-#else
- rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
-#endif
}
void aclk_queue_node_info(RRDHOST *host, bool immediate)