diff options
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 361 |
1 files changed, 329 insertions, 32 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index c25b7df6..599b9a09 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -24,8 +24,16 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +int aclk_rcvd_cloud_msgs = 0; +int aclk_connection_counter = 0; int disconnect_req = 0; +time_t last_conn_time_mqtt = 0; +time_t last_conn_time_appl = 0; +time_t last_disconnect_time = 0; +time_t next_connection_attempt = 0; +float last_backoff_value = 0; + int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload time_t aclk_block_until = 0; @@ -43,8 +51,6 @@ struct aclk_shared_state aclk_shared_state = { .mqtt_shutdown_msg_rcvd = 0 }; -//ENDTODO - static RSA *aclk_private_key = NULL; static int load_private_key() { @@ -123,7 +129,7 @@ static int wait_till_agent_claimed(void) * @param aclk_hostname points to location where string pointer to hostname will be set * @param aclk_port port to int where port will be saved * - * @return If non 0 returned irrecoverable error happened and ACLK should be terminated + * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated */ static int wait_till_agent_claim_ready() { @@ -144,20 +150,20 @@ static int wait_till_agent_claim_ready() // TODO make it without malloc/free memset(&url, 0, sizeof(url_t)); if (url_parse(cloud_base_url, &url)) { - error("Agent is claimed but the configuration is invalid, please fix"); + error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix"); url_t_destroy(&url); sleep(5); continue; } url_t_destroy(&url); - if (!load_private_key()) { - sleep(5); - break; - } + if (!load_private_key()) + return 0; + + sleep(5); } - return 0; + return 1; } void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) @@ -266,6 +272,7 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t } static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { + aclk_rcvd_cloud_msgs++; if (aclk_use_new_cloud_arch) msg_callback_new_protocol(topic, msg, msglen, qos); else @@ -275,8 +282,10 @@ static inline void msg_callback(const char *topic, const void *msg, size_t msgle static void puback_callback(uint16_t packet_id) { - if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) + if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) { + last_conn_time_appl = now_realtime_sec(); aclk_tbeb_reset(); + } #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_puback(packet_id); @@ -402,6 +411,8 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; + aclk_rcvd_cloud_msgs = 0; + aclk_connection_counter++; #ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!aclk_use_new_cloud_arch) { @@ -480,6 +491,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client) info("ACLK link is down"); log_access("ACLK DISCONNECTED"); aclk_stats_upd_online(0); + last_disconnect_time = now_realtime_sec(); aclk_connected = 0; info("Attempting to gracefully shutdown the MQTT/WSS connection"); @@ -521,6 +533,9 @@ static unsigned long aclk_reconnect_delay() { static int aclk_block_till_recon_allowed() { unsigned long recon_delay = aclk_reconnect_delay(); + next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC); + last_backoff_value = (float)recon_delay / MSEC_PER_SEC; + info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC); // we want to wake up from time to time to check netdata_exit while (recon_delay) @@ -613,12 +628,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .drop_on_publish_fail = 1 }; -#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE) - aclk_use_new_cloud_arch = 1; - info("Switching ACLK to new protobuf protocol. Due to #define ACLK_NEWARCH_DEVMODE."); -#else aclk_use_new_cloud_arch = 0; -#endif #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { @@ -638,20 +648,19 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (netdata_exit) return 1; -#ifndef ACLK_NEWARCH_DEVMODE if (aclk_env->encoding == ACLK_ENC_PROTO) { #ifndef ENABLE_NEW_CLOUD_PROTOCOL error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); continue; -#endif +#else if (!aclk_env_has_capa("proto")) { error ("Can't encoding=proto without at least \"proto\" capability."); continue; } info("Switching ACLK to new protobuf protocol. Due to /env response."); aclk_use_new_cloud_arch = 1; - } #endif + } memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { @@ -728,6 +737,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) json_object_put(lwt); if (!ret) { + last_conn_time_mqtt = now_realtime_sec(); info("ACLK connection successfully established"); log_access("ACLK CONNECTED"); mqtt_connected_actions(client); @@ -767,8 +777,9 @@ void *aclk_main(void *ptr) return NULL; } + unsigned int proto_hdl_cnt; #ifdef ENABLE_NEW_CLOUD_PROTOCOL - aclk_init_rx_msg_handlers(); + proto_hdl_cnt = aclk_init_rx_msg_handlers(); #endif // This thread is unusual in that it cannot be cancelled by cancel_main_threads() @@ -808,6 +819,7 @@ void *aclk_main(void *ptr) stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); stats_thread->query_thread_count = query_threads.count; + aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt); netdata_thread_create( stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, stats_thread); @@ -843,6 +855,7 @@ void *aclk_main(void *ptr) if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); + last_disconnect_time = now_realtime_sec(); aclk_connected = 0; log_access("ACLK DISCONNECTED"); } @@ -1092,6 +1105,7 @@ void aclk_send_node_instances() uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); query->data.node_update.queryable = 1; query->data.node_update.session_id = aclk_session_newarch; + freez(list->hostname); info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id, list->live, list->hops); @@ -1121,53 +1135,257 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) +{ + struct proto_alert_status status; + memset(&status, 0, sizeof(status)); + if (get_proto_alert_status(host, &status)) { + buffer_strcat(wb, "\nFailed to get alert streaming status for this host"); + return; + } + buffer_sprintf(wb, + "\n\t\tUpdates: %d" + "\n\t\tBatch ID: %"PRIu64 + "\n\t\tLast Acked Seq ID: %"PRIu64 + "\n\t\tPending Min Seq ID: %"PRIu64 + "\n\t\tPending Max Seq ID: %"PRIu64 + "\n\t\tLast Submitted Seq ID: %"PRIu64, + status.alert_updates, + status.alerts_batch_id, + status.last_acked_sequence_id, + status.pending_min_sequence_id, + status.pending_max_sequence_id, + status.last_submitted_sequence_id + ); +} + +static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) +{ + struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); + if (!stats) { + buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host"); + return; + } + buffer_sprintf(wb, + "\n\t\tUpdates: %d" + "\n\t\tBatch ID: %"PRIu64 + "\n\t\tMin Seq ID: %"PRIu64 + "\n\t\tMax Seq ID: %"PRIu64 + "\n\t\tPending Min Seq ID: %"PRIu64 + "\n\t\tPending Max Seq ID: %"PRIu64 + "\n\t\tSent Min Seq ID: %"PRIu64 + "\n\t\tSent Max Seq ID: %"PRIu64 + "\n\t\tAcked Min Seq ID: %"PRIu64 + "\n\t\tAcked Max Seq ID: %"PRIu64, + stats->updates, + stats->batch_id, + stats->min_seqid, + stats->max_seqid, + stats->min_seqid_pend, + stats->max_seqid_pend, + stats->min_seqid_sent, + stats->max_seqid_sent, + stats->min_seqid_ack, + stats->max_seqid_ack + ); + freez(stats); +} +#endif + char *ng_aclk_state(void) { BUFFER *wb = buffer_create(1024); + struct tm *tmptr, tmbuf; char *ret; buffer_strcat(wb, "ACLK Available: Yes\n" - "ACLK Implementation: Next Generation\n" + "ACLK Version: 2\n" #ifdef ENABLE_NEW_CLOUD_PROTOCOL - "New Cloud Protocol Support: Yes\n" + "Protocols Supported: Legacy, Protobuf\n" #else - "New Cloud Protocol Support: No\n" + "Protocols Supported: Legacy\n" #endif - "Claimed: " ); + buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); char *agent_id = is_agent_claimed(); if (agent_id == NULL) buffer_strcat(wb, "No\n"); else { - buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null"); freez(agent_id); } - buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy"); + buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No"); + if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf); + } + if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf); + } + if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf); + } + if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value); + } + + if (aclk_connected) { + buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + RRDHOST *host; + rrd_rdlock(); + rrdhost_foreach_read(host) { + buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname); + + buffer_strcat(wb, "\tClaimed ID: "); + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) + buffer_strcat(wb, host->aclk_state.claimed_id); + else + buffer_strcat(wb, "null"); + rrdhost_aclk_state_unlock(host); + + + if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + buffer_strcat(wb, "\n\tNode ID: null\n"); + } else { + char node_id[GUID_LEN + 1]; + uuid_unparse_lower(*host->node_id, node_id); + buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id); + } + + buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child"); + + if (host != localhost) + buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false"); + + buffer_strcat(wb, "\n\tAlert Streaming Status:"); + fill_alert_status_for_host(wb, host); + + buffer_strcat(wb, "\n\tChart Streaming Status:"); + fill_chart_status_for_host(wb, host); + } + rrd_unlock(); +#endif + } ret = strdupz(buffer_tostring(wb)); buffer_free(wb); return ret; } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) +{ + struct proto_alert_status status; + memset(&status, 0, sizeof(status)); + if (get_proto_alert_status(host, &status)) + return; + + json_object *tmp = json_object_new_int(status.alert_updates); + json_object_object_add(obj, "updates", tmp); + + tmp = json_object_new_int(status.alerts_batch_id); + json_object_object_add(obj, "batch-id", tmp); + + tmp = json_object_new_int(status.last_acked_sequence_id); + json_object_object_add(obj, "last-acked-seq-id", tmp); + + tmp = json_object_new_int(status.pending_min_sequence_id); + json_object_object_add(obj, "pending-min-seq-id", tmp); + + tmp = json_object_new_int(status.pending_max_sequence_id); + json_object_object_add(obj, "pending-max-seq-id", tmp); + + tmp = json_object_new_int(status.last_submitted_sequence_id); + json_object_object_add(obj, "last-submitted-seq-id", tmp); +} + +static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) +{ + struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); + if (!stats) + return; + + json_object *tmp = json_object_new_int(stats->updates); + json_object_object_add(obj, "updates", tmp); + + tmp = json_object_new_int(stats->batch_id); + json_object_object_add(obj, "batch-id", tmp); + + tmp = json_object_new_int(stats->min_seqid); + json_object_object_add(obj, "min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid); + json_object_object_add(obj, "max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_pend); + json_object_object_add(obj, "pending-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_pend); + json_object_object_add(obj, "pending-max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_sent); + json_object_object_add(obj, "sent-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_sent); + json_object_object_add(obj, "sent-max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_ack); + json_object_object_add(obj, "acked-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_ack); + json_object_object_add(obj, "acked-max-seq-id", tmp); + + freez(stats); +} +#endif + +static json_object *timestamp_to_json(const time_t *t) +{ + struct tm *tmptr, tmbuf; + if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + return json_object_new_string(timebuf); + } + return NULL; +} + char *ng_aclk_state_json(void) { - json_object *tmp, *msg = json_object_new_object(); + json_object *tmp, *grp, *msg = json_object_new_object(); tmp = json_object_new_boolean(1); json_object_object_add(msg, "aclk-available", tmp); - tmp = json_object_new_string("Next Generation"); - json_object_object_add(msg, "aclk-implementation", tmp); + tmp = json_object_new_int(2); + json_object_object_add(msg, "aclk-version", tmp); + grp = json_object_new_array(); #ifdef ENABLE_NEW_CLOUD_PROTOCOL - tmp = json_object_new_boolean(1); + tmp = json_object_new_string("Legacy"); + json_object_array_add(grp, tmp); + tmp = json_object_new_string("Protobuf"); + json_object_array_add(grp, tmp); #else - tmp = json_object_new_boolean(0); + tmp = json_object_new_string("Legacy"); + json_object_array_add(grp, tmp); #endif - json_object_object_add(msg, "new-cloud-protocol-supported", tmp); + json_object_object_add(msg, "protocols-supported", grp); char *agent_id = is_agent_claimed(); tmp = json_object_new_boolean(agent_id != NULL); @@ -1180,12 +1398,91 @@ char *ng_aclk_state_json(void) tmp = NULL; json_object_object_add(msg, "claimed-id", tmp); + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL; + json_object_object_add(msg, "cloud-url", tmp); + tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); - tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy"); + tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); json_object_object_add(msg, "used-cloud-protocol", tmp); + tmp = json_object_new_int(aclk_rcvd_cloud_msgs); + json_object_object_add(msg, "received-app-layer-msgs", tmp); + + tmp = json_object_new_int(aclk_pubacks_per_conn); + json_object_object_add(msg, "received-mqtt-pubacks", tmp); + + tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0); + json_object_object_add(msg, "reconnect-count", tmp); + + json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt)); + json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl)); + json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time)); + json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL); + tmp = NULL; + if (!aclk_connected && last_backoff_value) + tmp = json_object_new_double(last_backoff_value); + json_object_object_add(msg, "last-backoff-value", tmp); + + tmp = json_object_new_boolean(aclk_disable_runtime); + json_object_object_add(msg, "banned-by-cloud", tmp); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + grp = json_object_new_array(); + + RRDHOST *host; + rrd_rdlock(); + rrdhost_foreach_read(host) { + json_object *nodeinstance = json_object_new_object(); + + tmp = json_object_new_string(host->hostname); + json_object_object_add(nodeinstance, "hostname", tmp); + + tmp = json_object_new_string(host->machine_guid); + json_object_object_add(nodeinstance, "mguid", tmp); + + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) { + tmp = json_object_new_string(host->aclk_state.claimed_id); + json_object_object_add(nodeinstance, "claimed_id", tmp); + } else + json_object_object_add(nodeinstance, "claimed_id", NULL); + rrdhost_aclk_state_unlock(host); + + if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + json_object_object_add(nodeinstance, "node-id", NULL); + } else { + char node_id[GUID_LEN + 1]; + uuid_unparse_lower(*host->node_id, node_id); + tmp = json_object_new_string(node_id); + json_object_object_add(nodeinstance, "node-id", tmp); + } + + tmp = json_object_new_int(host->system_info->hops); + json_object_object_add(nodeinstance, "streaming-hops", tmp); + + tmp = json_object_new_string(host == localhost ? "self" : "child"); + json_object_object_add(nodeinstance, "relationship", tmp); + + tmp = json_object_new_boolean((host->receiver || host == localhost)); + json_object_object_add(nodeinstance, "streaming-online", tmp); + + tmp = json_object_new_object(); + fill_alert_status_for_host_json(tmp, host); + json_object_object_add(nodeinstance, "alert-sync-status", tmp); + + tmp = json_object_new_object(); + fill_chart_status_for_host_json(tmp, host); + json_object_object_add(nodeinstance, "chart-sync-status", tmp); + + json_object_array_add(grp, nodeinstance); + } + rrd_unlock(); + json_object_object_add(msg, "node-instances", grp); +#endif + char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); return str; |