diff options
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 495 |
1 files changed, 90 insertions, 405 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 6426c5b5e..7b3641b1e 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -10,7 +10,6 @@ #include "aclk_query_queue.h" #include "aclk_util.h" #include "aclk_rx_msgs.h" -#include "aclk_collector_list.h" #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" @@ -46,17 +45,29 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { - .agent_state = ACLK_HOST_INITIALIZING, - .last_popcorn_interrupt = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +OSSL_DECODER_CTX *aclk_dctx = NULL; +EVP_PKEY *aclk_private_key = NULL; +#else static RSA *aclk_private_key = NULL; +#endif static int load_private_key() { - if (aclk_private_key != NULL) + if (aclk_private_key != NULL) { +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 + EVP_PKEY_free(aclk_private_key); + if (aclk_dctx) + OSSL_DECODER_CTX_free(aclk_dctx); + + aclk_dctx = NULL; +#else RSA_free(aclk_private_key); +#endif + } aclk_private_key = NULL; char filename[FILENAME_MAX + 1]; snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); @@ -75,7 +86,25 @@ static int load_private_key() goto biofailed; } +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 + aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL, + "RSA", + OSSL_KEYMGMT_SELECT_PRIVATE_KEY, + NULL, NULL); + + if (!aclk_dctx) { + error("Loading private key (from claiming) failed - no OpenSSL Decoders found"); + goto biofailed; + } + + // this is necesseary to avoid RSA key with wrong size + if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) { + error("Decoding private key (from claiming) failed - invalid format."); + goto biofailed; + } +#else aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); +#endif BIO_free(key_bio); if (aclk_private_key!=NULL) { @@ -112,12 +141,12 @@ static int wait_till_cloud_enabled() static int wait_till_agent_claimed(void) { //TODO prevent malloc and freez - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); while (likely(!agent_id)) { sleep_usec(USEC_PER_SEC * 1); if (netdata_exit) return 1; - agent_id = is_agent_claimed(); + agent_id = get_agent_claimid(); } freez(agent_id); return 0; @@ -188,54 +217,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) //TODO prevent big buffer on stack #define RX_MSGLEN_MAX 4096 -static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos) -{ - UNUSED(qos); - char cmsg[RX_MSGLEN_MAX]; - size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); - const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); - if (!cmd_topic) { - error("Error retrieving command topic"); - return; - } - - if (msglen > RX_MSGLEN_MAX - 1) - error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); - - memcpy(cmsg, - msg, - len); - cmsg[len] = 0; - -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 512 - char filename[FN_MAX_LEN]; - int logfd; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT()); - logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); - if(logfd < 0) - error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); - write(logfd, msg, msglen); - close(logfd); -#endif - - debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg); - - if (strcmp(cmd_topic, topic)) - error("Received message on unexpected topic %s", topic); - - if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring incoming message."); - return; - } - - aclk_handle_cloud_cmd_message(cmsg); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos) +static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { UNUSED(qos); + aclk_rcvd_cloud_msgs++; if (msglen > RX_MSGLEN_MAX) error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); @@ -269,17 +254,8 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t close(logfd); #endif - aclk_handle_new_cloud_msg(msgtype, msg, msglen); -} - -static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { - aclk_rcvd_cloud_msgs++; - if (aclk_use_new_cloud_arch) - msg_callback_new_protocol(topic, msg, msglen, qos); - else - msg_callback_old_protocol(topic, msg, msglen, qos); + aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic); } -#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ static void puback_callback(uint16_t packet_id) { @@ -356,40 +332,6 @@ static int handle_connection(mqtt_wss_client client) return 0; } -inline static int aclk_popcorn_check() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -inline static int aclk_popcorn_check_bump() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -static inline void queue_connect_payloads(void) -{ - aclk_query_t query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; - query->data.metadata_info.initial_on_connect = 1; - aclk_queue_query(query); - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 1; - aclk_queue_query(query); -} - static inline void mqtt_connected_actions(mqtt_wss_client client) { char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); @@ -399,15 +341,11 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); - if (!topic) - error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); - else - mqtt_wss_subscribe(client, topic, 1); - } -#endif + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); aclk_stats_upd_online(1); aclk_connected = 1; @@ -415,55 +353,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_rcvd_cloud_msgs = 0; aclk_connection_counter++; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (!aclk_use_new_cloud_arch) { -#endif - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); - } - ACLK_SHARED_STATE_UNLOCK; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } else { - aclk_send_agent_connection_update(client, 1); - } -#endif -} - -/* Waits until agent is ready or needs to exit - * @param client instance of mqtt_wss_client - * @param query_threads pointer to aclk_query_threads - * structure where to store data about started query threads - * @return 0 - Popcorning Finished - Agent STABLE, - * !0 - netdata_exit - */ -static int wait_popcorning_finishes() -{ - time_t elapsed; - int need_wait; - if (aclk_use_new_cloud_arch) - return 0; - - while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; - if (elapsed >= ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = ACLK_HOST_STABLE; - ACLK_SHARED_STATE_UNLOCK; - error("ACLK localhost popcorn timer finished"); - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - need_wait = ACLK_STABLE_TIMEOUT - elapsed; - error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait); - sleep(need_wait); - } - return 1; + aclk_send_agent_connection_update(client, 1); } void aclk_graceful_disconnect(mqtt_wss_client client) @@ -471,12 +361,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client) info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); - else -#endif - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { @@ -594,8 +480,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt = NULL; - while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { @@ -629,8 +513,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .drop_on_publish_fail = 1 }; - aclk_use_new_cloud_arch = 0; - #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { aclk_env_t_destroy(aclk_env); @@ -649,20 +531,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (netdata_exit) return 1; - if (aclk_env->encoding == ACLK_ENC_PROTO) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL - error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); + if (aclk_env->encoding != ACLK_ENC_PROTO) { + error_report("This agent can only use the new cloud protocol but cloud requested old one."); continue; -#else - if (!aclk_env_has_capa("proto")) { - error ("Can't encoding=proto without at least \"proto\" capability."); - continue; - } - info("Switching ACLK to new protobuf protocol. Due to /env response."); - aclk_use_new_cloud_arch = 1; -#endif } + if (!aclk_env_has_capa("proto")) { + error ("Can't use encoding=proto without at least \"proto\" capability."); + continue; + } + info("New ACLK protobuf protocol negotiated successfully (/env response)."); + memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); @@ -679,10 +558,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - if (aclk_use_new_cloud_arch) - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); - else - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. Will not send LWT."); @@ -708,17 +584,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; aclk_session_us = aclk_session_newarch % USEC_PER_SEC; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); - } else { -#endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } -#endif + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -732,10 +598,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - if (aclk_use_new_cloud_arch) - freez((char *)mqtt_conn_params.will_msg); - else - json_object_put(lwt); + freez((char *)mqtt_conn_params.will_msg); if (!ret) { last_conn_time_mqtt = now_realtime_sec(); @@ -778,10 +641,7 @@ void *aclk_main(void *ptr) return NULL; } - unsigned int proto_hdl_cnt; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - proto_hdl_cnt = aclk_init_rx_msg_handlers(); -#endif + unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers(); // This thread is unusual in that it cannot be cancelled by cancel_main_threads() // as it must notify the far end that it shutdown gracefully and avoid the LWT. @@ -792,7 +652,6 @@ void *aclk_main(void *ptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif - aclk_popcorn_check_bump(); // start localhost popcorn timer query_threads.count = read_query_thread_count(); if (wait_till_cloud_enabled()) @@ -801,13 +660,9 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO); + use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { -#else - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) { -#endif error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -822,6 +677,7 @@ void *aclk_main(void *ptr) stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); stats_thread->query_thread_count = query_threads.count; + stats_thread->client = mqttwss_client; aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt); netdata_thread_create( stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, @@ -834,28 +690,9 @@ void *aclk_main(void *ptr) if (aclk_attempt_to_connect(mqttwss_client)) goto exit_full; -#if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL) - error_report("############################ WARNING ###############################"); - error_report("# Your agent is configured to connect to cloud but has #"); - error_report("# no protobuf protocol support (uses legacy JSON protocol) #"); - error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #"); - error_report("# Visit following link for more info and instructions how to solve #"); - error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #"); - error_report("######################################################################"); -#endif - - // warning this assumes the popcorning is relative short (3s) - // if that changes call mqtt_wss_service from within - // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes()) - goto exit_full; - if (unlikely(!query_threads.thread_list)) aclk_query_threads_start(&query_threads, mqttwss_client); - if (!aclk_use_new_cloud_arch) - queue_connect_payloads(); - if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); @@ -889,168 +726,12 @@ exit: return NULL; } -// TODO this is taken over as workaround from old ACLK -// fix this in both old and new ACLK -extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); - -void aclk_alarm_reload(void) -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return; - } - ACLK_SHARED_STATE_UNLOCK; - - aclk_queue_query(aclk_query_new(METADATA_ALARMS)); -} - -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) -{ - BUFFER *local_buffer; - json_object *msg; - - if (host != localhost) - return 0; - - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - - netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - health_alarm_entry2json_nolock(local_buffer, ae, host); - netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - - msg = json_tokener_parse(local_buffer->buffer); - - struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE); - query->data.alarm_update = msg; - aclk_queue_query(query); - - buffer_free(local_buffer); - return 0; -} - -int aclk_update_chart(RRDHOST *host, char *chart_name, int create) -{ - struct aclk_query *query; - - if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check()) - return 0; - - query = aclk_query_new(create ? CHART_NEW : CHART_DEL); - if(create) { - query->data.chart_add_del.host = host; - query->data.chart_add_del.chart_name = strdupz(chart_name); - } else { - query->data.metadata_info.host = host; - query->data.metadata_info.initial_on_connect = 0; - } - - aclk_queue_query(query); - return 0; -} - -/* - * Add a new collector to the list - * If it exists, update the chart count - */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(tmp_collector->count != 1)) { - COLLECTOR_UNLOCK; - return; - } - - COLLECTOR_UNLOCK; - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -/* - * Delete a collector from the list - * If the chart count reaches zero the collector will be removed - * from the list by calling del_collector. - * - * This function will release the memory used and schedule - * a cloud update - */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(!tmp_collector || tmp_collector->count)) { - COLLECTOR_UNLOCK; - return; - } - - debug( - D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", - tmp_collector->count); - - COLLECTOR_UNLOCK; - - _free_collector(tmp_collector); - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; int ret; - if (!aclk_connected || !aclk_use_new_cloud_arch) + if (!aclk_connected) return; ret = get_node_id(&host->host_uuid, &node_id); @@ -1088,6 +769,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd) }; node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(node_id, (char*)node_state_update.node_id); + + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -1120,6 +811,20 @@ void aclk_send_node_instances() }; node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id); + + char host_id[UUID_STR_LEN]; + uuid_unparse_lower(list->host_id, host_id); + + RRDHOST *host = rrdhost_find_by_guid(host_id, 0); + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -1157,14 +862,12 @@ void aclk_send_node_instances() } freez(list_head); } -#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) { struct proto_alert_status status; @@ -1220,7 +923,6 @@ static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) ); freez(stats); } -#endif char *ng_aclk_state(void) { @@ -1231,15 +933,11 @@ char *ng_aclk_state(void) buffer_strcat(wb, "ACLK Available: Yes\n" "ACLK Version: 2\n" -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - "Protocols Supported: Legacy, Protobuf\n" -#else - "Protocols Supported: Legacy\n" -#endif + "Protocols Supported: Protobuf\n" ); - buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); if (agent_id == NULL) buffer_strcat(wb, "No\n"); else { @@ -1273,7 +971,6 @@ char *ng_aclk_state(void) if (aclk_connected) { buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL RRDHOST *host; rrd_rdlock(); rrdhost_foreach_read(host) { @@ -1308,7 +1005,6 @@ char *ng_aclk_state(void) fill_chart_status_for_host(wb, host); } rrd_unlock(); -#endif } ret = strdupz(buffer_tostring(wb)); @@ -1316,7 +1012,6 @@ char *ng_aclk_state(void) return ret; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { struct proto_alert_status status; @@ -1381,7 +1076,6 @@ static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) freez(stats); } -#endif static json_object *timestamp_to_json(const time_t *t) { @@ -1405,18 +1099,11 @@ char *ng_aclk_state_json(void) json_object_object_add(msg, "aclk-version", tmp); grp = json_object_new_array(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); tmp = json_object_new_string("Protobuf"); json_object_array_add(grp, tmp); -#else - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); -#endif json_object_object_add(msg, "protocols-supported", grp); - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); tmp = json_object_new_boolean(agent_id != NULL); json_object_object_add(msg, "agent-claimed", tmp); @@ -1434,7 +1121,7 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); - tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); + tmp = json_object_new_string("Protobuf"); json_object_object_add(msg, "used-cloud-protocol", tmp); tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); @@ -1461,7 +1148,6 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_disable_runtime); json_object_object_add(msg, "banned-by-cloud", tmp); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL grp = json_object_new_array(); RRDHOST *host; @@ -1513,7 +1199,6 @@ char *ng_aclk_state_json(void) } rrd_unlock(); json_object_object_add(msg, "node-instances", grp); -#endif char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); |