From ab1bb5b7f1c3c3a7b240ab7fc8661459ecd7decb Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 20 Jul 2023 06:49:55 +0200 Subject: Adding upstream version 1.41.0. Signed-off-by: Daniel Baumann --- aclk/aclk.c | 217 +++++++++++++++++++++++++++-------- aclk/aclk.h | 33 ++++++ aclk/aclk_capas.c | 21 ++-- aclk/aclk_otp.c | 144 +++++++++++------------ aclk/aclk_proxy.c | 2 +- aclk/aclk_query.c | 21 ++-- aclk/aclk_query_queue.c | 4 +- aclk/aclk_query_queue.h | 2 +- aclk/aclk_rx_msgs.c | 44 +++---- aclk/aclk_stats.c | 4 +- aclk/aclk_tx_msgs.c | 14 +-- aclk/aclk_util.c | 26 ++--- aclk/https_client.c | 90 +++++++-------- aclk/schema-wrappers/alarm_config.cc | 5 + aclk/schema-wrappers/alarm_config.h | 2 + aclk/schema-wrappers/alarm_stream.cc | 2 +- 16 files changed, 392 insertions(+), 239 deletions(-) (limited to 'aclk') diff --git a/aclk/aclk.c b/aclk/aclk.c index 399bc9876..312db076f 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -72,7 +72,7 @@ static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line) if (!ssl_log_file) ssl_log_file = fopen(ssl_log_filename, "a"); if (!ssl_log_file) { - error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename); + netdata_log_error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename); return; } fputs(line, ssl_log_file); @@ -107,14 +107,14 @@ static int load_private_key() long bytes_read; char *private_key = read_by_filename(filename, &bytes_read); if (!private_key) { - error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); + netdata_log_error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); return 1; } - debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); + netdata_log_debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); BIO *key_bio = BIO_new_mem_buf(private_key, -1); if (key_bio==NULL) { - error("Claimed agent cannot establish ACLK - failed to create BIO for key"); + netdata_log_error("Claimed agent cannot establish ACLK - failed to create BIO for key"); goto biofailed; } @@ -125,13 +125,13 @@ static int load_private_key() NULL, NULL); if (!aclk_dctx) { - error("Loading private key (from claiming) failed - no OpenSSL Decoders found"); + netdata_log_error("Loading private key (from claiming) failed - no OpenSSL Decoders found"); goto biofailed; } // this is necesseary to avoid RSA key with wrong size if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) { - error("Decoding private key (from claiming) failed - invalid format."); + netdata_log_error("Decoding private key (from claiming) failed - invalid format."); goto biofailed; } #else @@ -145,7 +145,7 @@ static int load_private_key() } char err[512]; ERR_error_string_n(ERR_get_error(), err, sizeof(err)); - error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); + netdata_log_error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); biofailed: freez(private_key); @@ -154,8 +154,8 @@ biofailed: static int wait_till_cloud_enabled() { - info("Waiting for Cloud to be enabled"); - while (!netdata_cloud_setting) { + netdata_log_info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_enabled) { sleep_usec(USEC_PER_SEC * 1); if (!service_running(SERVICE_ACLK)) return 1; @@ -204,7 +204,7 @@ static int wait_till_agent_claim_ready() // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { - error("Do not move the cloud base url out of post_conf_load!!"); + netdata_log_error("Do not move the cloud base url out of post_conf_load!!"); return 1; } @@ -212,7 +212,7 @@ static int wait_till_agent_claim_ready() // TODO make it without malloc/free memset(&url, 0, sizeof(url_t)); if (url_parse(cloud_base_url, &url)) { - error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix"); + netdata_log_error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix"); url_t_destroy(&url); sleep(5); continue; @@ -237,13 +237,13 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) error_report("%s", str); return; case MQTT_WSS_LOG_INFO: - info("%s", str); + netdata_log_info("%s", str); return; case MQTT_WSS_LOG_DEBUG: - debug(D_ACLK, "%s", str); + netdata_log_debug(D_ACLK, "%s", str); return; default: - error("Unknown log type from mqtt_wss"); + netdata_log_error("Unknown log type from mqtt_wss"); } } @@ -252,10 +252,10 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int UNUSED(qos); aclk_rcvd_cloud_msgs++; - debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); + netdata_log_debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring incoming message."); + netdata_log_error("Link is shutting down. Ignoring incoming message."); return; } @@ -277,7 +277,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype); logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); if(logfd < 0) - error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + netdata_log_error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); write(logfd, msg, msglen); close(logfd); #endif @@ -297,7 +297,7 @@ static void puback_callback(uint16_t packet_id) #endif if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { - info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); + netdata_log_info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; } } @@ -308,7 +308,7 @@ static int read_query_thread_count() threads = MAX(threads, 2); threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); if(threads < 1) { - error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); + netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); threads = 1; config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); } @@ -335,7 +335,7 @@ static int handle_connection(mqtt_wss_client client) } if (disconnect_req || aclk_kill_link) { - info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", + netdata_log_info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", disconnect_req ? "true" : "false", aclk_kill_link ? "true" : "false"); disconnect_req = 0; @@ -365,13 +365,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) - error("Unable to fetch topic for COMMAND (to subscribe)"); + netdata_log_error("Unable to fetch topic for COMMAND (to subscribe)"); else mqtt_wss_subscribe(client, topic, 1); topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); if (!topic) - error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + netdata_log_error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); else mqtt_wss_subscribe(client, topic, 1); @@ -390,7 +390,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) void aclk_graceful_disconnect(mqtt_wss_client client) { - info("Preparing to gracefully shutdown ACLK connection"); + netdata_log_info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); @@ -399,21 +399,21 @@ void aclk_graceful_disconnect(mqtt_wss_client client) time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { if (now_monotonic_sec() - t >= 2) { - error("Wasn't able to gracefully shutdown ACLK in time!"); + netdata_log_error("Wasn't able to gracefully shutdown ACLK in time!"); break; } if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { - info("MQTT App Layer `disconnect` message sent successfully"); + netdata_log_info("MQTT App Layer `disconnect` message sent successfully"); break; } } - info("ACLK link is down"); - log_access("ACLK DISCONNECTED"); + netdata_log_info("ACLK link is down"); + netdata_log_access("ACLK DISCONNECTED"); aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); aclk_connected = 0; - info("Attempting to gracefully shutdown the MQTT/WSS connection"); + netdata_log_info("Attempting to gracefully shutdown the MQTT/WSS connection"); mqtt_wss_disconnect(client, 1000); } @@ -455,7 +455,7 @@ static int aclk_block_till_recon_allowed() { next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC); last_backoff_value = (float)recon_delay / MSEC_PER_SEC; - info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC); + netdata_log_info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC); // we want to wake up from time to time to check netdata_exit while (recon_delay) { @@ -489,6 +489,74 @@ static int aclk_get_transport_idx(aclk_env_t *env) { } #endif +ACLK_STATUS aclk_status = ACLK_STATUS_NONE; + +const char *aclk_status_to_string(void) { + switch(aclk_status) { + case ACLK_STATUS_CONNECTED: + return "connected"; + + case ACLK_STATUS_NONE: + return "none"; + + case ACLK_STATUS_DISABLED: + return "disabled"; + + case ACLK_STATUS_NO_CLOUD_URL: + return "no_cloud_url"; + + case ACLK_STATUS_INVALID_CLOUD_URL: + return "invalid_cloud_url"; + + case ACLK_STATUS_NOT_CLAIMED: + return "not_claimed"; + + case ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE: + return "env_endpoint_unreachable"; + + case ACLK_STATUS_ENV_RESPONSE_NOT_200: + return "env_response_not_200"; + + case ACLK_STATUS_ENV_RESPONSE_EMPTY: + return "env_response_empty"; + + case ACLK_STATUS_ENV_RESPONSE_NOT_JSON: + return "env_response_not_json"; + + case ACLK_STATUS_ENV_FAILED: + return "env_failed"; + + case ACLK_STATUS_BLOCKED: + return "blocked"; + + case ACLK_STATUS_NO_OLD_PROTOCOL: + return "no_old_protocol"; + + case ACLK_STATUS_NO_PROTOCOL_CAPABILITY: + return "no_protocol_capability"; + + case ACLK_STATUS_INVALID_ENV_AUTH_URL: + return "invalid_env_auth_url"; + + case ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX: + return "invalid_env_transport_idx"; + + case ACLK_STATUS_INVALID_ENV_TRANSPORT_URL: + return "invalid_env_transport_url"; + + case ACLK_STATUS_INVALID_OTP: + return "invalid_otp"; + + case ACLK_STATUS_NO_LWT_TOPIC: + return "no_lwt_topic"; + + default: + return "unknown"; + } +} + +const char *aclk_cloud_base_url = NULL; + /* Attempts to make a connection to MQTT broker over WSS * @param client instance of mqtt_wss_client * @return 0 - Successful Connection, @@ -513,18 +581,22 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) #endif while (service_running(SERVICE_ACLK)) { - char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); - if (cloud_base_url == NULL) { + aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (aclk_cloud_base_url == NULL) { error_report("Do not move the cloud base url out of post_conf_load!!"); + aclk_status = ACLK_STATUS_NO_CLOUD_URL; return -1; } - if (aclk_block_till_recon_allowed()) + if (aclk_block_till_recon_allowed()) { + aclk_status = ACLK_STATUS_BLOCKED; return 1; + } - info("Attempting connection now"); + netdata_log_info("Attempting connection now"); memset(&base_url, 0, sizeof(url_t)); - if (url_parse(cloud_base_url, &base_url)) { + if (url_parse(aclk_cloud_base_url, &base_url)) { + aclk_status = ACLK_STATUS_INVALID_CLOUD_URL; error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); url_t_destroy(&base_url); @@ -554,28 +626,65 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_env(aclk_env, base_url.host, base_url.port); url_t_destroy(&base_url); - if (ret) { - error_report("Failed to Get ACLK environment"); - // delay handled by aclk_block_till_recon_allowed - continue; + if(ret) switch(ret) { + case 1: + aclk_status = ACLK_STATUS_NOT_CLAIMED; + error_report("Failed to Get ACLK environment (agent is not claimed)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 2: + aclk_status = ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE; + error_report("Failed to Get ACLK environment (cannot contact ENV endpoint)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 3: + aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_200; + error_report("Failed to Get ACLK environment (ENV response code is not 200)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 4: + aclk_status = ACLK_STATUS_ENV_RESPONSE_EMPTY; + error_report("Failed to Get ACLK environment (ENV response is empty)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + case 5: + aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_JSON; + error_report("Failed to Get ACLK environment (ENV response is not JSON)"); + // delay handled by aclk_block_till_recon_allowed + continue; + + default: + aclk_status = ACLK_STATUS_ENV_FAILED; + error_report("Failed to Get ACLK environment (unknown error)"); + // delay handled by aclk_block_till_recon_allowed + continue; } - if (!service_running(SERVICE_ACLK)) + if (!service_running(SERVICE_ACLK)) { + aclk_status = ACLK_STATUS_DISABLED; return 1; + } if (aclk_env->encoding != ACLK_ENC_PROTO) { + aclk_status = ACLK_STATUS_NO_OLD_PROTOCOL; error_report("This agent can only use the new cloud protocol but cloud requested old one."); continue; } if (!aclk_env_has_capa("proto")) { + aclk_status = ACLK_STATUS_NO_PROTOCOL_CAPABILITY; error_report("Can't use encoding=proto without at least \"proto\" capability."); continue; } - info("New ACLK protobuf protocol negotiated successfully (/env response)."); + netdata_log_info("New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { + aclk_status = ACLK_STATUS_INVALID_ENV_AUTH_URL; error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); url_t_destroy(&auth_url); continue; @@ -584,6 +693,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); url_t_destroy(&auth_url); if (ret) { + aclk_status = ACLK_STATUS_INVALID_OTP; error_report("Error passing Challenge/Response to get OTP"); continue; } @@ -593,6 +703,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { + aclk_status = ACLK_STATUS_NO_LWT_TOPIC; error_report("Couldn't get LWT topic. Will not send LWT."); continue; } @@ -600,12 +711,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // Do the MQTT connection ret = aclk_get_transport_idx(aclk_env); if (ret < 0) { + aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX; error_report("Cloud /env endpoint didn't return any transport usable by this Agent."); continue; } memset(&mqtt_url, 0, sizeof(url_t)); if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ + aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_URL; error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); url_t_destroy(&mqtt_url); continue; @@ -637,8 +750,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (!ret) { last_conn_time_mqtt = now_realtime_sec(); - info("ACLK connection successfully established"); - log_access("ACLK CONNECTED"); + netdata_log_info("ACLK connection successfully established"); + aclk_status = ACLK_STATUS_CONNECTED; + netdata_log_access("ACLK CONNECTED"); mqtt_connected_actions(client); return 0; } @@ -646,6 +760,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) error_report("Connect failed"); } + aclk_status = ACLK_STATUS_DISABLED; return 1; } @@ -671,7 +786,7 @@ void *aclk_main(void *ptr) ACLK_PROXY_TYPE proxy_type; aclk_get_proxy(&proxy_type); if (proxy_type == PROXY_TYPE_SOCKS5) { - error("SOCKS5 proxy is not supported by ACLK-NG yet."); + netdata_log_error("SOCKS5 proxy is not supported by ACLK-NG yet."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; } @@ -683,7 +798,7 @@ void *aclk_main(void *ptr) netdata_thread_disable_cancelability(); #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK ) - info("Killing ACLK thread -> cloud functionality has been disabled"); + netdata_log_info("Killing ACLK thread -> cloud functionality has been disabled"); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif @@ -696,7 +811,7 @@ void *aclk_main(void *ptr) goto exit; if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { - error("Couldn't initialize MQTT_WSS network library"); + netdata_log_error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -742,7 +857,7 @@ void *aclk_main(void *ptr) aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); aclk_connected = 0; - log_access("ACLK DISCONNECTED"); + netdata_log_access("ACLK DISCONNECTED"); } } while (service_running(SERVICE_ACLK)); @@ -791,7 +906,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) ret = get_node_id(&host->host_uuid, &node_id); if (ret > 0) { // this means we were not able to check if node_id already present - error("Unable to check for node_id. Ignoring the host state update."); + netdata_log_error("Unable to check for node_id. Ignoring the host state update."); return; } if (ret < 0) { @@ -809,7 +924,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) rrdhost_aclk_state_unlock(localhost); create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; create_query->data.bin_payload.msg_name = "CreateNodeInstance"; - info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); + netdata_log_info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); aclk_queue_query(create_query); return; } @@ -832,7 +947,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, + netdata_log_info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, host->system_info->hops); freez((void*)node_state_update.node_id); query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; @@ -875,7 +990,7 @@ void aclk_send_node_instances() node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, + netdata_log_info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); @@ -899,7 +1014,7 @@ void aclk_send_node_instances() node_instance_creation.claim_id = localhost->aclk_state.claimed_id, create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, + netdata_log_info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); freez((void *)node_instance_creation.machine_guid); aclk_queue_query(create_query); diff --git a/aclk/aclk.h b/aclk/aclk.h index bd8375fb5..0badc1a62 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -13,17 +13,50 @@ #define ACLK_PUBACKS_CONN_STABLE 3 #endif /* ENABLE_ACLK */ +typedef enum __attribute__((packed)) { + ACLK_STATUS_CONNECTED = 0, + ACLK_STATUS_NONE, + ACLK_STATUS_DISABLED, + ACLK_STATUS_NO_CLOUD_URL, + ACLK_STATUS_INVALID_CLOUD_URL, + ACLK_STATUS_NOT_CLAIMED, + ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE, + ACLK_STATUS_ENV_RESPONSE_NOT_200, + ACLK_STATUS_ENV_RESPONSE_EMPTY, + ACLK_STATUS_ENV_RESPONSE_NOT_JSON, + ACLK_STATUS_ENV_FAILED, + ACLK_STATUS_BLOCKED, + ACLK_STATUS_NO_OLD_PROTOCOL, + ACLK_STATUS_NO_PROTOCOL_CAPABILITY, + ACLK_STATUS_INVALID_ENV_AUTH_URL, + ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX, + ACLK_STATUS_INVALID_ENV_TRANSPORT_URL, + ACLK_STATUS_INVALID_OTP, + ACLK_STATUS_NO_LWT_TOPIC, +} ACLK_STATUS; + +extern ACLK_STATUS aclk_status; +extern const char *aclk_cloud_base_url; +const char *aclk_status_to_string(void); + extern int aclk_connected; extern int aclk_ctx_based; extern int aclk_disable_runtime; extern int aclk_stats_enabled; extern int aclk_kill_link; +extern time_t last_conn_time_mqtt; +extern time_t last_conn_time_appl; +extern time_t last_disconnect_time; +extern time_t next_connection_attempt; +extern float last_backoff_value; + extern usec_t aclk_session_us; extern time_t aclk_session_sec; extern time_t aclk_block_until; +extern int aclk_connection_counter; extern int disconnect_req; #ifdef ENABLE_ACLK diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c index b38a928a5..e8d85a2b9 100644 --- a/aclk/aclk_capas.c +++ b/aclk/aclk_capas.c @@ -4,17 +4,19 @@ #include "ml/ml.h" +#define HTTP_API_V2_VERSION 6 + const struct capability *aclk_get_agent_capas() { static struct capability agent_capabilities[] = { { .name = "json", .version = 2, .enabled = 0 }, { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = 0, .enabled = 0 }, - { .name = "mc", .version = 0, .enabled = 0 }, + { .name = "ml", .version = 0, .enabled = 0 }, // index 2, below + { .name = "mc", .version = 0, .enabled = 0 }, // index 3, below { .name = "ctx", .version = 1, .enabled = 1 }, { .name = "funcs", .version = 1, .enabled = 1 }, - { .name = "http_api_v2", .version = 3, .enabled = 1 }, - { .name = "health", .version = 1, .enabled = 0 }, + { .name = "http_api_v2", .version = HTTP_API_V2_VERSION, .enabled = 1 }, + { .name = "health", .version = 1, .enabled = 0 }, // index 7, below { .name = "req_cancel", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } }; @@ -31,6 +33,8 @@ const struct capability *aclk_get_agent_capas() struct capability *aclk_get_node_instance_capas(RRDHOST *host) { + bool functions = (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS))); + struct capability ni_caps[] = { { .name = "proto", .version = 1, .enabled = 1 }, { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) }, @@ -38,8 +42,8 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host) .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, { .name = "ctx", .version = 1, .enabled = 1 }, - { .name = "funcs", .version = 0, .enabled = 0 }, - { .name = "http_api_v2", .version = 3, .enabled = 1 }, + { .name = "funcs", .version = functions ? 1 : 0, .enabled = functions ? 1 : 0 }, + { .name = "http_api_v2", .version = HTTP_API_V2_VERSION, .enabled = 1 }, { .name = "health", .version = 1, .enabled = host->health.health_enabled }, { .name = "req_cancel", .version = 1, .enabled = 1 }, { .name = NULL, .version = 0, .enabled = 0 } @@ -48,10 +52,5 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host) struct capability *ret = mallocz(sizeof(ni_caps)); memcpy(ret, ni_caps, sizeof(ni_caps)); - if (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS))) { - ret[4].version = 1; - ret[4].enabled = 1; - } - return ret; } diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index 66d751be6..46d0f6213 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -38,7 +38,7 @@ struct auth_data { #define PARSE_ENV_JSON_CHK_TYPE(it, type, name) \ if (json_object_get_type(json_object_iter_peek_value(it)) != type) { \ - error("value of key \"%s\" should be %s", name, #type); \ + netdata_log_error("value of key \"%s\" should be %s", name, #type); \ goto exit; \ } @@ -55,7 +55,7 @@ static int parse_passwd_response(const char *json_str, struct auth_data *auth) { json = json_tokener_parse(json_str); if (!json) { - error("JSON-C failed to parse the payload of http response of /env endpoint"); + netdata_log_error("JSON-C failed to parse the payload of http response of /env endpoint"); return 1; } @@ -88,26 +88,26 @@ static int parse_passwd_response(const char *json_str, struct auth_data *auth) { PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TOPICS) if (aclk_generate_topic_cache(json_object_iter_peek_value(&it))) { - error("Failed to generate topic cache!"); + netdata_log_error("Failed to generate topic cache!"); goto exit; } json_object_iter_next(&it); continue; } - error("Unknown key \"%s\" in passwd response payload. Ignoring", json_object_iter_peek_name(&it)); + netdata_log_error("Unknown key \"%s\" in passwd response payload. Ignoring", json_object_iter_peek_name(&it)); json_object_iter_next(&it); } if (!auth->client_id) { - error(JSON_KEY_CLIENTID " is compulsory key in /password response"); + netdata_log_error(JSON_KEY_CLIENTID " is compulsory key in /password response"); goto exit; } if (!auth->passwd) { - error(JSON_KEY_PASS " is compulsory in /password response"); + netdata_log_error(JSON_KEY_PASS " is compulsory in /password response"); goto exit; } if (!auth->username) { - error(JSON_KEY_USER " is compulsory in /password response"); + netdata_log_error(JSON_KEY_USER " is compulsory in /password response"); goto exit; } @@ -126,11 +126,11 @@ exit: static const char *get_json_str_by_path(json_object *json, const char *path) { json_object *ptr; if (json_pointer_get(json, path, &ptr)) { - error("Missing compulsory key \"%s\" in error response", path); + netdata_log_error("Missing compulsory key \"%s\" in error response", path); return NULL; } if (json_object_get_type(ptr) != json_type_string) { - error("Value of Key \"%s\" in error response should be string", path); + netdata_log_error("Value of Key \"%s\" in error response should be string", path); return NULL; } return json_object_get_string(ptr); @@ -147,7 +147,7 @@ static int aclk_parse_otp_error(const char *json_str) { json = json_tokener_parse(json_str); if (!json) { - error("JSON-C failed to parse the payload of http response of /env endpoint"); + netdata_log_error("JSON-C failed to parse the payload of http response of /env endpoint"); return 1; } @@ -163,7 +163,7 @@ static int aclk_parse_otp_error(const char *json_str) { // optional field if (!json_pointer_get(json, "/" JSON_KEY_ERTRY, &ptr)) { if (json_object_get_type(ptr) != json_type_boolean) { - error("Error response Key " "/" JSON_KEY_ERTRY " should be of boolean type"); + netdata_log_error("Error response Key " "/" JSON_KEY_ERTRY " should be of boolean type"); goto exit; } block_retry = json_object_get_boolean(ptr); @@ -172,7 +172,7 @@ static int aclk_parse_otp_error(const char *json_str) { // optional field if (!json_pointer_get(json, "/" JSON_KEY_EDELAY, &ptr)) { if (json_object_get_type(ptr) != json_type_int) { - error("Error response Key " "/" JSON_KEY_EDELAY " should be of integer type"); + netdata_log_error("Error response Key " "/" JSON_KEY_EDELAY " should be of integer type"); goto exit; } backoff = json_object_get_int(ptr); @@ -184,7 +184,7 @@ static int aclk_parse_otp_error(const char *json_str) { if (backoff > 0) aclk_block_until = now_monotonic_sec() + backoff; - error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff); + netdata_log_error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff); rc = 0; exit: json_object_put(json); @@ -205,7 +205,7 @@ static int aclk_parse_otp_error(const char *json_str) { json = json_tokener_parse(json_str); if (!json) { - error("JSON-C failed to parse the payload of http response of /env endpoint"); + netdata_log_error("JSON-C failed to parse the payload of http response of /env endpoint"); return 1; } @@ -236,7 +236,7 @@ static int aclk_parse_otp_error(const char *json_str) { } if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EDELAY)) { if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_int) { - error("value of key " JSON_KEY_EDELAY " should be integer"); + netdata_log_error("value of key " JSON_KEY_EDELAY " should be integer"); goto exit; } @@ -246,7 +246,7 @@ static int aclk_parse_otp_error(const char *json_str) { } if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ERTRY)) { if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_boolean) { - error("value of key " JSON_KEY_ERTRY " should be integer"); + netdata_log_error("value of key " JSON_KEY_ERTRY " should be integer"); goto exit; } @@ -254,7 +254,7 @@ static int aclk_parse_otp_error(const char *json_str) { json_object_iter_next(&it); continue; } - error("Unknown key \"%s\" in error response payload. Ignoring", json_object_iter_peek_name(&it)); + netdata_log_error("Unknown key \"%s\" in error response payload. Ignoring", json_object_iter_peek_name(&it)); json_object_iter_next(&it); } @@ -264,7 +264,7 @@ static int aclk_parse_otp_error(const char *json_str) { if (backoff > 0) aclk_block_until = now_monotonic_sec() + backoff; - error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff); + netdata_log_error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff); rc = 0; exit: json_object_put(json); @@ -301,7 +301,7 @@ inline static int base64_decode_helper(unsigned char *out, int *outl, const unsi EVP_DecodeFinal(ctx, remaining_data, &remainder); EVP_ENCODE_CTX_free(ctx); if (remainder) { - error("Unexpected data at EVP_DecodeFinal"); + netdata_log_error("Unexpected data at EVP_DecodeFinal"); return 1; } return 0; @@ -322,12 +322,12 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char ** req.url = (char *)buffer_tostring(url); if (aclk_https_request(&req, &resp)) { - error ("ACLK_OTP Challenge failed"); + netdata_log_error("ACLK_OTP Challenge failed"); buffer_free(url); return 1; } if (resp.http_code != 200) { - error ("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code); + netdata_log_error("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code); buffer_free(url); if (resp.payload_size) aclk_parse_otp_error(resp.payload); @@ -335,36 +335,36 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char ** } buffer_free(url); - info ("ACLK_OTP Got Challenge from Cloud"); + netdata_log_info("ACLK_OTP Got Challenge from Cloud"); json_object *json = json_tokener_parse(resp.payload); if (!json) { - error ("Couldn't parse HTTP GET challenge payload"); + netdata_log_error("Couldn't parse HTTP GET challenge payload"); goto cleanup_resp; } json_object *challenge_json; if (!json_object_object_get_ex(json, "challenge", &challenge_json)) { - error ("No key named \"challenge\" in the returned JSON"); + netdata_log_error("No key named \"challenge\" in the returned JSON"); goto cleanup_json; } if (!json_object_is_type(challenge_json, json_type_string)) { - error ("\"challenge\" is not a string JSON type"); + netdata_log_error("\"challenge\" is not a string JSON type"); goto cleanup_json; } const char *challenge_base64; if (!(challenge_base64 = json_object_get_string(challenge_json))) { - error("Failed to extract challenge from JSON object"); + netdata_log_error("Failed to extract challenge from JSON object"); goto cleanup_json; } if (strlen(challenge_base64) != CHALLENGE_LEN_BASE64) { - error("Received Challenge has unexpected length of %zu (expected %d)", strlen(challenge_base64), CHALLENGE_LEN_BASE64); + netdata_log_error("Received Challenge has unexpected length of %zu (expected %d)", strlen(challenge_base64), CHALLENGE_LEN_BASE64); goto cleanup_json; } *challenge = mallocz((CHALLENGE_LEN_BASE64 / 4) * 3); base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64)); if (*challenge_bytes != CHALLENGE_LEN) { - error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN); + netdata_log_error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN); freez(*challenge); *challenge = NULL; goto cleanup_json; @@ -405,19 +405,19 @@ int aclk_send_otp_response(const char *agent_id, const unsigned char *response, req.payload_size = strlen(req.payload); if (aclk_https_request(&req, &resp)) { - error ("ACLK_OTP Password error trying to post result to password"); + netdata_log_error("ACLK_OTP Password error trying to post result to password"); goto cleanup_buffers; } if (resp.http_code != 201) { - error ("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code); + netdata_log_error("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code); if (resp.payload_size) aclk_parse_otp_error(resp.payload); goto cleanup_response; } - info ("ACLK_OTP Got Password from Cloud"); + netdata_log_info("ACLK_OTP Got Password from Cloud"); if (parse_passwd_response(resp.payload, mqtt_auth)){ - error("Error parsing response of password endpoint"); + netdata_log_error("Error parsing response of password endpoint"); goto cleanup_response; } @@ -470,7 +470,7 @@ static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, u { char err[512]; ERR_error_string_n(ERR_get_error(), err, sizeof(err)); - error("Decryption of the challenge failed: %s", err); + netdata_log_error("Decryption of the challenge failed: %s", err); } return result; } @@ -486,13 +486,13 @@ int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_p char *agent_id = get_agent_claimid(); if (agent_id == NULL) { - error("Agent was not claimed - cannot perform challenge/response"); + netdata_log_error("Agent was not claimed - cannot perform challenge/response"); return 1; } // Get Challenge if (aclk_get_otp_challenge(target, agent_id, &challenge, &challenge_bytes)) { - error("Error getting challenge"); + netdata_log_error("Error getting challenge"); freez(agent_id); return 1; } @@ -501,7 +501,7 @@ int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_p unsigned char *response_plaintext; int response_plaintext_bytes = private_decrypt(p_key, challenge, challenge_bytes, &response_plaintext); if (response_plaintext_bytes < 0) { - error ("Couldn't decrypt the challenge received"); + netdata_log_error("Couldn't decrypt the challenge received"); freez(response_plaintext); freez(challenge); freez(agent_id); @@ -512,7 +512,7 @@ int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_p // Encode and Send Challenge struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL }; if (aclk_send_otp_response(agent_id, response_plaintext, response_plaintext_bytes, target, &data)) { - error("Error getting response"); + netdata_log_error("Error getting response"); freez(response_plaintext); freez(agent_id); return 1; @@ -549,12 +549,12 @@ static int parse_json_env_transport(json_object *json, aclk_transport_desc_t *tr if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_TYPE)) { PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_TYPE) if (trp->type != ACLK_TRP_UNKNOWN) { - error(JSON_KEY_TRP_TYPE " set already"); + netdata_log_error(JSON_KEY_TRP_TYPE " set already"); goto exit; } trp->type = aclk_transport_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it))); if (trp->type == ACLK_TRP_UNKNOWN) { - error(JSON_KEY_TRP_TYPE " unknown type \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + netdata_log_error(JSON_KEY_TRP_TYPE " unknown type \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); goto exit; } json_object_iter_next(&it); @@ -564,25 +564,25 @@ static int parse_json_env_transport(json_object *json, aclk_transport_desc_t *tr if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_ENDPOINT)) { PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_ENDPOINT) if (trp->endpoint) { - error(JSON_KEY_TRP_ENDPOINT " set already"); + netdata_log_error(JSON_KEY_TRP_ENDPOINT " set already"); goto exit; } trp->endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); json_object_iter_next(&it); continue; } - - error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + + netdata_log_error("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); json_object_iter_next(&it); } if (!trp->endpoint) { - error (JSON_KEY_TRP_ENDPOINT " is missing from JSON dictionary"); + netdata_log_error(JSON_KEY_TRP_ENDPOINT " is missing from JSON dictionary"); goto exit; } if (trp->type == ACLK_TRP_UNKNOWN) { - error ("transport type not set"); + netdata_log_error("transport type not set"); goto exit; } @@ -598,7 +598,7 @@ static int parse_json_env_transports(json_object *json_array, aclk_env_t *env) { json_object *obj; if (env->transports) { - error("transports have been set already"); + netdata_log_error("transports have been set already"); return 1; } @@ -610,7 +610,7 @@ static int parse_json_env_transports(json_object *json_array, aclk_env_t *env) { trp = callocz(1, sizeof(aclk_transport_desc_t)); obj = json_object_array_get_idx(json_array, i); if (parse_json_env_transport(obj, trp)) { - error("error parsing transport idx %d", (int)i); + netdata_log_error("error parsing transport idx %d", (int)i); freez(trp); return 1; } @@ -626,14 +626,14 @@ static int parse_json_env_transports(json_object *json_array, aclk_env_t *env) { static int parse_json_backoff_int(struct json_object_iterator *it, int *out, const char* name, int min, int max) { if (!strcmp(json_object_iter_peek_name(it), name)) { if (json_object_get_type(json_object_iter_peek_value(it)) != json_type_int) { - error("Could not parse \"%s\". Not an integer as expected.", name); + netdata_log_error("Could not parse \"%s\". Not an integer as expected.", name); return MATCHED_ERROR; } *out = json_object_get_int(json_object_iter_peek_value(it)); if (*out < min || *out > max) { - error("Value of \"%s\"=%d out of range (%d-%d).", name, *out, min, max); + netdata_log_error("Value of \"%s\"=%d out of range (%d-%d).", name, *out, min, max); return MATCHED_ERROR; } @@ -675,7 +675,7 @@ static int parse_json_backoff(json_object *json, aclk_backoff_t *backoff) { continue; } - error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + netdata_log_error("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); json_object_iter_next(&it); } @@ -687,7 +687,7 @@ static int parse_json_env_caps(json_object *json, aclk_env_t *env) { const char *str; if (env->capabilities) { - error("transports have been set already"); + netdata_log_error("transports have been set already"); return 1; } @@ -702,12 +702,12 @@ static int parse_json_env_caps(json_object *json, aclk_env_t *env) { for (size_t i = 0; i < env->capability_count; i++) { obj = json_object_array_get_idx(json, i); if (json_object_get_type(obj) != json_type_string) { - error("Capability at index %d not a string!", (int)i); + netdata_log_error("Capability at index %d not a string!", (int)i); return 1; } str = json_object_get_string(obj); if (!str) { - error("Error parsing capabilities"); + netdata_log_error("Error parsing capabilities"); return 1; } env->capabilities[i] = strdupz(str); @@ -723,7 +723,7 @@ static int parse_json_env(const char *json_str, aclk_env_t *env) { json = json_tokener_parse(json_str); if (!json) { - error("JSON-C failed to parse the payload of http response of /env endpoint"); + netdata_log_error("JSON-C failed to parse the payload of http response of /env endpoint"); return 1; } @@ -734,7 +734,7 @@ static int parse_json_env(const char *json_str, aclk_env_t *env) { if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_AUTH_ENDPOINT)) { PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_AUTH_ENDPOINT) if (env->auth_endpoint) { - error("authEndpoint set already"); + netdata_log_error("authEndpoint set already"); goto exit; } env->auth_endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); @@ -745,7 +745,7 @@ static int parse_json_env(const char *json_str, aclk_env_t *env) { if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ENC)) { PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_ENC) if (env->encoding != ACLK_ENC_UNKNOWN) { - error(JSON_KEY_ENC " set already"); + netdata_log_error(JSON_KEY_ENC " set already"); goto exit; } env->encoding = aclk_encoding_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it))); @@ -768,7 +768,7 @@ static int parse_json_env(const char *json_str, aclk_env_t *env) { if (parse_json_backoff(json_object_iter_peek_value(&it), &env->backoff)) { env->backoff.base = 0; - error("Error parsing Backoff parameters in env"); + netdata_log_error("Error parsing Backoff parameters in env"); goto exit; } @@ -780,7 +780,7 @@ static int parse_json_env(const char *json_str, aclk_env_t *env) { PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_CAPS) if (parse_json_env_caps(json_object_iter_peek_value(&it), env)) { - error("Error parsing capabilities list"); + netdata_log_error("Error parsing capabilities list"); goto exit; } @@ -788,25 +788,25 @@ static int parse_json_env(const char *json_str, aclk_env_t *env) { continue; } - error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + netdata_log_error("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); json_object_iter_next(&it); } // Check all compulsory keys have been set if (env->transport_count < 1) { - error("env has to return at least one transport"); + netdata_log_error("env has to return at least one transport"); goto exit; } if (!env->auth_endpoint) { - error(JSON_KEY_AUTH_ENDPOINT " is compulsory"); + netdata_log_error(JSON_KEY_AUTH_ENDPOINT " is compulsory"); goto exit; } if (env->encoding == ACLK_ENC_UNKNOWN) { - error(JSON_KEY_ENC " is compulsory"); + netdata_log_error(JSON_KEY_ENC " is compulsory"); goto exit; } if (!env->backoff.base) { - error(JSON_KEY_BACKOFF " is compulsory"); + netdata_log_error(JSON_KEY_BACKOFF " is compulsory"); goto exit; } @@ -830,7 +830,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { char *agent_id = get_agent_claimid(); if (agent_id == NULL) { - error("Agent was not claimed - cannot perform challenge/response"); + netdata_log_error("Agent was not claimed - cannot perform challenge/response"); buffer_free(buf); return 1; } @@ -843,35 +843,35 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { req.port = aclk_port; req.url = buf->buffer; if (aclk_https_request(&req, &resp)) { - error("Error trying to contact env endpoint"); + netdata_log_error("Error trying to contact env endpoint"); https_req_response_free(&resp); buffer_free(buf); - return 1; + return 2; } if (resp.http_code != 200) { - error("The HTTP code not 200 OK (Got %d)", resp.http_code); + netdata_log_error("The HTTP code not 200 OK (Got %d)", resp.http_code); if (resp.payload_size) aclk_parse_otp_error(resp.payload); https_req_response_free(&resp); buffer_free(buf); - return 1; + return 3; } if (!resp.payload || !resp.payload_size) { - error("Unexpected empty payload as response to /env call"); + netdata_log_error("Unexpected empty payload as response to /env call"); https_req_response_free(&resp); buffer_free(buf); - return 1; + return 4; } if (parse_json_env(resp.payload, env)) { - error ("error parsing /env message"); + netdata_log_error("error parsing /env message"); https_req_response_free(&resp); buffer_free(buf); - return 1; + return 5; } - info("Getting Cloud /env successful"); + netdata_log_info("Getting Cloud /env successful"); https_req_response_free(&resp); buffer_free(buf); diff --git a/aclk/aclk_proxy.c b/aclk/aclk_proxy.c index 1701eb8e8..4af46208f 100644 --- a/aclk/aclk_proxy.c +++ b/aclk/aclk_proxy.c @@ -85,7 +85,7 @@ static inline void safe_log_proxy_error(char *str, const char *proxy) { char *log = strdupz(proxy); safe_log_proxy_censor(log); - error("%s Provided Value:\"%s\"", str, log); + netdata_log_error("%s Provided Value:\"%s\"", str, log); freez(log); } diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 0698c2d60..07d571be1 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -110,7 +110,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) usec_t t; web_client_timeout_checkpoint_set(w, query->timeout); if(web_client_timeout_checkpoint_and_check(w, &t)) { - log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout); + netdata_log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout); retval = 1; w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED; aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0); @@ -128,7 +128,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) ACLK_STATS_UNLOCK; } - w->response.code = web_client_api_request_with_node_selection(localhost, w, path); + w->response.code = (short)web_client_api_request_with_node_selection(localhost, w, path); web_client_timeout_checkpoint_response_ready(w, &t); if(buffer_strlen(w->response.data) > ACLK_MAX_WEB_RESPONSE_SIZE) { @@ -164,7 +164,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->response.zinitialized = true; w->response.zoutput = true; } else - error("Failed to initialize zlib. Proceeding without compression."); + netdata_log_error("Failed to initialize zlib. Proceeding without compression."); } } @@ -177,9 +177,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) z_ret = deflate(&w->response.zstream, Z_FINISH); if(z_ret < 0) { if(w->response.zstream.msg) - error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); + netdata_log_error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); else - error("Unknown error during zlib compression."); + netdata_log_error("Unknown error during zlib compression."); retval = 1; w->response.code = 500; aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_ZLIB_ERROR, CLOUD_EMSG_ZLIB_ERROR, NULL, 0); @@ -197,7 +197,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) z_buffer = NULL; } - w->response.data->date = w->timings.tv_ready.tv_sec; web_client_build_http_header(w); local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk); local_buffer->content_type = CT_APPLICATION_JSON; @@ -222,7 +221,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) cleanup: now_monotonic_high_precision_timeval(&tv); - log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", + netdata_log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", w->id , gettid() , query_thr->idx @@ -286,10 +285,10 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que worker_is_busy(query->type); if (query->type == HTTP_API_V2) { - debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); + netdata_log_debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); http_api_v2(query_thr, query); } else { - debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name); + netdata_log_debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name); send_bin_msg(query_thr, query); } @@ -357,7 +356,7 @@ void *aclk_query_main_thread(void *ptr) #define TASK_LEN_MAX 22 void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client) { - info("Starting %d query threads.", query_threads->count); + netdata_log_info("Starting %d query threads.", query_threads->count); char thread_name[TASK_LEN_MAX]; query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); @@ -366,7 +365,7 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss query_threads->thread_list[i].client = client; if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "ACLK_QRY[%d]", i) < 0)) - error("snprintf encoding error"); + netdata_log_error("snprintf encoding error"); netdata_thread_create( &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, &query_threads->thread_list[i]); diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 78a906d96..8ca21d456 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -27,7 +27,7 @@ static inline int _aclk_queue_query(aclk_query_t query) if (aclk_query_queue.block_push) { ACLK_QUEUE_UNLOCK; if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) - error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); + netdata_log_error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); aclk_query_free(query); return 1; } @@ -67,7 +67,7 @@ aclk_query_t aclk_queue_pop(void) if (aclk_query_queue.block_push) { ACLK_QUEUE_UNLOCK; if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) - error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); + netdata_log_error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); return NULL; } diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 944fc0797..5983561a6 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -79,7 +79,7 @@ void aclk_queue_unlock(void); if (likely(query->data.bin_payload.payload)) { \ aclk_queue_query(query); \ } else { \ - error("Failed to generate payload (%s)", __FUNCTION__); \ + netdata_log_error("Failed to generate payload (%s)", __FUNCTION__); \ aclk_query_free(query); \ } diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 60bff9ba1..84ade2b34 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -103,14 +103,14 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur // TODO better check of URL if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) { errno = 0; - error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); + netdata_log_error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); return 1; } start = payload + 4; if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) { errno = 0; - error("Doesn't look like HTTP GET request."); + netdata_log_error("Doesn't look like HTTP GET request."); return 1; } @@ -126,7 +126,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent errno = 0; if (cloud_to_agent->version < ACLK_V_COMPRESSION) { - error( + netdata_log_error( "This handler cannot reply to request with version older than %d, received %d.", ACLK_V_COMPRESSION, cloud_to_agent->version); @@ -136,22 +136,22 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent query = aclk_query_new(HTTP_API_V2); if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) { - error("Error extracting payload expected after the JSON dictionary."); + netdata_log_error("Error extracting payload expected after the JSON dictionary."); goto error; } if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) { - error("Could not extract payload from query"); + netdata_log_error("Could not extract payload from query"); goto error; } if (unlikely(!cloud_to_agent->callback_topic)) { - error("Missing callback_topic"); + netdata_log_error("Missing callback_topic"); goto error; } if (unlikely(!cloud_to_agent->msg_id)) { - error("Missing msg_id"); + netdata_log_error("Missing msg_id"); goto error; } @@ -180,7 +180,7 @@ int aclk_handle_cloud_cmd_message(char *payload) return 1; } - debug(D_ACLK, "ACLK incoming 'cmd' message (%s)", payload); + netdata_log_debug(D_ACLK, "ACLK incoming 'cmd' message (%s)", payload); int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); @@ -250,17 +250,17 @@ int create_node_instance_result(const char *msg, size_t msg_len) return 1; } - debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id); + netdata_log_debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id); uuid_t host_id, node_id; if (uuid_parse(res.machine_guid, host_id)) { - error("Error parsing machine_guid provided by CreateNodeInstanceResult"); + netdata_log_error("Error parsing machine_guid provided by CreateNodeInstanceResult"); freez(res.machine_guid); freez(res.node_id); return 1; } if (uuid_parse(res.node_id, node_id)) { - error("Error parsing node_id provided by CreateNodeInstanceResult"); + netdata_log_error("Error parsing node_id provided by CreateNodeInstanceResult"); freez(res.machine_guid); freez(res.node_id); return 1; @@ -341,7 +341,7 @@ int start_alarm_streaming(const char *msg, size_t msg_len) { struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); if (!res.node_id) { - error("Error parsing StartAlarmStreaming"); + netdata_log_error("Error parsing StartAlarmStreaming"); return 1; } aclk_start_alert_streaming(res.node_id, res.resets); @@ -353,7 +353,7 @@ int send_alarm_checkpoint(const char *msg, size_t msg_len) { struct send_alarm_checkpoint sac = parse_send_alarm_checkpoint(msg, msg_len); if (!sac.node_id || !sac.claim_id) { - error("Error parsing SendAlarmCheckpoint"); + netdata_log_error("Error parsing SendAlarmCheckpoint"); freez(sac.node_id); freez(sac.claim_id); return 1; @@ -368,7 +368,7 @@ int send_alarm_configuration(const char *msg, size_t msg_len) { char *config_hash = parse_send_alarm_configuration(msg, msg_len); if (!config_hash || !*config_hash) { - error("Error parsing SendAlarmConfiguration"); + netdata_log_error("Error parsing SendAlarmConfiguration"); freez(config_hash); return 1; } @@ -381,7 +381,7 @@ int send_alarm_snapshot(const char *msg, size_t msg_len) { struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); if (!sas->node_id || !sas->claim_id || !sas->snapshot_uuid) { - error("Error parsing SendAlarmSnapshot"); + netdata_log_error("Error parsing SendAlarmSnapshot"); destroy_send_alarm_snapshot(sas); return 1; } @@ -396,13 +396,13 @@ int handle_disconnect_req(const char *msg, size_t msg_len) if (!cmd) return 1; if (cmd->permaban) { - error("Cloud Banned This Agent!"); + netdata_log_error("Cloud Banned This Agent!"); aclk_disable_runtime = 1; } - info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description); + netdata_log_info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description); if (cmd->reconnect_after_s > 0) { aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s; - info( + netdata_log_info( "Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s); } @@ -455,7 +455,7 @@ int cancel_pending_req(const char *msg, size_t msg_len) return 1; } - log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id); + netdata_log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id); if (mark_pending_req_cancelled(cmd.request_id)) error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id); @@ -529,9 +529,9 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t ACLK_STATS_UNLOCK; } new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type)); - debug(D_ACLK, "Got message named '%s' from cloud", message_type); + netdata_log_debug(D_ACLK, "Got message named '%s' from cloud", message_type); if (unlikely(!msg_descriptor)) { - error("Do not know how to handle message of type '%s'. Ignoring", message_type); + netdata_log_error("Do not know how to handle message of type '%s'. Ignoring", message_type); if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.cloud_req_err++; @@ -557,7 +557,7 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t ACLK_STATS_UNLOCK; } if (msg_descriptor->fnc(msg, msg_len)) { - error("Error processing message of type '%s'", message_type); + netdata_log_error("Error processing message of type '%s'", message_type); if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.cloud_req_err++; diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index 2b4d5e48a..f4672882b 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -193,7 +193,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) { if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) - error("snprintf encoding error"); + netdata_log_error("snprintf encoding error"); aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); } } @@ -463,7 +463,7 @@ void aclk_stats_msg_puback(uint16_t id) if (unlikely(!pub_time[id])) { ACLK_STATS_UNLOCK; - error("Received PUBACK for unknown message?!"); + netdata_log_error("Received PUBACK for unknown message?!"); return; } diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index d11e96cfb..26e20dfb2 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -32,7 +32,7 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s const char *topic = aclk_get_topic(subtopic); if (unlikely(!topic)) { - error("Couldn't get topic. Aborting message send."); + netdata_log_error("Couldn't get topic. Aborting message send."); return 0; } @@ -61,7 +61,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec int len; if (unlikely(!topic || topic[0] != '/')) { - error ("Full topic required!"); + netdata_log_error("Full topic required!"); json_object_put(msg); return HTTP_RESP_INTERNAL_SERVER_ERROR; } @@ -172,7 +172,7 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char json_object_object_add(msg, "error-description", tmp); if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) { - error("Failed to send cancellation message for http reply %zu %s", payload_len, payload); + netdata_log_error("Failed to send cancellation message for http reply %zu %s", payload_len, payload); } } @@ -220,7 +220,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable rrdhost_aclk_state_lock(localhost); if (unlikely(!localhost->aclk_state.claimed_id)) { - error("Internal error. Should not come here if not claimed"); + netdata_log_error("Internal error. Should not come here if not claimed"); rrdhost_aclk_state_unlock(localhost); return 0; } @@ -233,7 +233,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable rrdhost_aclk_state_unlock(localhost); if (!msg) { - error("Error generating agent::v1::UpdateAgentConnection payload"); + netdata_log_error("Error generating agent::v1::UpdateAgentConnection payload"); return 0; } @@ -255,7 +255,7 @@ char *aclk_generate_lwt(size_t *size) { rrdhost_aclk_state_lock(localhost); if (unlikely(!localhost->aclk_state.claimed_id)) { - error("Internal error. Should not come here if not claimed"); + netdata_log_error("Internal error. Should not come here if not claimed"); rrdhost_aclk_state_unlock(localhost); return NULL; } @@ -265,7 +265,7 @@ char *aclk_generate_lwt(size_t *size) { rrdhost_aclk_state_unlock(localhost); if (!msg) - error("Error generating agent::v1::UpdateAgentConnection payload for LWT"); + netdata_log_error("Error generating agent::v1::UpdateAgentConnection payload for LWT"); return msg; } diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index 7d03f97fd..00920e069 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -185,7 +185,7 @@ static void topic_generate_final(struct aclk_topic *t) { rrdhost_aclk_state_lock(localhost); if (unlikely(!localhost->aclk_state.claimed_id)) { - error("This should never be called if agent not claimed"); + netdata_log_error("This should never be called if agent not claimed"); rrdhost_aclk_state_unlock(localhost); return; } @@ -214,19 +214,19 @@ static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *to while (!json_object_iter_equal(&it, &itEnd)) { if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) { if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { - error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string"); + netdata_log_error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string"); return 1; } topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it))); if (topic->topic_id == ACLK_TOPICID_UNKNOWN) { - debug(D_ACLK, "topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + netdata_log_debug(D_ACLK, "topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); } json_object_iter_next(&it); continue; } if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) { if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { - error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string"); + netdata_log_error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string"); return 1; } topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); @@ -234,12 +234,12 @@ static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *to continue; } - error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it)); + netdata_log_error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it)); json_object_iter_next(&it); } if (!topic->topic_recvd) { - error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC); + netdata_log_error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC); return 1; } @@ -255,7 +255,7 @@ int aclk_generate_topic_cache(struct json_object *json) size_t array_size = json_object_array_length(json); if (!array_size) { - error("Empty topic list!"); + netdata_log_error("Empty topic list!"); return 1; } @@ -267,19 +267,19 @@ int aclk_generate_topic_cache(struct json_object *json) for (size_t i = 0; i < array_size; i++) { obj = json_object_array_get_idx(json, i); if (json_object_get_type(obj) != json_type_object) { - error("expected json_type_object"); + netdata_log_error("expected json_type_object"); return 1; } aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic)); if (topic_cache_add_topic(obj, aclk_topic_cache[i])) { - error("failed to parse topic @idx=%d", (int)i); + netdata_log_error("failed to parse topic @idx=%d", (int)i); return 1; } } for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) { if (!aclk_get_topic(compulsory_topics[i])) { - error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); + netdata_log_error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); return 1; } } @@ -295,7 +295,7 @@ int aclk_generate_topic_cache(struct json_object *json) const char *aclk_get_topic(enum aclk_topics topic) { if (!aclk_topic_cache) { - error("Topic cache not initialized"); + netdata_log_error("Topic cache not initialized"); return NULL; } @@ -303,7 +303,7 @@ const char *aclk_get_topic(enum aclk_topics topic) if (aclk_topic_cache[i]->topic_id == topic) return aclk_topic_cache[i]->topic; } - error("Unknown topic"); + netdata_log_error("Unknown topic"); return NULL; } @@ -315,7 +315,7 @@ const char *aclk_get_topic(enum aclk_topics topic) const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter) { if (!aclk_topic_cache) { - error("Topic cache not initialized when %s was called.", __FUNCTION__); + netdata_log_error("Topic cache not initialized when %s was called.", __FUNCTION__); return NULL; } diff --git a/aclk/https_client.c b/aclk/https_client.c index 345cf65a8..62f99aab6 100644 --- a/aclk/https_client.c +++ b/aclk/https_client.c @@ -70,17 +70,17 @@ static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx) char buf_val[HTTP_HDR_BUFFER_SIZE]; char *ptr = buf_key; if (!rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx_end)) { - error("CRLF expected"); + netdata_log_error("CRLF expected"); return 1; } char *separator = rbuf_find_bytes(buf, HTTP_KEYVAL_SEPARATOR, strlen(HTTP_KEYVAL_SEPARATOR), &idx); if (!separator) { - error("Missing Key/Value separator"); + netdata_log_error("Missing Key/Value separator"); return 1; } if (idx >= HTTP_HDR_BUFFER_SIZE) { - error("Key name is too long"); + netdata_log_error("Key name is too long"); return 1; } @@ -90,7 +90,7 @@ static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx) rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR)); idx_end -= strlen(HTTP_KEYVAL_SEPARATOR) + idx; if (idx_end >= HTTP_HDR_BUFFER_SIZE) { - error("Value of key \"%s\" too long", buf_key); + netdata_log_error("Value of key \"%s\" too long", buf_key); return 1; } @@ -116,22 +116,22 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) switch (parse_ctx->state) { case HTTP_PARSE_INITIAL: if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) { - error("Expected response to start with \"%s\"", RESP_PROTO); + netdata_log_error("Expected response to start with \"%s\"", RESP_PROTO); return PARSE_ERROR; } rbuf_bump_tail(buf, strlen(RESP_PROTO)); if (rbuf_pop(buf, rc, 4) != 4) { - error("Expected HTTP status code"); + netdata_log_error("Expected HTTP status code"); return PARSE_ERROR; } if (rc[3] != ' ') { - error("Expected space after HTTP return code"); + netdata_log_error("Expected space after HTTP return code"); return PARSE_ERROR; } rc[3] = 0; parse_ctx->http_code = atoi(rc); if (parse_ctx->http_code < 100 || parse_ctx->http_code >= 600) { - error("HTTP code not in range 100 to 599"); + netdata_log_error("HTTP code not in range 100 to 599"); return PARSE_ERROR; } @@ -186,7 +186,7 @@ typedef struct https_req_ctx { static int https_req_check_timedout(https_req_ctx_t *ctx) { if (now_realtime_sec() > ctx->req_start_time + ctx->request->timeout_s) { - error("request timed out"); + netdata_log_error("request timed out"); return 1; } return 0; @@ -220,12 +220,12 @@ static int socket_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { do { int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); if (ret < 0) { - error("poll error"); + netdata_log_error("poll error"); return 1; } if (ret == 0) { if (https_req_check_timedout(ctx)) { - error("Poll timed out"); + netdata_log_error("Poll timed out"); return 2; } continue; @@ -235,7 +235,7 @@ static int socket_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { if (ret > 0) { ctx->written += ret; } else if (errno != EAGAIN && errno != EWOULDBLOCK) { - error("Error writing to socket"); + netdata_log_error("Error writing to socket"); return 3; } } while (ctx->written < data_len); @@ -250,12 +250,12 @@ static int ssl_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { do { int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); if (ret < 0) { - error("poll error"); + netdata_log_error("poll error"); return 1; } if (ret == 0) { if (https_req_check_timedout(ctx)) { - error("Poll timed out"); + netdata_log_error("Poll timed out"); return 2; } continue; @@ -275,7 +275,7 @@ static int ssl_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { ctx->poll_fd.events |= POLLOUT; break; default: - error("SSL_write Err: %s", _ssl_err_tos(ret)); + netdata_log_error("SSL_write Err: %s", _ssl_err_tos(ret)); return 3; } } @@ -299,12 +299,12 @@ static int read_parse_response(https_req_ctx_t *ctx) { do { ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); if (ret < 0) { - error("poll error"); + netdata_log_error("poll error"); return 1; } if (ret == 0) { if (https_req_check_timedout(ctx)) { - error("Poll timed out"); + netdata_log_error("Poll timed out"); return 2; } if (!ctx->ssl_ctx) @@ -332,12 +332,12 @@ static int read_parse_response(https_req_ctx_t *ctx) { ctx->poll_fd.events |= POLLOUT; break; default: - error("SSL_read Err: %s", _ssl_err_tos(ret)); + netdata_log_error("SSL_read Err: %s", _ssl_err_tos(ret)); return 3; } } else { if (errno != EAGAIN && errno != EWOULDBLOCK) { - error("write error"); + netdata_log_error("write error"); return 3; } ctx->poll_fd.events |= POLLIN; @@ -346,7 +346,7 @@ static int read_parse_response(https_req_ctx_t *ctx) { } while (!(ret = parse_http_response(ctx->buf_rx, &ctx->parse_ctx))); if (ret != PARSE_SUCCESS) { - error("Error parsing HTTP response"); + netdata_log_error("Error parsing HTTP response"); return 1; } @@ -373,7 +373,7 @@ static int handle_http_request(https_req_ctx_t *ctx) { buffer_strcat(hdr, "POST "); break; default: - error("Unknown HTTPS request type!"); + netdata_log_error("Unknown HTTPS request type!"); rc = 1; goto err_exit; } @@ -419,14 +419,14 @@ static int handle_http_request(https_req_ctx_t *ctx) { // Send the request if (https_client_write_all(ctx, hdr->buffer, hdr->len)) { - error("Couldn't write HTTP request header into SSL connection"); + netdata_log_error("Couldn't write HTTP request header into SSL connection"); rc = 2; goto err_exit; } if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) { if (https_client_write_all(ctx, ctx->request->payload, ctx->request->payload_size)) { - error("Couldn't write payload into SSL connection"); + netdata_log_error("Couldn't write payload into SSL connection"); rc = 3; goto err_exit; } @@ -434,7 +434,7 @@ static int handle_http_request(https_req_ctx_t *ctx) { // Read The Response if (read_parse_response(ctx)) { - error("Error reading or parsing response from server"); + netdata_log_error("Error reading or parsing response from server"); rc = 4; goto err_exit; } @@ -456,7 +456,7 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) err_cert = X509_STORE_CTX_get_current_cert(ctx); err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0); - error("Cert Chain verify error:num=%d:%s:depth=%d:%s", err, + netdata_log_error("Cert Chain verify error:num=%d:%s:depth=%d:%s", err, X509_verify_cert_error_string(err), depth, err_str); free(err_str); @@ -466,7 +466,7 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) if (!preverify_ok && err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT) { preverify_ok = 1; - error("Self Signed Certificate Accepted as the agent was built with ACLK_SSL_ALLOW_SELF_SIGNED"); + netdata_log_error("Self Signed Certificate Accepted as the agent was built with ACLK_SSL_ALLOW_SELF_SIGNED"); } #endif @@ -486,7 +486,7 @@ int https_request(https_req_t *request, https_req_response_t *response) { ctx->buf_rx = rbuf_create(RX_BUFFER_SIZE); if (!ctx->buf_rx) { - error("Couldn't allocate buffer for RX data"); + netdata_log_error("Couldn't allocate buffer for RX data"); goto exit_req_ctx; } @@ -494,12 +494,12 @@ int https_request(https_req_t *request, https_req_response_t *response) { ctx->sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, connect_host, 0, connect_port_str, &timeout); if (ctx->sock < 0) { - error("Error connecting TCP socket to \"%s\"", connect_host); + netdata_log_error("Error connecting TCP socket to \"%s\"", connect_host); goto exit_buf_rx; } if (fcntl(ctx->sock, F_SETFL, fcntl(ctx->sock, F_GETFL, 0) | O_NONBLOCK) == -1) { - error("Error setting O_NONBLOCK to TCP socket."); + netdata_log_error("Error setting O_NONBLOCK to TCP socket."); goto exit_sock; } @@ -517,39 +517,39 @@ int https_request(https_req_t *request, https_req_response_t *response) { req.proxy_password = request->proxy_password; ctx->request = &req; if (handle_http_request(ctx)) { - error("Failed to CONNECT with proxy"); + netdata_log_error("Failed to CONNECT with proxy"); goto exit_sock; } if (ctx->parse_ctx.http_code != 200) { - error("Proxy didn't return 200 OK (got %d)", ctx->parse_ctx.http_code); + netdata_log_error("Proxy didn't return 200 OK (got %d)", ctx->parse_ctx.http_code); goto exit_sock; } - info("Proxy accepted CONNECT upgrade"); + netdata_log_info("Proxy accepted CONNECT upgrade"); } ctx->request = request; ctx->ssl_ctx = netdata_ssl_create_client_ctx(0); if (ctx->ssl_ctx==NULL) { - error("Cannot allocate SSL context"); + netdata_log_error("Cannot allocate SSL context"); goto exit_sock; } if (!SSL_CTX_set_default_verify_paths(ctx->ssl_ctx)) { - error("Error setting default verify paths"); + netdata_log_error("Error setting default verify paths"); goto exit_CTX; } SSL_CTX_set_verify(ctx->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback); ctx->ssl = SSL_new(ctx->ssl_ctx); if (ctx->ssl==NULL) { - error("Cannot allocate SSL"); + netdata_log_error("Cannot allocate SSL"); goto exit_CTX; } SSL_set_fd(ctx->ssl, ctx->sock); ret = SSL_connect(ctx->ssl); if (ret != -1 && ret != 1) { - error("SSL could not connect"); + netdata_log_error("SSL could not connect"); goto exit_SSL; } if (ret == -1) { @@ -557,14 +557,14 @@ int https_request(https_req_t *request, https_req_response_t *response) { // consult SSL_connect documentation for details int ec = SSL_get_error(ctx->ssl, ret); if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) { - error("Failed to start SSL connection"); + netdata_log_error("Failed to start SSL connection"); goto exit_SSL; } } // The actual request here if (handle_http_request(ctx)) { - error("Couldn't process request"); + netdata_log_error("Couldn't process request"); goto exit_SSL; } response->http_code = ctx->parse_ctx.http_code; @@ -573,7 +573,7 @@ int https_request(https_req_t *request, https_req_response_t *response) { response->payload = mallocz(response->payload_size + 1); ret = rbuf_pop(ctx->buf_rx, response->payload, response->payload_size); if (ret != (int)response->payload_size) { - error("Payload size doesn't match remaining data on the buffer!"); + netdata_log_error("Payload size doesn't match remaining data on the buffer!"); response->payload_size = ret; } // normally we take payload as it is and copy it @@ -584,7 +584,7 @@ int https_request(https_req_t *request, https_req_response_t *response) { // only exact data without affixed 0x00 ((char*)response->payload)[response->payload_size] = 0; // mallocz(response->payload_size + 1); } - info("HTTPS \"%s\" request to \"%s\" finished with HTTP code: %d", http_req_type_to_str(ctx->request->request_type), ctx->request->host, response->http_code); + netdata_log_info("HTTPS \"%s\" request to \"%s\" finished with HTTP code: %d", http_req_type_to_str(ctx->request->request_type), ctx->request->host, response->http_code); rc = 0; @@ -627,16 +627,16 @@ static int parse_host_port(url_t *url) { if (ptr) { size_t port_len = strlen(ptr + 1); if (!port_len) { - error(URL_PARSER_LOG_PREFIX ": specified but no port number"); + netdata_log_error(URL_PARSER_LOG_PREFIX ": specified but no port number"); return 1; } if (port_len > 5 /* MAX port length is 5digit long in decimal */) { - error(URL_PARSER_LOG_PREFIX "port # is too long"); + netdata_log_error(URL_PARSER_LOG_PREFIX "port # is too long"); return 1; } *ptr = 0; if (!strlen(url->host)) { - error(URL_PARSER_LOG_PREFIX "host empty after removing port"); + netdata_log_error(URL_PARSER_LOG_PREFIX "host empty after removing port"); return 1; } url->port = atoi (ptr + 1); @@ -672,7 +672,7 @@ int url_parse(const char *url, url_t *parsed) { if (end) { if (end == start) { - error (URL_PARSER_LOG_PREFIX "found " URI_PROTO_SEPARATOR " without protocol specified"); + netdata_log_error(URL_PARSER_LOG_PREFIX "found " URI_PROTO_SEPARATOR " without protocol specified"); return 1; } @@ -685,7 +685,7 @@ int url_parse(const char *url, url_t *parsed) { end = start + strlen(start); if (start == end) { - error(URL_PARSER_LOG_PREFIX "Host empty"); + netdata_log_error(URL_PARSER_LOG_PREFIX "Host empty"); return 1; } diff --git a/aclk/schema-wrappers/alarm_config.cc b/aclk/schema-wrappers/alarm_config.cc index 56d7e6f39..fe0b0517c 100644 --- a/aclk/schema-wrappers/alarm_config.cc +++ b/aclk/schema-wrappers/alarm_config.cc @@ -48,6 +48,8 @@ void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg) freez(cfg->p_db_lookup_dimensions); freez(cfg->p_db_lookup_method); freez(cfg->p_db_lookup_options); + + freez(cfg->chart_labels); } char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data) @@ -127,6 +129,9 @@ char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_con cfg->set_p_db_lookup_options(data->cfg.p_db_lookup_options); cfg->set_p_update_every(data->cfg.p_update_every); + if (data->cfg.chart_labels) + cfg->set_chart_labels(data->cfg.chart_labels); + *len = PROTO_COMPAT_MSG_SIZE(msg); char *bin = (char*)mallocz(*len); if (!msg.SerializeToArray(bin, *len)) diff --git a/aclk/schema-wrappers/alarm_config.h b/aclk/schema-wrappers/alarm_config.h index 157fbc60f..4eaa4fd70 100644 --- a/aclk/schema-wrappers/alarm_config.h +++ b/aclk/schema-wrappers/alarm_config.h @@ -50,6 +50,8 @@ struct aclk_alarm_configuration { char *p_db_lookup_method; char *p_db_lookup_options; int32_t p_update_every; + + char *chart_labels; }; void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg); diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc index 11b9284f5..d1079a688 100644 --- a/aclk/schema-wrappers/alarm_stream.cc +++ b/aclk/schema-wrappers/alarm_stream.cc @@ -59,7 +59,7 @@ static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status case aclk_alarm_status::ALARM_STATUS_CRITICAL: return alarms::v1::ALARM_STATUS_CRITICAL; default: - error("Unknown alarm status"); + netdata_log_error("Unknown alarm status"); return alarms::v1::ALARM_STATUS_UNKNOWN; } } -- cgit v1.2.3