summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-07-20 04:50:01 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-07-20 04:50:01 +0000
commitcd4377fab21e0f500bef7f06543fa848a039c1e0 (patch)
treeba00a55e430c052d6bed0b61c0f8bbe8ebedd313 /aclk/aclk.c
parentReleasing debian version 1.40.1-1. (diff)
downloadnetdata-cd4377fab21e0f500bef7f06543fa848a039c1e0.tar.xz
netdata-cd4377fab21e0f500bef7f06543fa848a039c1e0.zip
Merging upstream version 1.41.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r--aclk/aclk.c217
1 files changed, 166 insertions, 51 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 399bc987..312db076 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -72,7 +72,7 @@ static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line)
if (!ssl_log_file)
ssl_log_file = fopen(ssl_log_filename, "a");
if (!ssl_log_file) {
- error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename);
+ netdata_log_error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename);
return;
}
fputs(line, ssl_log_file);
@@ -107,14 +107,14 @@ static int load_private_key()
long bytes_read;
char *private_key = read_by_filename(filename, &bytes_read);
if (!private_key) {
- error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
+ netdata_log_error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
return 1;
}
- debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
+ netdata_log_debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
BIO *key_bio = BIO_new_mem_buf(private_key, -1);
if (key_bio==NULL) {
- error("Claimed agent cannot establish ACLK - failed to create BIO for key");
+ netdata_log_error("Claimed agent cannot establish ACLK - failed to create BIO for key");
goto biofailed;
}
@@ -125,13 +125,13 @@ static int load_private_key()
NULL, NULL);
if (!aclk_dctx) {
- error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
+ netdata_log_error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
goto biofailed;
}
// this is necesseary to avoid RSA key with wrong size
if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) {
- error("Decoding private key (from claiming) failed - invalid format.");
+ netdata_log_error("Decoding private key (from claiming) failed - invalid format.");
goto biofailed;
}
#else
@@ -145,7 +145,7 @@ static int load_private_key()
}
char err[512];
ERR_error_string_n(ERR_get_error(), err, sizeof(err));
- error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
+ netdata_log_error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
biofailed:
freez(private_key);
@@ -154,8 +154,8 @@ biofailed:
static int wait_till_cloud_enabled()
{
- info("Waiting for Cloud to be enabled");
- while (!netdata_cloud_setting) {
+ netdata_log_info("Waiting for Cloud to be enabled");
+ while (!netdata_cloud_enabled) {
sleep_usec(USEC_PER_SEC * 1);
if (!service_running(SERVICE_ACLK))
return 1;
@@ -204,7 +204,7 @@ static int wait_till_agent_claim_ready()
// We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
- error("Do not move the cloud base url out of post_conf_load!!");
+ netdata_log_error("Do not move the cloud base url out of post_conf_load!!");
return 1;
}
@@ -212,7 +212,7 @@ static int wait_till_agent_claim_ready()
// TODO make it without malloc/free
memset(&url, 0, sizeof(url_t));
if (url_parse(cloud_base_url, &url)) {
- error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
+ netdata_log_error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
url_t_destroy(&url);
sleep(5);
continue;
@@ -237,13 +237,13 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
error_report("%s", str);
return;
case MQTT_WSS_LOG_INFO:
- info("%s", str);
+ netdata_log_info("%s", str);
return;
case MQTT_WSS_LOG_DEBUG:
- debug(D_ACLK, "%s", str);
+ netdata_log_debug(D_ACLK, "%s", str);
return;
default:
- error("Unknown log type from mqtt_wss");
+ netdata_log_error("Unknown log type from mqtt_wss");
}
}
@@ -252,10 +252,10 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
UNUSED(qos);
aclk_rcvd_cloud_msgs++;
- debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
+ netdata_log_debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
- error("Link is shutting down. Ignoring incoming message.");
+ netdata_log_error("Link is shutting down. Ignoring incoming message.");
return;
}
@@ -277,7 +277,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
if(logfd < 0)
- error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+ netdata_log_error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
write(logfd, msg, msglen);
close(logfd);
#endif
@@ -297,7 +297,7 @@ static void puback_callback(uint16_t packet_id)
#endif
if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
- info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
+ netdata_log_info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
}
}
@@ -308,7 +308,7 @@ static int read_query_thread_count()
threads = MAX(threads, 2);
threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
if(threads < 1) {
- error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
+ netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
threads = 1;
config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
}
@@ -335,7 +335,7 @@ static int handle_connection(mqtt_wss_client client)
}
if (disconnect_req || aclk_kill_link) {
- info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
+ netdata_log_info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
disconnect_req ? "true" : "false",
aclk_kill_link ? "true" : "false");
disconnect_req = 0;
@@ -365,13 +365,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!topic)
- error("Unable to fetch topic for COMMAND (to subscribe)");
+ netdata_log_error("Unable to fetch topic for COMMAND (to subscribe)");
else
mqtt_wss_subscribe(client, topic, 1);
topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
if (!topic)
- error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ netdata_log_error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
else
mqtt_wss_subscribe(client, topic, 1);
@@ -390,7 +390,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
void aclk_graceful_disconnect(mqtt_wss_client client)
{
- info("Preparing to gracefully shutdown ACLK connection");
+ netdata_log_info("Preparing to gracefully shutdown ACLK connection");
aclk_queue_lock();
aclk_queue_flush();
@@ -399,21 +399,21 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
if (now_monotonic_sec() - t >= 2) {
- error("Wasn't able to gracefully shutdown ACLK in time!");
+ netdata_log_error("Wasn't able to gracefully shutdown ACLK in time!");
break;
}
if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
- info("MQTT App Layer `disconnect` message sent successfully");
+ netdata_log_info("MQTT App Layer `disconnect` message sent successfully");
break;
}
}
- info("ACLK link is down");
- log_access("ACLK DISCONNECTED");
+ netdata_log_info("ACLK link is down");
+ netdata_log_access("ACLK DISCONNECTED");
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
- info("Attempting to gracefully shutdown the MQTT/WSS connection");
+ netdata_log_info("Attempting to gracefully shutdown the MQTT/WSS connection");
mqtt_wss_disconnect(client, 1000);
}
@@ -455,7 +455,7 @@ static int aclk_block_till_recon_allowed() {
next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
- info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
+ netdata_log_info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
{
@@ -489,6 +489,74 @@ static int aclk_get_transport_idx(aclk_env_t *env) {
}
#endif
+ACLK_STATUS aclk_status = ACLK_STATUS_NONE;
+
+const char *aclk_status_to_string(void) {
+ switch(aclk_status) {
+ case ACLK_STATUS_CONNECTED:
+ return "connected";
+
+ case ACLK_STATUS_NONE:
+ return "none";
+
+ case ACLK_STATUS_DISABLED:
+ return "disabled";
+
+ case ACLK_STATUS_NO_CLOUD_URL:
+ return "no_cloud_url";
+
+ case ACLK_STATUS_INVALID_CLOUD_URL:
+ return "invalid_cloud_url";
+
+ case ACLK_STATUS_NOT_CLAIMED:
+ return "not_claimed";
+
+ case ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE:
+ return "env_endpoint_unreachable";
+
+ case ACLK_STATUS_ENV_RESPONSE_NOT_200:
+ return "env_response_not_200";
+
+ case ACLK_STATUS_ENV_RESPONSE_EMPTY:
+ return "env_response_empty";
+
+ case ACLK_STATUS_ENV_RESPONSE_NOT_JSON:
+ return "env_response_not_json";
+
+ case ACLK_STATUS_ENV_FAILED:
+ return "env_failed";
+
+ case ACLK_STATUS_BLOCKED:
+ return "blocked";
+
+ case ACLK_STATUS_NO_OLD_PROTOCOL:
+ return "no_old_protocol";
+
+ case ACLK_STATUS_NO_PROTOCOL_CAPABILITY:
+ return "no_protocol_capability";
+
+ case ACLK_STATUS_INVALID_ENV_AUTH_URL:
+ return "invalid_env_auth_url";
+
+ case ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX:
+ return "invalid_env_transport_idx";
+
+ case ACLK_STATUS_INVALID_ENV_TRANSPORT_URL:
+ return "invalid_env_transport_url";
+
+ case ACLK_STATUS_INVALID_OTP:
+ return "invalid_otp";
+
+ case ACLK_STATUS_NO_LWT_TOPIC:
+ return "no_lwt_topic";
+
+ default:
+ return "unknown";
+ }
+}
+
+const char *aclk_cloud_base_url = NULL;
+
/* Attempts to make a connection to MQTT broker over WSS
* @param client instance of mqtt_wss_client
* @return 0 - Successful Connection,
@@ -513,18 +581,22 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
#endif
while (service_running(SERVICE_ACLK)) {
- char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
- if (cloud_base_url == NULL) {
+ aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ if (aclk_cloud_base_url == NULL) {
error_report("Do not move the cloud base url out of post_conf_load!!");
+ aclk_status = ACLK_STATUS_NO_CLOUD_URL;
return -1;
}
- if (aclk_block_till_recon_allowed())
+ if (aclk_block_till_recon_allowed()) {
+ aclk_status = ACLK_STATUS_BLOCKED;
return 1;
+ }
- info("Attempting connection now");
+ netdata_log_info("Attempting connection now");
memset(&base_url, 0, sizeof(url_t));
- if (url_parse(cloud_base_url, &base_url)) {
+ if (url_parse(aclk_cloud_base_url, &base_url)) {
+ aclk_status = ACLK_STATUS_INVALID_CLOUD_URL;
error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
url_t_destroy(&base_url);
@@ -554,28 +626,65 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
url_t_destroy(&base_url);
- if (ret) {
- error_report("Failed to Get ACLK environment");
- // delay handled by aclk_block_till_recon_allowed
- continue;
+ if(ret) switch(ret) {
+ case 1:
+ aclk_status = ACLK_STATUS_NOT_CLAIMED;
+ error_report("Failed to Get ACLK environment (agent is not claimed)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 2:
+ aclk_status = ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE;
+ error_report("Failed to Get ACLK environment (cannot contact ENV endpoint)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 3:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_200;
+ error_report("Failed to Get ACLK environment (ENV response code is not 200)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 4:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_EMPTY;
+ error_report("Failed to Get ACLK environment (ENV response is empty)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ case 5:
+ aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_JSON;
+ error_report("Failed to Get ACLK environment (ENV response is not JSON)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
+
+ default:
+ aclk_status = ACLK_STATUS_ENV_FAILED;
+ error_report("Failed to Get ACLK environment (unknown error)");
+ // delay handled by aclk_block_till_recon_allowed
+ continue;
}
- if (!service_running(SERVICE_ACLK))
+ if (!service_running(SERVICE_ACLK)) {
+ aclk_status = ACLK_STATUS_DISABLED;
return 1;
+ }
if (aclk_env->encoding != ACLK_ENC_PROTO) {
+ aclk_status = ACLK_STATUS_NO_OLD_PROTOCOL;
error_report("This agent can only use the new cloud protocol but cloud requested old one.");
continue;
}
if (!aclk_env_has_capa("proto")) {
+ aclk_status = ACLK_STATUS_NO_PROTOCOL_CAPABILITY;
error_report("Can't use encoding=proto without at least \"proto\" capability.");
continue;
}
- info("New ACLK protobuf protocol negotiated successfully (/env response).");
+ netdata_log_info("New ACLK protobuf protocol negotiated successfully (/env response).");
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
+ aclk_status = ACLK_STATUS_INVALID_ENV_AUTH_URL;
error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
url_t_destroy(&auth_url);
continue;
@@ -584,6 +693,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
url_t_destroy(&auth_url);
if (ret) {
+ aclk_status = ACLK_STATUS_INVALID_OTP;
error_report("Error passing Challenge/Response to get OTP");
continue;
}
@@ -593,6 +703,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
if (!mqtt_conn_params.will_topic) {
+ aclk_status = ACLK_STATUS_NO_LWT_TOPIC;
error_report("Couldn't get LWT topic. Will not send LWT.");
continue;
}
@@ -600,12 +711,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// Do the MQTT connection
ret = aclk_get_transport_idx(aclk_env);
if (ret < 0) {
+ aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX;
error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
continue;
}
memset(&mqtt_url, 0, sizeof(url_t));
if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
+ aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_URL;
error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
url_t_destroy(&mqtt_url);
continue;
@@ -637,8 +750,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
- info("ACLK connection successfully established");
- log_access("ACLK CONNECTED");
+ netdata_log_info("ACLK connection successfully established");
+ aclk_status = ACLK_STATUS_CONNECTED;
+ netdata_log_access("ACLK CONNECTED");
mqtt_connected_actions(client);
return 0;
}
@@ -646,6 +760,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
error_report("Connect failed");
}
+ aclk_status = ACLK_STATUS_DISABLED;
return 1;
}
@@ -671,7 +786,7 @@ void *aclk_main(void *ptr)
ACLK_PROXY_TYPE proxy_type;
aclk_get_proxy(&proxy_type);
if (proxy_type == PROXY_TYPE_SOCKS5) {
- error("SOCKS5 proxy is not supported by ACLK-NG yet.");
+ netdata_log_error("SOCKS5 proxy is not supported by ACLK-NG yet.");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
}
@@ -683,7 +798,7 @@ void *aclk_main(void *ptr)
netdata_thread_disable_cancelability();
#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
- info("Killing ACLK thread -> cloud functionality has been disabled");
+ netdata_log_info("Killing ACLK thread -> cloud functionality has been disabled");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
#endif
@@ -696,7 +811,7 @@ void *aclk_main(void *ptr)
goto exit;
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
- error("Couldn't initialize MQTT_WSS network library");
+ netdata_log_error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -742,7 +857,7 @@ void *aclk_main(void *ptr)
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
- log_access("ACLK DISCONNECTED");
+ netdata_log_access("ACLK DISCONNECTED");
}
} while (service_running(SERVICE_ACLK));
@@ -791,7 +906,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
ret = get_node_id(&host->host_uuid, &node_id);
if (ret > 0) {
// this means we were not able to check if node_id already present
- error("Unable to check for node_id. Ignoring the host state update.");
+ netdata_log_error("Unable to check for node_id. Ignoring the host state update.");
return;
}
if (ret < 0) {
@@ -809,7 +924,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
rrdhost_aclk_state_unlock(localhost);
create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
create_query->data.bin_payload.msg_name = "CreateNodeInstance";
- info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+ netdata_log_info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
aclk_queue_query(create_query);
return;
}
@@ -832,7 +947,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
+ netdata_log_info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
host->system_info->hops);
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
@@ -875,7 +990,7 @@ void aclk_send_node_instances()
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
+ netdata_log_info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
@@ -899,7 +1014,7 @@ void aclk_send_node_instances()
node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
+ netdata_log_info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
freez((void *)node_instance_creation.machine_guid);
aclk_queue_query(create_query);