diff options
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 217 |
1 files changed, 166 insertions, 51 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 399bc9876..312db076f 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -72,7 +72,7 @@ static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line) if (!ssl_log_file) ssl_log_file = fopen(ssl_log_filename, "a"); if (!ssl_log_file) { - error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename); + netdata_log_error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename); return; } fputs(line, ssl_log_file); @@ -107,14 +107,14 @@ static int load_private_key() long bytes_read; char *private_key = read_by_filename(filename, &bytes_read); if (!private_key) { - error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); + netdata_log_error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); return 1; } - debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); + netdata_log_debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); BIO *key_bio = BIO_new_mem_buf(private_key, -1); if (key_bio==NULL) { - error("Claimed agent cannot establish ACLK - failed to create BIO for key"); + netdata_log_error("Claimed agent cannot establish ACLK - failed to create BIO for key"); goto biofailed; } @@ -125,13 +125,13 @@ static int load_private_key() NULL, NULL); if (!aclk_dctx) { - error("Loading private key (from claiming) failed - no OpenSSL Decoders found"); + netdata_log_error("Loading private key (from claiming) failed - no OpenSSL Decoders found"); goto biofailed; } // this is necesseary to avoid RSA key with wrong size if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) { - error("Decoding private key (from claiming) failed - invalid format."); + netdata_log_error("Decoding private key (from claiming) failed - invalid format."); goto biofailed; } #else @@ -145,7 +145,7 @@ static int load_private_key() } char err[512]; ERR_error_string_n(ERR_get_error(), err, sizeof(err)); - error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); + netdata_log_error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); biofailed: freez(private_key); @@ -154,8 +154,8 @@ biofailed: static int wait_till_cloud_enabled() { - info("Waiting for Cloud to be enabled"); - while (!netdata_cloud_setting) { + netdata_log_info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_enabled) { sleep_usec(USEC_PER_SEC * 1); if (!service_running(SERVICE_ACLK)) return 1; @@ -204,7 +204,7 @@ static int wait_till_agent_claim_ready() // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { - error("Do not move the cloud base url out of post_conf_load!!"); + netdata_log_error("Do not move the cloud base url out of post_conf_load!!"); return 1; } @@ -212,7 +212,7 @@ static int wait_till_agent_claim_ready() // TODO make it without malloc/free memset(&url, 0, sizeof(url_t)); if (url_parse(cloud_base_url, &url)) { - error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix"); + netdata_log_error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix"); url_t_destroy(&url); sleep(5); continue; @@ -237,13 +237,13 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) error_report("%s", str); return; case MQTT_WSS_LOG_INFO: - info("%s", str); + netdata_log_info("%s", str); return; case MQTT_WSS_LOG_DEBUG: - debug(D_ACLK, "%s", str); + netdata_log_debug(D_ACLK, "%s", str); return; default: - error("Unknown log type from mqtt_wss"); + netdata_log_error("Unknown log type from mqtt_wss"); } } @@ -252,10 +252,10 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int UNUSED(qos); aclk_rcvd_cloud_msgs++; - debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); + netdata_log_debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring incoming message."); + netdata_log_error("Link is shutting down. Ignoring incoming message."); return; } @@ -277,7 +277,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype); logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); if(logfd < 0) - error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + netdata_log_error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); write(logfd, msg, msglen); close(logfd); #endif @@ -297,7 +297,7 @@ static void puback_callback(uint16_t packet_id) #endif if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { - info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); + netdata_log_info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; } } @@ -308,7 +308,7 @@ static int read_query_thread_count() threads = MAX(threads, 2); threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); if(threads < 1) { - error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); + netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); threads = 1; config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); } @@ -335,7 +335,7 @@ static int handle_connection(mqtt_wss_client client) } if (disconnect_req || aclk_kill_link) { - info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", + netdata_log_info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", disconnect_req ? "true" : "false", aclk_kill_link ? "true" : "false"); disconnect_req = 0; @@ -365,13 +365,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) - error("Unable to fetch topic for COMMAND (to subscribe)"); + netdata_log_error("Unable to fetch topic for COMMAND (to subscribe)"); else mqtt_wss_subscribe(client, topic, 1); topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); if (!topic) - error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + netdata_log_error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); else mqtt_wss_subscribe(client, topic, 1); @@ -390,7 +390,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) void aclk_graceful_disconnect(mqtt_wss_client client) { - info("Preparing to gracefully shutdown ACLK connection"); + netdata_log_info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); @@ -399,21 +399,21 @@ void aclk_graceful_disconnect(mqtt_wss_client client) time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { if (now_monotonic_sec() - t >= 2) { - error("Wasn't able to gracefully shutdown ACLK in time!"); + netdata_log_error("Wasn't able to gracefully shutdown ACLK in time!"); break; } if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { - info("MQTT App Layer `disconnect` message sent successfully"); + netdata_log_info("MQTT App Layer `disconnect` message sent successfully"); break; } } - info("ACLK link is down"); - log_access("ACLK DISCONNECTED"); + netdata_log_info("ACLK link is down"); + netdata_log_access("ACLK DISCONNECTED"); aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); aclk_connected = 0; - info("Attempting to gracefully shutdown the MQTT/WSS connection"); + netdata_log_info("Attempting to gracefully shutdown the MQTT/WSS connection"); mqtt_wss_disconnect(client, 1000); } @@ -455,7 +455,7 @@ static int aclk_block_till_recon_allowed() { next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC); last_backoff_value = (float)recon_delay / MSEC_PER_SEC; - info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC); + netdata_log_info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC); // we want to wake up from time to time to check netdata_exit while (recon_delay) { @@ -489,6 +489,74 @@ static int aclk_get_transport_idx(aclk_env_t *env) { } #endif +ACLK_STATUS aclk_status = ACLK_STATUS_NONE; + +const char *aclk_status_to_string(void) { + switch(aclk_status) { + case ACLK_STATUS_CONNECTED: + return "connected"; + + case ACLK_STATUS_NONE: + return "none"; + + case ACLK_STATUS_DISABLED: + return "disabled"; + + case ACLK_STATUS_NO_CLOUD_URL: + return "no_cloud_url"; + + case ACLK_STATUS_INVALID_CLOUD_URL: + return "invalid_cloud_url"; + + case ACLK_STATUS_NOT_CLAIMED: + return "not_claimed"; + + case ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE: + return "env_endpoint_unreachable"; + + case ACLK_STATUS_ENV_RESPONSE_NOT_200: + return "env_response_not_200"; + + case ACLK_STATUS_ENV_RESPONSE_EMPTY: + return "env_response_empty"; + + case ACLK_STATUS_ENV_RESPONSE_NOT_JSON: + return "env_response_not_json"; + + case ACLK_STATUS_ENV_FAILED: + return "env_failed"; + + case ACLK_STATUS_BLOCKED: + return "blocked"; + + case ACLK_STATUS_NO_OLD_PROTOCOL: + return "no_old_protocol"; + + case ACLK_STATUS_NO_PROTOCOL_CAPABILITY: + return "no_protocol_capability"; + + case ACLK_STATUS_INVALID_ENV_AUTH_URL: + return "invalid_env_auth_url"; + + case ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX: + return "invalid_env_transport_idx"; + + case ACLK_STATUS_INVALID_ENV_TRANSPORT_URL: + return "invalid_env_transport_url"; + + case ACLK_STATUS_INVALID_OTP: + return "invalid_otp"; + + case ACLK_STATUS_NO_LWT_TOPIC: + return "no_lwt_topic"; + + default: + return "unknown"; + } +} + +const char *aclk_cloud_base_url = NULL; + /* Attempts to make a connection to MQTT broker over WSS * @param client instance of mqtt_wss_client * @return 0 - Successful Connection, @@ -513,18 +581,22 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) #endif while (service_running(SERVICE_ACLK)) { - char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); - if (cloud_base_url == NULL) { + aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (aclk_cloud_base_url == NULL) { error_report("Do not move the cloud base url out of post_conf_load!!"); + aclk_status = ACLK_STATUS_NO_CLOUD_URL; return -1; } - if (aclk_block_till_recon_allowed()) + if (aclk_block_till_recon_allowed()) { + aclk_status = ACLK_STATUS_BLOCKED; return 1; + } - info("Attempting connection now"); + netdata_log_info("Attempting connection now"); memset(&base_url, 0, sizeof(url_t)); - if (url_parse(cloud_base_url, &base_url)) { + if (url_parse(aclk_cloud_base_url, &base_url)) { + aclk_status = ACLK_STATUS_INVALID_CLOUD_URL; error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); url_t_destroy(&base_url); @@ -554,28 +626,65 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_env(aclk_env, base_url.host, base_url.port); url_t_destroy(&base_url); - if (ret) { - error_report("Failed to Get ACLK environment"); - // delay handled by aclk_block_till_recon_allowed - continue; + if(ret) switch(ret) { + case 1: + aclk_status = ACLK_STATUS_NOT_CLAIMED; + error_report("Failed to Get ACLK environment (agent is not claimed)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 2: + aclk_status = ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE; + error_report("Failed to Get ACLK environment (cannot contact ENV endpoint)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 3: + aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_200; + error_report("Failed to Get ACLK environment (ENV response code is not 200)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 4: + aclk_status = ACLK_STATUS_ENV_RESPONSE_EMPTY; + error_report("Failed to Get ACLK environment (ENV response is empty)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 5: + aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_JSON; + error_report("Failed to Get ACLK environment (ENV response is not JSON)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + default: + aclk_status = ACLK_STATUS_ENV_FAILED; + error_report("Failed to Get ACLK environment (unknown error)"); + // delay handled by aclk_block_till_recon_allowed + continue; } - if (!service_running(SERVICE_ACLK)) + if (!service_running(SERVICE_ACLK)) { + aclk_status = ACLK_STATUS_DISABLED; return 1; + } if (aclk_env->encoding != ACLK_ENC_PROTO) { + aclk_status = ACLK_STATUS_NO_OLD_PROTOCOL; error_report("This agent can only use the new cloud protocol but cloud requested old one."); continue; } if (!aclk_env_has_capa("proto")) { + aclk_status = ACLK_STATUS_NO_PROTOCOL_CAPABILITY; error_report("Can't use encoding=proto without at least \"proto\" capability."); continue; } - info("New ACLK protobuf protocol negotiated successfully (/env response)."); + netdata_log_info("New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { + aclk_status = ACLK_STATUS_INVALID_ENV_AUTH_URL; error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); url_t_destroy(&auth_url); continue; @@ -584,6 +693,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); url_t_destroy(&auth_url); if (ret) { + aclk_status = ACLK_STATUS_INVALID_OTP; error_report("Error passing Challenge/Response to get OTP"); continue; } @@ -593,6 +703,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { + aclk_status = ACLK_STATUS_NO_LWT_TOPIC; error_report("Couldn't get LWT topic. Will not send LWT."); continue; } @@ -600,12 +711,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // Do the MQTT connection ret = aclk_get_transport_idx(aclk_env); if (ret < 0) { + aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX; error_report("Cloud /env endpoint didn't return any transport usable by this Agent."); continue; } memset(&mqtt_url, 0, sizeof(url_t)); if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ + aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_URL; error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); url_t_destroy(&mqtt_url); continue; @@ -637,8 +750,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (!ret) { last_conn_time_mqtt = now_realtime_sec(); - info("ACLK connection successfully established"); - log_access("ACLK CONNECTED"); + netdata_log_info("ACLK connection successfully established"); + aclk_status = ACLK_STATUS_CONNECTED; + netdata_log_access("ACLK CONNECTED"); mqtt_connected_actions(client); return 0; } @@ -646,6 +760,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) error_report("Connect failed"); } + aclk_status = ACLK_STATUS_DISABLED; return 1; } @@ -671,7 +786,7 @@ void *aclk_main(void *ptr) ACLK_PROXY_TYPE proxy_type; aclk_get_proxy(&proxy_type); if (proxy_type == PROXY_TYPE_SOCKS5) { - error("SOCKS5 proxy is not supported by ACLK-NG yet."); + netdata_log_error("SOCKS5 proxy is not supported by ACLK-NG yet."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; } @@ -683,7 +798,7 @@ void *aclk_main(void *ptr) netdata_thread_disable_cancelability(); #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK ) - info("Killing ACLK thread -> cloud functionality has been disabled"); + netdata_log_info("Killing ACLK thread -> cloud functionality has been disabled"); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif @@ -696,7 +811,7 @@ void *aclk_main(void *ptr) goto exit; if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { - error("Couldn't initialize MQTT_WSS network library"); + netdata_log_error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -742,7 +857,7 @@ void *aclk_main(void *ptr) aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); aclk_connected = 0; - log_access("ACLK DISCONNECTED"); + netdata_log_access("ACLK DISCONNECTED"); } } while (service_running(SERVICE_ACLK)); @@ -791,7 +906,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) ret = get_node_id(&host->host_uuid, &node_id); if (ret > 0) { // this means we were not able to check if node_id already present - error("Unable to check for node_id. Ignoring the host state update."); + netdata_log_error("Unable to check for node_id. Ignoring the host state update."); return; } if (ret < 0) { @@ -809,7 +924,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) rrdhost_aclk_state_unlock(localhost); create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; create_query->data.bin_payload.msg_name = "CreateNodeInstance"; - info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); + netdata_log_info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); aclk_queue_query(create_query); return; } @@ -832,7 +947,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, + netdata_log_info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, host->system_info->hops); freez((void*)node_state_update.node_id); query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; @@ -875,7 +990,7 @@ void aclk_send_node_instances() node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, + netdata_log_info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); @@ -899,7 +1014,7 @@ void aclk_send_node_instances() node_instance_creation.claim_id = localhost->aclk_state.claimed_id, create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, + netdata_log_info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); freez((void *)node_instance_creation.machine_guid); aclk_queue_query(create_query); |