summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c133
1 files changed, 92 insertions, 41 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 3b035b849..e80897221 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -49,6 +49,8 @@ float last_backoff_value = 0;
time_t aclk_block_until = 0;
+int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
+
#ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client;
@@ -61,6 +63,26 @@ struct aclk_shared_state aclk_shared_state = {
.mqtt_shutdown_msg_rcvd = 0
};
+#ifdef MQTT_WSS_DEBUG
+#include <openssl/ssl.h>
+#define DEFAULT_SSKEYLOGFILE_NAME "SSLKEYLOGFILE"
+const char *ssl_log_filename = NULL;
+FILE *ssl_log_file = NULL;
+static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line)
+{
+ (void)ssl;
+ if (!ssl_log_file)
+ ssl_log_file = fopen(ssl_log_filename, "a");
+ if (!ssl_log_file) {
+ error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename);
+ return;
+ }
+ fputs(line, ssl_log_file);
+ putc('\n', ssl_log_file);
+ fflush(ssl_log_file);
+}
+#endif
+
#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
OSSL_DECODER_CTX *aclk_dctx = NULL;
EVP_PKEY *aclk_private_key = NULL;
@@ -137,7 +159,7 @@ static int wait_till_cloud_enabled()
info("Waiting for Cloud to be enabled");
while (!netdata_cloud_setting) {
sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
}
return 0;
@@ -156,7 +178,7 @@ static int wait_till_agent_claimed(void)
char *agent_id = get_agent_claimid();
while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
agent_id = get_agent_claimid();
}
@@ -176,7 +198,7 @@ static int wait_till_agent_claimed(void)
static int wait_till_agent_claim_ready()
{
url_t url;
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
if (wait_till_agent_claimed())
return 1;
@@ -288,7 +310,7 @@ static void puback_callback(uint16_t packet_id)
static int read_query_thread_count()
{
- int threads = MIN(processors/2, 6);
+ int threads = MIN(get_netdata_cpus()/2, 6);
threads = MAX(threads, 2);
threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
if(threads < 1) {
@@ -310,7 +332,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client);
static int handle_connection(mqtt_wss_client client)
{
time_t last_periodic_query_wakeup = now_monotonic_sec();
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
// timeout 1000 to check at least once a second
// for netdata_exit
if (mqtt_wss_service(client, 1000) < 0){
@@ -365,6 +387,10 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_rcvd_cloud_msgs = 0;
aclk_connection_counter++;
+ aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
+ while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
+ mqtt_wss_set_topic_alias(client, topic);
+
aclk_send_agent_connection_update(client, 1);
}
@@ -435,11 +461,11 @@ static int aclk_block_till_recon_allowed() {
next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
- info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
+ info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
{
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
if (recon_delay > NETDATA_EXIT_POLL_MS) {
sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
@@ -449,7 +475,7 @@ static int aclk_block_till_recon_allowed() {
sleep_usec(recon_delay * USEC_PER_MS);
recon_delay = 0;
}
- return netdata_exit;
+ return !service_running(SERVICE_ACLK);
}
#ifndef ACLK_DISABLE_CHALLENGE
@@ -492,7 +518,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
error_report("Do not move the cloud base url out of post_conf_load!!");
@@ -512,7 +538,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
- aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, (char**)&proxy_conf.username, (char**)&proxy_conf.password, &proxy_conf.type);
struct mqtt_connect_params mqtt_conn_params = {
.clientid = "anon",
@@ -540,7 +566,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
continue;
}
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
if (aclk_env->encoding != ACLK_ENC_PROTO) {
@@ -610,7 +636,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
- freez((char *)mqtt_conn_params.will_msg);
+ freez((char*)mqtt_conn_params.will_msg);
+ freez((char*)proxy_conf.host);
+ freez((char*)proxy_conf.username);
+ freez((char*)proxy_conf.password);
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
@@ -672,11 +701,23 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
+#ifdef MQTT_WSS_DEBUG
+ size_t default_ssl_log_filename_size = strlen(netdata_configured_log_dir) + strlen(DEFAULT_SSKEYLOGFILE_NAME) + 2;
+ char *default_ssl_log_filename = mallocz(default_ssl_log_filename_size);
+ snprintfz(default_ssl_log_filename, default_ssl_log_filename_size, "%s/%s", netdata_configured_log_dir, DEFAULT_SSKEYLOGFILE_NAME);
+ ssl_log_filename = config_get(CONFIG_SECTION_CLOUD, "aclk ssl keylog file", default_ssl_log_filename);
+ freez(default_ssl_log_filename);
+ if (ssl_log_filename) {
+ error_report("SSLKEYLOGFILE active (path:\"%s\")!", ssl_log_filename);
+ mqtt_wss_set_SSL_CTX_keylog_cb(mqttwss_client, aclk_ssl_keylog_cb);
+ }
+#endif
+
// Enable MQTT buffer growth if necessary
// e.g. old cloud architecture clients with huge nodes
// that send JSON payloads of 10 MB as single messages
@@ -690,8 +731,8 @@ void *aclk_main(void *ptr)
stats_thread->client = mqttwss_client;
aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
netdata_thread_create(
- stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
- stats_thread);
+ stats_thread->thread, "ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
+ stats_thread);
}
// Keep reconnecting and talking until our time has come
@@ -709,10 +750,15 @@ void *aclk_main(void *ptr)
aclk_connected = 0;
log_access("ACLK DISCONNECTED");
}
- } while (!netdata_exit);
+ } while (service_running(SERVICE_ACLK));
aclk_graceful_disconnect(mqttwss_client);
+#ifdef MQTT_WSS_DEBUG
+ if (ssl_log_file)
+ fclose(ssl_log_file);
+#endif
+
exit_full:
// Tear Down
QUERY_THREAD_WAKEUP_ALL;
@@ -739,35 +785,40 @@ exit:
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
- int ret;
+ int ret = 0;
if (!aclk_connected)
return;
- ret = get_node_id(&host->host_uuid, &node_id);
- if (ret > 0) {
- // this means we were not able to check if node_id already present
- error("Unable to check for node_id. Ignoring the host state update.");
- return;
+ if (host->node_id && !uuid_is_null(*host->node_id)) {
+ uuid_copy(node_id, *host->node_id);
}
- if (ret < 0) {
- // node_id not found
- aclk_query_t create_query;
- create_query = aclk_query_new(REGISTER_NODE);
- rrdhost_aclk_state_lock(localhost);
- node_instance_creation_t node_instance_creation = {
- .claim_id = localhost->aclk_state.claimed_id,
- .hops = host->system_info->hops,
- .hostname = rrdhost_hostname(host),
- .machine_guid = host->machine_guid
- };
- create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
- rrdhost_aclk_state_unlock(localhost);
- create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
- create_query->data.bin_payload.msg_name = "CreateNodeInstance";
- info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
- aclk_queue_query(create_query);
- return;
+ else {
+ ret = get_node_id(&host->host_uuid, &node_id);
+ if (ret > 0) {
+ // this means we were not able to check if node_id already present
+ error("Unable to check for node_id. Ignoring the host state update.");
+ return;
+ }
+ if (ret < 0) {
+ // node_id not found
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = rrdhost_hostname(host),
+ .machine_guid = host->machine_guid};
+ create_query->data.bin_payload.payload =
+ generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+ info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+ aclk_queue_query(create_query);
+ return;
+ }
}
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
@@ -896,7 +947,7 @@ char *aclk_state(void)
#ifndef ENABLE_ACLK
return strdupz("ACLK Available: No");
#else
- BUFFER *wb = buffer_create(1024);
+ BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
struct tm *tmptr, tmbuf;
char *ret;