diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-12-01 06:15:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-12-01 06:15:04 +0000 |
commit | e970e0b37b8bd7f246feb3f70c4136418225e434 (patch) | |
tree | 0b67c0ca45f56f2f9d9c5c2e725279ecdf52d2eb /aclk/aclk.c | |
parent | Adding upstream version 1.31.0. (diff) | |
download | netdata-e970e0b37b8bd7f246feb3f70c4136418225e434.tar.xz netdata-e970e0b37b8bd7f246feb3f70c4136418225e434.zip |
Adding upstream version 1.32.0.upstream/1.32.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 459 |
1 files changed, 368 insertions, 91 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 35549cfea..a24d258c5 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -13,6 +13,8 @@ #include "aclk_collector_list.h" #include "https_client.h" +#include "aclk_proxy.h" + #ifdef ACLK_LOG_CONVERSATION_DIR #include <sys/types.h> #include <sys/stat.h> @@ -21,20 +23,12 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable -//TODO remove most (as in 99.999999999%) of this crap -int aclk_connected = 0; -int aclk_disable_runtime = 0; -int aclk_disable_single_updates = 0; -int aclk_kill_link = 0; - int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +int disconnect_req = 0; -time_t aclk_block_until = 0; +int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload -usec_t aclk_session_us = 0; // Used by the mqtt layer -time_t aclk_session_sec = 0; // Used by the mqtt layer - -aclk_env_t *aclk_env = NULL; +time_t aclk_block_until = 0; mqtt_wss_client mqttwss_client; @@ -43,22 +37,12 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { - .agent_state = AGENT_INITIALIZING, + .agent_state = ACLK_HOST_INITIALIZING, .last_popcorn_interrupt = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; -void aclk_single_update_disable() -{ - aclk_disable_single_updates = 1; -} - -void aclk_single_update_enable() -{ - aclk_disable_single_updates = 0; -} - //ENDTODO static RSA *aclk_private_key = NULL; @@ -197,8 +181,9 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) //TODO prevent big buffer on stack #define RX_MSGLEN_MAX 4096 -static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) +static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos) { + UNUSED(qos); char cmsg[RX_MSGLEN_MAX]; size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); @@ -233,13 +218,61 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int error("Received message on unexpected topic %s", topic); if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring message."); + error("Link is shutting down. Ignoring incoming message."); return; } aclk_handle_cloud_message(cmsg); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos) +{ + UNUSED(qos); + if (msglen > RX_MSGLEN_MAX) + error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); + + debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); + + if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { + error("Link is shutting down. Ignoring incoming message."); + return; + } + + const char *msgtype = strrchr(topic, '/'); + if (unlikely(!msgtype)) { + error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic); + return; + } + msgtype++; + if (unlikely(!*msgtype)) { + error_report("Message type empty. Ignoring message from topic \"%s\"", topic); + return; + } + +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 512 + char filename[FN_MAX_LEN]; + int logfd; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype); + logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); + if(logfd < 0) + error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + write(logfd, msg, msglen); + close(logfd); +#endif + + aclk_handle_new_cloud_msg(msgtype, msg, msglen); +} + +static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { + if (aclk_use_new_cloud_arch) + msg_callback_new_protocol(topic, msg, msglen, qos); + else + msg_callback_old_protocol(topic, msg, msglen, qos); +} +#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ + static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) @@ -250,7 +283,7 @@ static void puback_callback(uint16_t packet_id) #endif if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { - error("Got PUBACK for shutdown message. Can exit gracefully."); + info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; } } @@ -268,6 +301,8 @@ static int read_query_thread_count() return threads; } +void aclk_graceful_disconnect(mqtt_wss_client client); + /* Keeps connection alive and handles all network comms. * Returns on error or when netdata is shutting down. * @param client instance of mqtt_wss_client @@ -281,7 +316,16 @@ static int handle_connection(mqtt_wss_client client) // timeout 1000 to check at least once a second // for netdata_exit if (mqtt_wss_service(client, 1000) < 0){ - error("Connection Error or Dropped"); + error_report("Connection Error or Dropped"); + return 1; + } + + if (disconnect_req) { + disconnect_req = 0; + aclk_graceful_disconnect(client); + aclk_queue_unlock(); + aclk_shared_state.mqtt_shutdown_msg_id = -1; + aclk_shared_state.mqtt_shutdown_msg_rcvd = 0; return 1; } @@ -298,10 +342,21 @@ static int handle_connection(mqtt_wss_client client) return 0; } +inline static int aclk_popcorn_check() +{ + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { + ACLK_SHARED_STATE_UNLOCK; + return 1; + } + ACLK_SHARED_STATE_UNLOCK; + return 0; +} + inline static int aclk_popcorn_check_bump() { ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); ACLK_SHARED_STATE_UNLOCK; return 1; @@ -323,11 +378,6 @@ static inline void queue_connect_payloads(void) static inline void mqtt_connected_actions(mqtt_wss_client client) { - // TODO global vars? - usec_t now = now_realtime_usec(); - aclk_session_sec = now / USEC_PER_SEC; - aclk_session_us = now % USEC_PER_SEC; - const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) @@ -335,16 +385,34 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (aclk_use_new_cloud_arch) { + topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); + } +#endif + aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (!aclk_use_new_cloud_arch) { +#endif + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { + error("Sending `connect` payload immediately as popcorning was finished already."); + queue_connect_payloads(); + } + ACLK_SHARED_STATE_UNLOCK; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + } else { + aclk_send_agent_connection_update(client, 1); } - ACLK_SHARED_STATE_UNLOCK; +#endif } /* Waits until agent is ready or needs to exit @@ -354,29 +422,29 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) * @return 0 - Popcorning Finished - Agent STABLE, * !0 - netdata_exit */ -static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads) +static int wait_popcorning_finishes() { time_t elapsed; int need_wait; + if (aclk_use_new_cloud_arch) + return 0; + while (!netdata_exit) { ACLK_SHARED_STATE_LOCK; - if (likely(aclk_shared_state.agent_state != AGENT_INITIALIZING)) { + if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { ACLK_SHARED_STATE_UNLOCK; return 0; } elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; if (elapsed >= ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = AGENT_STABLE; + aclk_shared_state.agent_state = ACLK_HOST_STABLE; ACLK_SHARED_STATE_UNLOCK; - error("ACLK localhost popocorn finished"); - if (unlikely(!query_threads->thread_list)) - aclk_query_threads_start(query_threads, client); - queue_connect_payloads(); + error("ACLK localhost popcorn timer finished"); return 0; } ACLK_SHARED_STATE_UNLOCK; need_wait = ACLK_STABLE_TIMEOUT - elapsed; - error("ACLK localhost popocorn wait %d seconds longer", need_wait); + error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait); sleep(need_wait); } return 1; @@ -384,10 +452,16 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th void aclk_graceful_disconnect(mqtt_wss_client client) { - error("Preparing to Gracefully Shutdown the ACLK"); + info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (aclk_use_new_cloud_arch) + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); + else +#endif + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { if (now_monotonic_sec() - t >= 2) { @@ -395,14 +469,16 @@ void aclk_graceful_disconnect(mqtt_wss_client client) break; } if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { - error("MQTT App Layer `disconnect` message sent successfully"); + info("MQTT App Layer `disconnect` message sent successfully"); break; } } + info("ACLK link is down"); + log_access("ACLK DISCONNECTED"); aclk_stats_upd_online(0); aclk_connected = 0; - error("Attempting to Gracefully Shutdown MQTT/WSS connection"); + info("Attempting to gracefully shutdown the MQTT/WSS connection"); mqtt_wss_disconnect(client, 1000); } @@ -433,7 +509,7 @@ static unsigned long aclk_reconnect_delay() { return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s); } -/* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled +/* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled * @return 0 - Go ahead and connect (delay expired) * 1 - netdata_exit */ @@ -455,7 +531,7 @@ static int aclk_block_till_recon_allowed() { sleep_usec(recon_delay * USEC_PER_MS); recon_delay = 0; } - return 0; + return netdata_exit; } #ifndef ACLK_DISABLE_CHALLENGE @@ -477,7 +553,7 @@ static int aclk_get_transport_idx(aclk_env_t *env) { /* Attempts to make a connection to MQTT broker over WSS * @param client instance of mqtt_wss_client - * @return 0 - Successfull Connection, + * @return 0 - Successful Connection, * <0 - Irrecoverable Error -> Kill ACLK, * >0 - netdata_exit */ @@ -498,7 +574,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt; + json_object *lwt = NULL; while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); @@ -529,9 +605,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .will_topic = "lwt", .will_msg = NULL, .will_flags = MQTT_WSS_PUB_QOS2, - .keep_alive = 60 + .keep_alive = 60, + .drop_on_publish_fail = 1 }; +#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE) + aclk_use_new_cloud_arch = 1; + info("Switching ACLK to new protobuf protocol. Due to #define ACLK_NEWARCH_DEVMODE."); +#else + aclk_use_new_cloud_arch = 0; +#endif + #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { aclk_env_t_destroy(aclk_env); @@ -547,6 +631,24 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) continue; } + if (netdata_exit) + return 1; + +#ifndef ACLK_NEWARCH_DEVMODE + if (aclk_env->encoding == ACLK_ENC_PROTO) { +#ifndef ENABLE_NEW_CLOUD_PROTOCOL + error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); + continue; +#endif + if (!aclk_env_has_capa("proto")) { + error ("Can't encoding=proto without at least \"proto\" capability."); + continue; + } + info("Switching ACLK to new protobuf protocol. Due to /env response."); + aclk_use_new_cloud_arch = 1; + } +#endif + memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); @@ -563,7 +665,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (aclk_use_new_cloud_arch) + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); + else + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. Will not send LWT."); continue; @@ -584,9 +690,21 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) } #endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); + aclk_session_newarch = now_realtime_usec(); + aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; + aclk_session_us = aclk_session_newarch % USEC_PER_SEC; + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (aclk_use_new_cloud_arch) { + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); + } else { +#endif + lwt = aclk_generate_disconnect(NULL); + mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); + mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + } +#endif #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -600,15 +718,19 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - json_object_put(lwt); + if (aclk_use_new_cloud_arch) + freez((char *)mqtt_conn_params.will_msg); + else + json_object_put(lwt); if (!ret) { - info("MQTTWSS connection succeeded"); + info("ACLK connection successfully established"); + log_access("ACLK CONNECTED"); mqtt_connected_actions(client); return 0; } - error("Connect failed\n"); + error_report("Connect failed"); } return 1; @@ -659,11 +781,20 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { +#else + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) { +#endif error("Couldn't initialize MQTT_WSS network library"); goto exit; } + // Enable MQTT buffer growth if necessary + // e.g. old cloud architecture clients with huge nodes + // that send JSON payloads of 10 MB as single messages + mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024); + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); if (aclk_stats_enabled) { stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); @@ -683,12 +814,19 @@ void *aclk_main(void *ptr) // warning this assumes the popcorning is relative short (3s) // if that changes call mqtt_wss_service from within // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes(mqttwss_client, &query_threads)) + if (wait_popcorning_finishes()) goto exit_full; + + if (unlikely(!query_threads.thread_list)) + aclk_query_threads_start(&query_threads, mqttwss_client); + + if (!aclk_use_new_cloud_arch) + queue_connect_payloads(); - if (!handle_connection(mqttwss_client)) { + if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); aclk_connected = 0; + log_access("ACLK DISCONNECTED"); } } while (!netdata_exit); @@ -721,10 +859,10 @@ exit: // fix this in both old and new ACLK extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); -void aclk_alarm_reload(void) +void ng_aclk_alarm_reload(void) { ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { ACLK_SHARED_STATE_UNLOCK; return; } @@ -733,7 +871,7 @@ void aclk_alarm_reload(void) aclk_queue_query(aclk_query_new(METADATA_ALARMS)); } -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) { BUFFER *local_buffer; json_object *msg; @@ -742,7 +880,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) return 0; ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { ACLK_SHARED_STATE_UNLOCK; return 0; } @@ -764,11 +902,11 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) return 0; } -int aclk_update_chart(RRDHOST *host, char *chart_name, int create) +int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create) { struct aclk_query *query; - if (aclk_popcorn_check_bump()) + if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check()) return 0; query = aclk_query_new(create ? CHART_NEW : CHART_DEL); @@ -788,11 +926,11 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, int create) * Add a new collector to the list * If it exists, update the chart count */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct aclk_query *query; struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { + if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { return; } @@ -831,11 +969,11 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu * This function will release the memory used and schedule * a cloud update */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct aclk_query *query; struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { + if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { return; } @@ -872,26 +1010,165 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu aclk_queue_query(query); } -struct label *add_aclk_host_labels(struct label *label) { -#ifdef ENABLE_ACLK - ACLK_PROXY_TYPE aclk_proxy; - char *proxy_str; - aclk_get_proxy(&aclk_proxy); +void ng_aclk_host_state_update(RRDHOST *host, int cmd) +{ + uuid_t node_id; + int ret; - switch(aclk_proxy) { - case PROXY_TYPE_SOCKS5: - proxy_str = "SOCKS5"; - break; - case PROXY_TYPE_HTTP: - proxy_str = "HTTP"; - break; - default: - proxy_str = "none"; - break; + if (!aclk_connected || !aclk_use_new_cloud_arch) + return; + + ret = get_node_id(&host->host_uuid, &node_id); + if (ret > 0) { + // this means we were not able to check if node_id already present + error("Unable to check for node_id. Ignoring the host state update."); + return; + } + if (ret < 0) { + // node_id not found + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + create_query->data.node_creation.hops = (uint32_t) host->system_info->hops; + create_query->data.node_creation.hostname = strdupz(host->hostname); + create_query->data.node_creation.machine_guid = strdupz(host->machine_guid); + info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); + aclk_queue_query(create_query); + return; } - label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO); - return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + query->data.node_update.hops = (uint32_t) host->system_info->hops; + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + query->data.node_update.live = cmd; + query->data.node_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id); + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd, + host->system_info->hops); + aclk_queue_query(query); +} + +void aclk_send_node_instances() +{ + struct node_instance_list *list_head = get_node_list(); + struct node_instance_list *list = list_head; + if (unlikely(!list)) { + error_report("Failure to get_node_list from DB!"); + return; + } + while (!uuid_is_null(list->host_id)) { + if (!uuid_is_null(list->node_id)) { + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + query->data.node_update.live = list->live; + query->data.node_update.hops = list->hops; + query->data.node_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id, + list->live, + list->hops); + aclk_queue_query(query); + } else { + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + create_query->data.node_creation.hops = list->hops; + create_query->data.node_creation.hostname = list->hostname; + create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid); + info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid, + list->hops); + aclk_queue_query(create_query); + } + + list++; + } + freez(list_head); +} + +void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) +{ + aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); +} + +char *ng_aclk_state(void) +{ + BUFFER *wb = buffer_create(1024); + char *ret; + + buffer_strcat(wb, + "ACLK Available: Yes\n" + "ACLK Implementation: Next Generation\n" +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + "New Cloud Protocol Support: Yes\n" #else - return label; + "New Cloud Protocol Support: No\n" #endif + "Claimed: " + ); + + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + buffer_strcat(wb, "No\n"); + else { + buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); + freez(agent_id); + } + + buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy"); + + ret = strdupz(buffer_tostring(wb)); + buffer_free(wb); + return ret; +} + +char *ng_aclk_state_json(void) +{ + json_object *tmp, *msg = json_object_new_object(); + + tmp = json_object_new_boolean(1); + json_object_object_add(msg, "aclk-available", tmp); + + tmp = json_object_new_string("Next Generation"); + json_object_object_add(msg, "aclk-implementation", tmp); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + tmp = json_object_new_boolean(1); +#else + tmp = json_object_new_boolean(0); +#endif + json_object_object_add(msg, "new-cloud-protocol-supported", tmp); + + char *agent_id = is_agent_claimed(); + tmp = json_object_new_boolean(agent_id != NULL); + json_object_object_add(msg, "agent-claimed", tmp); + + if (agent_id) { + tmp = json_object_new_string(agent_id); + freez(agent_id); + } else + tmp = NULL; + json_object_object_add(msg, "claimed-id", tmp); + + tmp = json_object_new_boolean(aclk_connected); + json_object_object_add(msg, "online", tmp); + + tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy"); + json_object_object_add(msg, "used-cloud-protocol", tmp); + + char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); + json_object_put(msg); + return str; } |