summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:22 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:22 +0000
commitc21c3b0befeb46a51b6bf3758ffa30813bea0ff0 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /aclk/aclk.c
parentAdding upstream version 1.43.2. (diff)
downloadnetdata-c21c3b0befeb46a51b6bf3758ffa30813bea0ff0.tar.xz
netdata-c21c3b0befeb46a51b6bf3758ffa30813bea0ff0.zip
Adding upstream version 1.44.3.upstream/1.44.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--aclk/aclk.c97
1 files changed, 65 insertions, 32 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 854408ce6..e95d7d6ab 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -154,7 +154,9 @@ biofailed:
static int wait_till_cloud_enabled()
{
- netdata_log_info("Waiting for Cloud to be enabled");
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "Waiting for Cloud to be enabled");
+
while (!netdata_cloud_enabled) {
sleep_usec(USEC_PER_SEC * 1);
if (!service_running(SERVICE_ACLK))
@@ -233,17 +235,22 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
switch(log_type) {
case MQTT_WSS_LOG_ERROR:
case MQTT_WSS_LOG_FATAL:
+ nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str);
+ return;
+
case MQTT_WSS_LOG_WARN:
- error_report("%s", str);
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str);
return;
+
case MQTT_WSS_LOG_INFO:
- netdata_log_info("%s", str);
+ nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str);
return;
+
case MQTT_WSS_LOG_DEBUG:
- netdata_log_debug(D_ACLK, "%s", str);
return;
+
default:
- netdata_log_error("Unknown log type from mqtt_wss");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss");
}
}
@@ -297,7 +304,9 @@ static void puback_callback(uint16_t packet_id)
#endif
if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
- netdata_log_info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Shutdown message has been acknowledged by the cloud. Exiting gracefully");
+
aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
}
}
@@ -335,9 +344,11 @@ static int handle_connection(mqtt_wss_client client)
}
if (disconnect_req || aclk_kill_link) {
- netdata_log_info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
- disconnect_req ? "true" : "false",
- aclk_kill_link ? "true" : "false");
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
+ disconnect_req ? "true" : "false",
+ aclk_kill_link ? "true" : "false");
+
disconnect_req = 0;
aclk_kill_link = 0;
aclk_graceful_disconnect(client);
@@ -390,7 +401,9 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
void aclk_graceful_disconnect(mqtt_wss_client client)
{
- netdata_log_info("Preparing to gracefully shutdown ACLK connection");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Preparing to gracefully shutdown ACLK connection");
+
aclk_queue_lock();
aclk_queue_flush();
@@ -403,17 +416,22 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
break;
}
if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
- netdata_log_info("MQTT App Layer `disconnect` message sent successfully");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "MQTT App Layer `disconnect` message sent successfully");
break;
}
}
- netdata_log_info("ACLK link is down");
- netdata_log_access("ACLK DISCONNECTED");
+
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "ACLK link is down");
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
+
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
- netdata_log_info("Attempting to gracefully shutdown the MQTT/WSS connection");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Attempting to gracefully shutdown the MQTT/WSS connection");
+
mqtt_wss_disconnect(client, 1000);
}
@@ -455,7 +473,9 @@ static int aclk_block_till_recon_allowed() {
next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
- netdata_log_info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
+
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
{
@@ -593,7 +613,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
return 1;
}
- netdata_log_info("Attempting connection now");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Attempting connection now");
+
memset(&base_url, 0, sizeof(url_t));
if (url_parse(aclk_cloud_base_url, &base_url)) {
aclk_status = ACLK_STATUS_INVALID_CLOUD_URL;
@@ -680,7 +702,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
error_report("Can't use encoding=proto without at least \"proto\" capability.");
continue;
}
- netdata_log_info("New ACLK protobuf protocol negotiated successfully (/env response).");
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "New ACLK protobuf protocol negotiated successfully (/env response).");
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
@@ -750,9 +774,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
- netdata_log_info("ACLK connection successfully established");
+ nd_log(NDLS_DAEMON, NDLP_INFO, "ACLK connection successfully established");
aclk_status = ACLK_STATUS_CONNECTED;
- netdata_log_access("ACLK CONNECTED");
+ nd_log(NDLS_ACCESS, NDLP_INFO, "ACLK CONNECTED");
mqtt_connected_actions(client);
return 0;
}
@@ -798,7 +822,9 @@ void *aclk_main(void *ptr)
netdata_thread_disable_cancelability();
#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
- netdata_log_info("Killing ACLK thread -> cloud functionality has been disabled");
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "Killing ACLK thread -> cloud functionality has been disabled");
+
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
#endif
@@ -857,7 +883,7 @@ void *aclk_main(void *ptr)
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
- netdata_log_access("ACLK DISCONNECTED");
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
}
} while (service_running(SERVICE_ACLK));
@@ -891,7 +917,7 @@ exit:
return NULL;
}
-void aclk_host_state_update(RRDHOST *host, int cmd)
+void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
{
uuid_t node_id;
int ret = 0;
@@ -924,7 +950,9 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
rrdhost_aclk_state_unlock(localhost);
create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
create_query->data.bin_payload.msg_name = "CreateNodeInstance";
- netdata_log_info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+
aclk_queue_query(create_query);
return;
}
@@ -934,7 +962,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
node_instance_connection_t node_state_update = {
.hops = host->system_info->hops,
.live = cmd,
- .queryable = 1,
+ .queryable = queryable,
.session_id = aclk_session_newarch
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
@@ -947,8 +975,9 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- netdata_log_info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
- host->system_info->hops);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing status update for node=%s, live=%d, hops=%u, queryable=%d",
+ (char*)node_state_update.node_id, cmd, host->system_info->hops, queryable);
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
@@ -990,9 +1019,10 @@ void aclk_send_node_instances()
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- netdata_log_info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
- list->live,
- list->hops);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing status update for node=%s, live=%d, hops=%d, queryable=1",
+ (char*)node_state_update.node_id, list->live, list->hops);
freez((void*)node_state_update.capabilities);
freez((void*)node_state_update.node_id);
@@ -1014,8 +1044,11 @@ void aclk_send_node_instances()
node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- netdata_log_info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
- list->hops);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing registration for host=%s, hops=%d",
+ (char*)node_instance_creation.machine_guid, list->hops);
+
freez((void *)node_instance_creation.machine_guid);
aclk_queue_query(create_query);
}
@@ -1322,7 +1355,7 @@ void add_aclk_host_labels(void) {
void aclk_queue_node_info(RRDHOST *host, bool immediate)
{
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (likely(wc))
wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec();
}