diff options
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 133 |
1 files changed, 92 insertions, 41 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 3b035b849..e80897221 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -49,6 +49,8 @@ float last_backoff_value = 0; time_t aclk_block_until = 0; +int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload + #ifdef ENABLE_ACLK mqtt_wss_client mqttwss_client; @@ -61,6 +63,26 @@ struct aclk_shared_state aclk_shared_state = { .mqtt_shutdown_msg_rcvd = 0 }; +#ifdef MQTT_WSS_DEBUG +#include <openssl/ssl.h> +#define DEFAULT_SSKEYLOGFILE_NAME "SSLKEYLOGFILE" +const char *ssl_log_filename = NULL; +FILE *ssl_log_file = NULL; +static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line) +{ + (void)ssl; + if (!ssl_log_file) + ssl_log_file = fopen(ssl_log_filename, "a"); + if (!ssl_log_file) { + error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename); + return; + } + fputs(line, ssl_log_file); + putc('\n', ssl_log_file); + fflush(ssl_log_file); +} +#endif + #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 OSSL_DECODER_CTX *aclk_dctx = NULL; EVP_PKEY *aclk_private_key = NULL; @@ -137,7 +159,7 @@ static int wait_till_cloud_enabled() info("Waiting for Cloud to be enabled"); while (!netdata_cloud_setting) { sleep_usec(USEC_PER_SEC * 1); - if (netdata_exit) + if (!service_running(SERVICE_ACLK)) return 1; } return 0; @@ -156,7 +178,7 @@ static int wait_till_agent_claimed(void) char *agent_id = get_agent_claimid(); while (likely(!agent_id)) { sleep_usec(USEC_PER_SEC * 1); - if (netdata_exit) + if (!service_running(SERVICE_ACLK)) return 1; agent_id = get_agent_claimid(); } @@ -176,7 +198,7 @@ static int wait_till_agent_claimed(void) static int wait_till_agent_claim_ready() { url_t url; - while (!netdata_exit) { + while (service_running(SERVICE_ACLK)) { if (wait_till_agent_claimed()) return 1; @@ -288,7 +310,7 @@ static void puback_callback(uint16_t packet_id) static int read_query_thread_count() { - int threads = MIN(processors/2, 6); + int threads = MIN(get_netdata_cpus()/2, 6); threads = MAX(threads, 2); threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); if(threads < 1) { @@ -310,7 +332,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client); static int handle_connection(mqtt_wss_client client) { time_t last_periodic_query_wakeup = now_monotonic_sec(); - while (!netdata_exit) { + while (service_running(SERVICE_ACLK)) { // timeout 1000 to check at least once a second // for netdata_exit if (mqtt_wss_service(client, 1000) < 0){ @@ -365,6 +387,10 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_rcvd_cloud_msgs = 0; aclk_connection_counter++; + aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER; + while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL) + mqtt_wss_set_topic_alias(client, topic); + aclk_send_agent_connection_update(client, 1); } @@ -435,11 +461,11 @@ static int aclk_block_till_recon_allowed() { next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC); last_backoff_value = (float)recon_delay / MSEC_PER_SEC; - info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC); + info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC); // we want to wake up from time to time to check netdata_exit while (recon_delay) { - if (netdata_exit) + if (!service_running(SERVICE_ACLK)) return 1; if (recon_delay > NETDATA_EXIT_POLL_MS) { sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS); @@ -449,7 +475,7 @@ static int aclk_block_till_recon_allowed() { sleep_usec(recon_delay * USEC_PER_MS); recon_delay = 0; } - return netdata_exit; + return !service_running(SERVICE_ACLK); } #ifndef ACLK_DISABLE_CHALLENGE @@ -492,7 +518,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - while (!netdata_exit) { + while (service_running(SERVICE_ACLK)) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { error_report("Do not move the cloud base url out of post_conf_load!!"); @@ -512,7 +538,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) } struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT }; - aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); + aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, (char**)&proxy_conf.username, (char**)&proxy_conf.password, &proxy_conf.type); struct mqtt_connect_params mqtt_conn_params = { .clientid = "anon", @@ -540,7 +566,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) continue; } - if (netdata_exit) + if (!service_running(SERVICE_ACLK)) return 1; if (aclk_env->encoding != ACLK_ENC_PROTO) { @@ -610,7 +636,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - freez((char *)mqtt_conn_params.will_msg); + freez((char*)mqtt_conn_params.will_msg); + freez((char*)proxy_conf.host); + freez((char*)proxy_conf.username); + freez((char*)proxy_conf.password); if (!ret) { last_conn_time_mqtt = now_realtime_sec(); @@ -672,11 +701,23 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { error("Couldn't initialize MQTT_WSS network library"); goto exit; } +#ifdef MQTT_WSS_DEBUG + size_t default_ssl_log_filename_size = strlen(netdata_configured_log_dir) + strlen(DEFAULT_SSKEYLOGFILE_NAME) + 2; + char *default_ssl_log_filename = mallocz(default_ssl_log_filename_size); + snprintfz(default_ssl_log_filename, default_ssl_log_filename_size, "%s/%s", netdata_configured_log_dir, DEFAULT_SSKEYLOGFILE_NAME); + ssl_log_filename = config_get(CONFIG_SECTION_CLOUD, "aclk ssl keylog file", default_ssl_log_filename); + freez(default_ssl_log_filename); + if (ssl_log_filename) { + error_report("SSLKEYLOGFILE active (path:\"%s\")!", ssl_log_filename); + mqtt_wss_set_SSL_CTX_keylog_cb(mqttwss_client, aclk_ssl_keylog_cb); + } +#endif + // Enable MQTT buffer growth if necessary // e.g. old cloud architecture clients with huge nodes // that send JSON payloads of 10 MB as single messages @@ -690,8 +731,8 @@ void *aclk_main(void *ptr) stats_thread->client = mqttwss_client; aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt); netdata_thread_create( - stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, - stats_thread); + stats_thread->thread, "ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread); } // Keep reconnecting and talking until our time has come @@ -709,10 +750,15 @@ void *aclk_main(void *ptr) aclk_connected = 0; log_access("ACLK DISCONNECTED"); } - } while (!netdata_exit); + } while (service_running(SERVICE_ACLK)); aclk_graceful_disconnect(mqttwss_client); +#ifdef MQTT_WSS_DEBUG + if (ssl_log_file) + fclose(ssl_log_file); +#endif + exit_full: // Tear Down QUERY_THREAD_WAKEUP_ALL; @@ -739,35 +785,40 @@ exit: void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; - int ret; + int ret = 0; if (!aclk_connected) return; - ret = get_node_id(&host->host_uuid, &node_id); - if (ret > 0) { - // this means we were not able to check if node_id already present - error("Unable to check for node_id. Ignoring the host state update."); - return; + if (host->node_id && !uuid_is_null(*host->node_id)) { + uuid_copy(node_id, *host->node_id); } - if (ret < 0) { - // node_id not found - aclk_query_t create_query; - create_query = aclk_query_new(REGISTER_NODE); - rrdhost_aclk_state_lock(localhost); - node_instance_creation_t node_instance_creation = { - .claim_id = localhost->aclk_state.claimed_id, - .hops = host->system_info->hops, - .hostname = rrdhost_hostname(host), - .machine_guid = host->machine_guid - }; - create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); - rrdhost_aclk_state_unlock(localhost); - create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; - create_query->data.bin_payload.msg_name = "CreateNodeInstance"; - info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); - aclk_queue_query(create_query); - return; + else { + ret = get_node_id(&host->host_uuid, &node_id); + if (ret > 0) { + // this means we were not able to check if node_id already present + error("Unable to check for node_id. Ignoring the host state update."); + return; + } + if (ret < 0) { + // node_id not found + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + node_instance_creation_t node_instance_creation = { + .claim_id = localhost->aclk_state.claimed_id, + .hops = host->system_info->hops, + .hostname = rrdhost_hostname(host), + .machine_guid = host->machine_guid}; + create_query->data.bin_payload.payload = + generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); + rrdhost_aclk_state_unlock(localhost); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; + info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); + aclk_queue_query(create_query); + return; + } } aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); @@ -896,7 +947,7 @@ char *aclk_state(void) #ifndef ENABLE_ACLK return strdupz("ACLK Available: No"); #else - BUFFER *wb = buffer_create(1024); + BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk); struct tm *tmptr, tmbuf; char *ret; |