diff options
Diffstat (limited to 'src/aclk/aclk.c')
-rw-r--r-- | src/aclk/aclk.c | 345 |
1 files changed, 128 insertions, 217 deletions
diff --git a/src/aclk/aclk.c b/src/aclk/aclk.c index 389d7455f..7bc620a61 100644 --- a/src/aclk/aclk.c +++ b/src/aclk/aclk.c @@ -2,8 +2,6 @@ #include "aclk.h" -#ifdef ENABLE_ACLK -#include "aclk_stats.h" #include "mqtt_websockets/mqtt_wss_client.h" #include "aclk_otp.h" #include "aclk_tx_msgs.h" @@ -14,7 +12,6 @@ #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" #include "aclk_capas.h" - #include "aclk_proxy.h" #ifdef ACLK_LOG_CONVERSATION_DIR @@ -23,20 +20,38 @@ #include <fcntl.h> #endif -#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable - -#endif /* ENABLE_ACLK */ - int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. int aclk_rcvd_cloud_msgs = 0; int aclk_connection_counter = 0; -int disconnect_req = 0; -int aclk_connected = 0; +static bool aclk_connected = false; +static inline void aclk_set_connected(void) { + __atomic_store_n(&aclk_connected, true, __ATOMIC_RELAXED); +} +static inline void aclk_set_disconnected(void) { + __atomic_store_n(&aclk_connected, false, __ATOMIC_RELAXED); +} + +inline bool aclk_online(void) { + return __atomic_load_n(&aclk_connected, __ATOMIC_RELAXED); +} + +bool aclk_online_for_contexts(void) { + return aclk_online() && aclk_query_scope_has(HTTP_ACL_METRICS); +} + +bool aclk_online_for_alerts(void) { + return aclk_online() && aclk_query_scope_has(HTTP_ACL_ALERTS); +} + +bool aclk_online_for_nodes(void) { + return aclk_online() && aclk_query_scope_has(HTTP_ACL_NODES); +} + int aclk_ctx_based = 0; int aclk_disable_runtime = 0; -int aclk_stats_enabled; -int aclk_kill_link = 0; + +ACLK_DISCONNECT_ACTION disconnect_req = ACLK_NO_DISCONNECT; usec_t aclk_session_us = 0; time_t aclk_session_sec = 0; @@ -49,13 +64,8 @@ float last_backoff_value = 0; time_t aclk_block_until = 0; -#ifdef ENABLE_ACLK mqtt_wss_client mqttwss_client; -//netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; -//#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) -//#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) - struct aclk_shared_state aclk_shared_state = { .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 @@ -152,19 +162,6 @@ biofailed: return 1; } -static int wait_till_cloud_enabled() -{ - nd_log(NDLS_DAEMON, NDLP_INFO, - "Waiting for Cloud to be enabled"); - - while (!netdata_cloud_enabled) { - sleep_usec(USEC_PER_SEC * 1); - if (!service_running(SERVICE_ACLK)) - return 1; - } - return 0; -} - /** * Will block until agent is claimed. Returns only if agent claimed * or if agent needs to shutdown. @@ -174,15 +171,13 @@ static int wait_till_cloud_enabled() */ static int wait_till_agent_claimed(void) { - //TODO prevent malloc and freez - char *agent_id = get_agent_claimid(); - while (likely(!agent_id)) { + ND_UUID uuid = claim_id_get_uuid(); + while (likely(UUIDiszero(uuid))) { sleep_usec(USEC_PER_SEC * 1); if (!service_running(SERVICE_ACLK)) return 1; - agent_id = get_agent_claimid(); + uuid = claim_id_get_uuid(); } - freez(agent_id); return 0; } @@ -204,9 +199,9 @@ static int wait_till_agent_claim_ready() // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. - char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + const char *cloud_base_url = cloud_config_url_get(); if (cloud_base_url == NULL) { - netdata_log_error("Do not move the cloud base url out of post_conf_load!!"); + netdata_log_error("Do not move the \"url\" out of post_conf_load!!"); return 1; } @@ -214,7 +209,7 @@ static int wait_till_agent_claim_ready() // TODO make it without malloc/free memset(&url, 0, sizeof(url_t)); if (url_parse(cloud_base_url, &url)) { - netdata_log_error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix"); + netdata_log_error("Agent is claimed but the URL in configuration key \"url\" is invalid, please fix"); url_t_destroy(&url); sleep(5); continue; @@ -230,30 +225,6 @@ static int wait_till_agent_claim_ready() return 1; } -void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) -{ - switch(log_type) { - case MQTT_WSS_LOG_ERROR: - case MQTT_WSS_LOG_FATAL: - nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str); - return; - - case MQTT_WSS_LOG_WARN: - nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str); - return; - - case MQTT_WSS_LOG_INFO: - nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str); - return; - - case MQTT_WSS_LOG_DEBUG: - return; - - default: - nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss"); - } -} - static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { UNUSED(qos); @@ -299,9 +270,9 @@ static void puback_callback(uint16_t packet_id) aclk_tbeb_reset(); } -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_puback(packet_id); -#endif +//#ifdef NETDATA_INTERNAL_CHECKS +// aclk_stats_msg_puback(packet_id); +//#endif if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { nd_log(NDLS_DAEMON, NDLP_DEBUG, @@ -311,21 +282,9 @@ static void puback_callback(uint16_t packet_id) } } -static int read_query_thread_count() -{ - int threads = MIN(get_netdata_cpus()/2, 6); - threads = MAX(threads, 2); - threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); - if(threads < 1) { - netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); - threads = 1; - config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); - } - return threads; -} - void aclk_graceful_disconnect(mqtt_wss_client client); +bool schedule_node_update = false; /* Keeps connection alive and handles all network communications. * Returns on error or when netdata is shutting down. * @param client instance of mqtt_wss_client @@ -334,7 +293,6 @@ void aclk_graceful_disconnect(mqtt_wss_client client); */ static int handle_connection(mqtt_wss_client client) { - time_t last_periodic_query_wakeup = now_monotonic_sec(); while (service_running(SERVICE_ACLK)) { // timeout 1000 to check at least once a second // for netdata_exit @@ -343,30 +301,32 @@ static int handle_connection(mqtt_wss_client client) return 1; } - if (disconnect_req || aclk_kill_link) { - nd_log(NDLS_DAEMON, NDLP_NOTICE, - "Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", - disconnect_req ? "true" : "false", - aclk_kill_link ? "true" : "false"); + if (disconnect_req != ACLK_NO_DISCONNECT) { + const char *reason; + switch (disconnect_req) { + case ACLK_CLOUD_DISCONNECT: + reason = "cloud request"; + break; + case ACLK_PING_TIMEOUT: + reason = "ping timeout"; + schedule_node_update = true; + break; + case ACLK_RELOAD_CONF: + reason = "reclaim"; + break; + default: + reason = "unknown"; + break; + } + + nd_log(NDLS_DAEMON, NDLP_NOTICE, "Going to restart connection due to \"%s\"", reason); - disconnect_req = 0; - aclk_kill_link = 0; + disconnect_req = ACLK_NO_DISCONNECT; aclk_graceful_disconnect(client); - aclk_queue_unlock(); aclk_shared_state.mqtt_shutdown_msg_id = -1; aclk_shared_state.mqtt_shutdown_msg_rcvd = 0; return 1; } - - // mqtt_wss_service will return faster than in one second - // if there is enough work to do - time_t now = now_monotonic_sec(); - if (last_periodic_query_wakeup < now) { - // wake up at least one Query Thread at least - // once per second - last_periodic_query_wakeup = now; - QUERY_THREAD_WAKEUP; - } } return 0; } @@ -386,13 +346,12 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); - aclk_stats_upd_online(1); - aclk_connected = 1; + aclk_set_connected(); aclk_pubacks_per_conn = 0; aclk_rcvd_cloud_msgs = 0; aclk_connection_counter++; - aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER; + size_t iter = 0; while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL) mqtt_wss_set_topic_alias(client, topic); @@ -404,9 +363,6 @@ void aclk_graceful_disconnect(mqtt_wss_client client) nd_log(NDLS_DAEMON, NDLP_DEBUG, "Preparing to gracefully shutdown ACLK connection"); - aclk_queue_lock(); - aclk_queue_flush(); - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); time_t t = now_monotonic_sec(); @@ -425,9 +381,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client) nd_log(NDLS_DAEMON, NDLP_WARNING, "ACLK link is down"); nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED"); - aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); - aclk_connected = 0; + aclk_set_disconnected(); nd_log(NDLS_DAEMON, NDLP_DEBUG, "Attempting to gracefully shutdown the MQTT/WSS connection"); @@ -602,9 +557,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) bool fallback_ipv4 = false; while (service_running(SERVICE_ACLK)) { - aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + aclk_cloud_base_url = cloud_config_url_get(); if (aclk_cloud_base_url == NULL) { - error_report("Do not move the cloud base url out of post_conf_load!!"); + error_report("Do not move the \"url\" out of post_conf_load!!"); aclk_status = ACLK_STATUS_NO_CLOUD_URL; return -1; } @@ -802,12 +757,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) */ void *aclk_main(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - - struct aclk_stats_thread *stats_thread = NULL; - - struct aclk_query_threads query_threads; - query_threads.thread_list = NULL; + struct netdata_static_thread *static_thread = ptr; ACLK_PROXY_TYPE proxy_type; aclk_get_proxy(&proxy_type); @@ -817,24 +767,12 @@ void *aclk_main(void *ptr) return NULL; } - unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers(); - -#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK ) - nd_log(NDLS_DAEMON, NDLP_INFO, - "Killing ACLK thread -> cloud functionality has been disabled"); - - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; - return NULL; -#endif - query_threads.count = read_query_thread_count(); - - if (wait_till_cloud_enabled()) - goto exit; + aclk_init_rx_msg_handlers(); if (wait_till_agent_claim_ready()) goto exit; - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { + if (!((mqttwss_client = mqtt_wss_new(msg_callback, puback_callback)))) { netdata_log_error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -856,28 +794,22 @@ void *aclk_main(void *ptr) // that send JSON payloads of 10 MB as single messages mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024); - aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled); - if (aclk_stats_enabled) { - stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); - stats_thread->query_thread_count = query_threads.count; - stats_thread->client = mqttwss_client; - aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt); - stats_thread->thread = nd_thread_create("ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, stats_thread); - } - // Keep reconnecting and talking until our time has come // and the Grim Reaper (netdata_exit) calls + netdata_log_info("Starting ACLK query event loop"); + aclk_query_init(mqttwss_client); do { if (aclk_attempt_to_connect(mqttwss_client)) goto exit_full; - if (unlikely(!query_threads.thread_list)) - aclk_query_threads_start(&query_threads, mqttwss_client); + if (schedule_node_update) { + schedule_node_state_update(localhost, 0); + schedule_node_update = false; + } if (handle_connection(mqttwss_client)) { - aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); - aclk_connected = 0; + aclk_set_disconnected(); nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED"); } } while (service_running(SERVICE_ACLK)); @@ -890,16 +822,6 @@ void *aclk_main(void *ptr) #endif exit_full: -// Tear Down - QUERY_THREAD_WAKEUP_ALL; - - aclk_query_threads_cleanup(&query_threads); - - if (aclk_stats_enabled) { - nd_thread_join(stats_thread->thread); - aclk_stats_thread_cleanup(); - freez(stats_thread); - } free_topic_cache(); mqtt_wss_destroy(mqttwss_client); exit: @@ -913,17 +835,16 @@ exit: void aclk_host_state_update(RRDHOST *host, int cmd, int queryable) { - nd_uuid_t node_id; - int ret = 0; + ND_UUID node_id; - if (!aclk_connected) + if (!aclk_online()) return; - if (host->node_id && !uuid_is_null(*host->node_id)) { - uuid_copy(node_id, *host->node_id); + if (!UUIDiszero(host->node_id)) { + node_id = host->node_id; } else { - ret = get_node_id(&host->host_uuid, &node_id); + int ret = get_node_id(&host->host_id.uuid, &node_id.uuid); if (ret > 0) { // this means we were not able to check if node_id already present netdata_log_error("Unable to check for node_id. Ignoring the host state update."); @@ -933,21 +854,23 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable) // node_id not found aclk_query_t create_query; create_query = aclk_query_new(REGISTER_NODE); - rrdhost_aclk_state_lock(localhost); + CLAIM_ID claim_id = claim_id_get(); + node_instance_creation_t node_instance_creation = { - .claim_id = localhost->aclk_state.claimed_id, + .claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL, .hops = host->system_info->hops, .hostname = rrdhost_hostname(host), .machine_guid = host->machine_guid}; + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); - rrdhost_aclk_state_unlock(localhost); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; create_query->data.bin_payload.msg_name = "CreateNodeInstance"; nd_log(NDLS_DAEMON, NDLP_DEBUG, "Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); - aclk_queue_query(create_query); + aclk_execute_query(create_query); return; } } @@ -960,14 +883,13 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable) .session_id = aclk_session_newarch }; node_state_update.node_id = mallocz(UUID_STR_LEN); - uuid_unparse_lower(node_id, (char*)node_state_update.node_id); + uuid_unparse_lower(node_id.uuid, (char*)node_state_update.node_id); node_state_update.capabilities = aclk_get_agent_capas(); - rrdhost_aclk_state_lock(localhost); - node_state_update.claim_id = localhost->aclk_state.claimed_id; + CLAIM_ID claim_id = claim_id_get(); + node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); - rrdhost_aclk_state_unlock(localhost); nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queuing status update for node=%s, live=%d, hops=%u, queryable=%d", @@ -975,7 +897,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd, int queryable) freez((void*)node_state_update.node_id); query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; - aclk_queue_query(query); + aclk_execute_query(query); } void aclk_send_node_instances() @@ -1009,10 +931,9 @@ void aclk_send_node_instances() } node_state_update.capabilities = aclk_get_node_instance_capas(host); - rrdhost_aclk_state_lock(localhost); - node_state_update.claim_id = localhost->aclk_state.claimed_id; + CLAIM_ID claim_id = claim_id_get(); + node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); - rrdhost_aclk_state_unlock(localhost); nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queuing status update for node=%s, live=%d, hops=%d, queryable=1", @@ -1022,7 +943,7 @@ void aclk_send_node_instances() freez((void*)node_state_update.node_id); query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; - aclk_queue_query(query); + aclk_execute_query(query); } else { aclk_query_t create_query; create_query = aclk_query_new(REGISTER_NODE); @@ -1034,17 +955,17 @@ void aclk_send_node_instances() uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid); create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; create_query->data.bin_payload.msg_name = "CreateNodeInstance"; - rrdhost_aclk_state_lock(localhost); - node_instance_creation.claim_id = localhost->aclk_state.claimed_id, + + CLAIM_ID claim_id = claim_id_get(); + node_instance_creation.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL, create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); - rrdhost_aclk_state_unlock(localhost); nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queuing registration for host=%s, hops=%d", (char*)node_instance_creation.machine_guid, list->hops); freez((void *)node_instance_creation.machine_guid); - aclk_queue_query(create_query); + aclk_execute_query(create_query); } freez(list->hostname); @@ -1089,38 +1010,37 @@ char *aclk_state(void) ); buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5); - char *agent_id = get_agent_claimid(); - if (agent_id == NULL) + CLAIM_ID claim_id = claim_id_get(); + if (!claim_id_is_set(claim_id)) buffer_strcat(wb, "No\n"); else { - char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); - buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null"); - freez(agent_id); + const char *cloud_base_url = cloud_config_url_get(); + buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", claim_id.str, cloud_base_url ? cloud_base_url : "null"); } - buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No"); - if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) { + buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_online() ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No"); + if (last_conn_time_mqtt && ((tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf); } - if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) { + if (last_conn_time_appl && ((tmptr = localtime_r(&last_conn_time_appl, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf); } - if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) { + if (last_disconnect_time && ((tmptr = localtime_r(&last_disconnect_time, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf); } - if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) { + if (!aclk_connected && next_connection_attempt && ((tmptr = localtime_r(&next_connection_attempt, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value); } - if (aclk_connected) { + if (aclk_online()) { buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); RRDHOST *host; @@ -1129,20 +1049,18 @@ char *aclk_state(void) buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host)); buffer_strcat(wb, "\tClaimed ID: "); - rrdhost_aclk_state_lock(host); - if (host->aclk_state.claimed_id) - buffer_strcat(wb, host->aclk_state.claimed_id); + claim_id = rrdhost_claim_id_get(host); + if(claim_id_is_set(claim_id)) + buffer_strcat(wb, claim_id.str); else buffer_strcat(wb, "null"); - rrdhost_aclk_state_unlock(host); - - if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + if (UUIDiszero(host->node_id)) buffer_strcat(wb, "\n\tNode ID: null\n"); - } else { - char node_id[GUID_LEN + 1]; - uuid_unparse_lower(*host->node_id, node_id); - buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id); + else { + char node_id_str[UUID_STR_LEN]; + uuid_unparse_lower(host->node_id.uuid, node_id_str); + buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id_str); } buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child"); @@ -1183,7 +1101,7 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) static json_object *timestamp_to_json(const time_t *t) { struct tm *tmptr, tmbuf; - if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) { + if (*t && ((tmptr = gmtime_r(t, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); return json_object_new_string(timebuf); @@ -1206,22 +1124,21 @@ char *aclk_state_json(void) json_object_array_add(grp, tmp); json_object_object_add(msg, "protocols-supported", grp); - char *agent_id = get_agent_claimid(); - tmp = json_object_new_boolean(agent_id != NULL); + CLAIM_ID claim_id = claim_id_get(); + tmp = json_object_new_boolean(claim_id_is_set(claim_id)); json_object_object_add(msg, "agent-claimed", tmp); - if (agent_id) { - tmp = json_object_new_string(agent_id); - freez(agent_id); - } else + if (claim_id_is_set(claim_id)) + tmp = json_object_new_string(claim_id.str); + else tmp = NULL; json_object_object_add(msg, "claimed-id", tmp); - char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + const char *cloud_base_url = cloud_config_url_get(); tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL; json_object_object_add(msg, "cloud-url", tmp); - tmp = json_object_new_boolean(aclk_connected); + tmp = json_object_new_boolean(aclk_online()); json_object_object_add(msg, "online", tmp); tmp = json_object_new_string("Protobuf"); @@ -1242,9 +1159,9 @@ char *aclk_state_json(void) json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt)); json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl)); json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time)); - json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL); + json_object_object_add(msg, "next-connection-attempt-utc", !aclk_online() ? timestamp_to_json(&next_connection_attempt) : NULL); tmp = NULL; - if (!aclk_connected && last_backoff_value) + if (!aclk_online() && last_backoff_value) tmp = json_object_new_double(last_backoff_value); json_object_object_add(msg, "last-backoff-value", tmp); @@ -1264,20 +1181,19 @@ char *aclk_state_json(void) tmp = json_object_new_string(host->machine_guid); json_object_object_add(nodeinstance, "mguid", tmp); - rrdhost_aclk_state_lock(host); - if (host->aclk_state.claimed_id) { - tmp = json_object_new_string(host->aclk_state.claimed_id); + claim_id = rrdhost_claim_id_get(host); + if(claim_id_is_set(claim_id)) { + tmp = json_object_new_string(claim_id.str); json_object_object_add(nodeinstance, "claimed_id", tmp); } else json_object_object_add(nodeinstance, "claimed_id", NULL); - rrdhost_aclk_state_unlock(host); - if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + if (UUIDiszero(host->node_id)) { json_object_object_add(nodeinstance, "node-id", NULL); } else { - char node_id[GUID_LEN + 1]; - uuid_unparse_lower(*host->node_id, node_id); - tmp = json_object_new_string(node_id); + char node_id_str[UUID_STR_LEN]; + uuid_unparse_lower(host->node_id.uuid, node_id_str); + tmp = json_object_new_string(node_id_str); json_object_object_add(nodeinstance, "node-id", tmp); } @@ -1303,12 +1219,10 @@ char *aclk_state_json(void) json_object_put(msg); return str; } -#endif /* ENABLE_ACLK */ void add_aclk_host_labels(void) { RRDLABELS *labels = localhost->rrdlabels; -#ifdef ENABLE_ACLK rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); ACLK_PROXY_TYPE aclk_proxy; char *proxy_str; @@ -1329,9 +1243,6 @@ void add_aclk_host_labels(void) { rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO); rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#else - rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#endif } void aclk_queue_node_info(RRDHOST *host, bool immediate) |