summaryrefslogtreecommitdiffstats
path: root/aclk/aclk.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--aclk/aclk.c209
1 files changed, 88 insertions, 121 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 7b3641b1e..3b035b849 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -2,6 +2,7 @@
#include "aclk.h"
+#ifdef ENABLE_ACLK
#include "aclk_stats.h"
#include "mqtt_wss_client.h"
#include "aclk_otp.h"
@@ -12,6 +13,7 @@
#include "aclk_rx_msgs.h"
#include "https_client.h"
#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_capas.h"
#include "aclk_proxy.h"
@@ -23,21 +25,31 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+#endif /* ENABLE_ACLK */
+
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
int aclk_rcvd_cloud_msgs = 0;
int aclk_connection_counter = 0;
int disconnect_req = 0;
+int aclk_connected = 0;
+int aclk_ctx_based = 0;
+int aclk_disable_runtime = 0;
+int aclk_stats_enabled;
+int aclk_kill_link = 0;
+
+usec_t aclk_session_us = 0;
+time_t aclk_session_sec = 0;
+
time_t last_conn_time_mqtt = 0;
time_t last_conn_time_appl = 0;
time_t last_disconnect_time = 0;
time_t next_connection_attempt = 0;
float last_backoff_value = 0;
-int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
-
time_t aclk_block_until = 0;
+#ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client;
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -447,9 +459,9 @@ static int aclk_block_till_recon_allowed() {
*/
static int aclk_get_transport_idx(aclk_env_t *env) {
for (size_t i = 0; i < env->transport_count; i++) {
- // currently we support only MQTT 3
+ // currently we support only MQTT 5
// therefore select first transport that matches
- if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
+ if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
return i;
}
}
@@ -483,7 +495,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
- error("Do not move the cloud base url out of post_conf_load!!");
+ error_report("Do not move the cloud base url out of post_conf_load!!");
return -1;
}
@@ -493,13 +505,13 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
info("Attempting connection now");
memset(&base_url, 0, sizeof(url_t));
if (url_parse(cloud_base_url, &base_url)) {
- error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
+ error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
url_t_destroy(&base_url);
continue;
}
- struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT };
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
struct mqtt_connect_params mqtt_conn_params = {
@@ -523,7 +535,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
url_t_destroy(&base_url);
if (ret) {
- error("Failed to Get ACLK environment");
+ error_report("Failed to Get ACLK environment");
// delay handled by aclk_block_till_recon_allowed
continue;
}
@@ -537,14 +549,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
if (!aclk_env_has_capa("proto")) {
- error ("Can't use encoding=proto without at least \"proto\" capability.");
+ error_report("Can't use encoding=proto without at least \"proto\" capability.");
continue;
}
info("New ACLK protobuf protocol negotiated successfully (/env response).");
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
- error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
url_t_destroy(&auth_url);
continue;
}
@@ -552,7 +564,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
url_t_destroy(&auth_url);
if (ret) {
- error("Error passing Challenge/Response to get OTP");
+ error_report("Error passing Challenge/Response to get OTP");
continue;
}
@@ -561,20 +573,20 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
if (!mqtt_conn_params.will_topic) {
- error("Couldn't get LWT topic. Will not send LWT.");
+ error_report("Couldn't get LWT topic. Will not send LWT.");
continue;
}
// Do the MQTT connection
ret = aclk_get_transport_idx(aclk_env);
if (ret < 0) {
- error("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
continue;
}
memset(&mqtt_url, 0, sizeof(url_t));
if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
- error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
url_t_destroy(&mqtt_url);
continue;
}
@@ -660,9 +672,7 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -672,7 +682,7 @@ void *aclk_main(void *ptr)
// that send JSON payloads of 10 MB as single messages
mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
- aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
+ aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
if (aclk_stats_enabled) {
stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
@@ -748,7 +758,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
node_instance_creation_t node_instance_creation = {
.claim_id = localhost->aclk_state.claimed_id,
.hops = host->system_info->hops,
- .hostname = host->hostname,
+ .hostname = rrdhost_hostname(host),
.machine_guid = host->machine_guid
};
create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
@@ -770,14 +780,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_agent_capas();
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -815,15 +818,8 @@ void aclk_send_node_instances()
char host_id[UUID_STR_LEN];
uuid_unparse_lower(list->host_id, host_id);
- RRDHOST *host = rrdhost_find_by_guid(host_id, 0);
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ RRDHOST *host = rrdhost_find_by_guid(host_id);
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -832,6 +828,8 @@ void aclk_send_node_instances()
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
+
+ freez((void*)node_state_update.capabilities);
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
@@ -853,7 +851,7 @@ void aclk_send_node_instances()
rrdhost_aclk_state_unlock(localhost);
info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
- freez(node_instance_creation.machine_guid);
+ freez((void *)node_instance_creation.machine_guid);
aclk_queue_query(create_query);
}
freez(list->hostname);
@@ -891,41 +889,13 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
status.last_submitted_sequence_id
);
}
+#endif /* ENABLE_ACLK */
-static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
-{
- struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
- if (!stats) {
- buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host");
- return;
- }
- buffer_sprintf(wb,
- "\n\t\tUpdates: %d"
- "\n\t\tBatch ID: %"PRIu64
- "\n\t\tMin Seq ID: %"PRIu64
- "\n\t\tMax Seq ID: %"PRIu64
- "\n\t\tPending Min Seq ID: %"PRIu64
- "\n\t\tPending Max Seq ID: %"PRIu64
- "\n\t\tSent Min Seq ID: %"PRIu64
- "\n\t\tSent Max Seq ID: %"PRIu64
- "\n\t\tAcked Min Seq ID: %"PRIu64
- "\n\t\tAcked Max Seq ID: %"PRIu64,
- stats->updates,
- stats->batch_id,
- stats->min_seqid,
- stats->max_seqid,
- stats->min_seqid_pend,
- stats->max_seqid_pend,
- stats->min_seqid_sent,
- stats->max_seqid_sent,
- stats->min_seqid_ack,
- stats->max_seqid_ack
- );
- freez(stats);
-}
-
-char *ng_aclk_state(void)
+char *aclk_state(void)
{
+#ifndef ENABLE_ACLK
+ return strdupz("ACLK Available: No");
+#else
BUFFER *wb = buffer_create(1024);
struct tm *tmptr, tmbuf;
char *ret;
@@ -935,7 +905,7 @@ char *ng_aclk_state(void)
"ACLK Version: 2\n"
"Protocols Supported: Protobuf\n"
);
- buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3);
+ buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);
char *agent_id = get_agent_claimid();
if (agent_id == NULL)
@@ -974,7 +944,7 @@ char *ng_aclk_state(void)
RRDHOST *host;
rrd_rdlock();
rrdhost_foreach_read(host) {
- buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname);
+ buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host));
buffer_strcat(wb, "\tClaimed ID: ");
rrdhost_aclk_state_lock(host);
@@ -1000,9 +970,6 @@ char *ng_aclk_state(void)
buffer_strcat(wb, "\n\tAlert Streaming Status:");
fill_alert_status_for_host(wb, host);
-
- buffer_strcat(wb, "\n\tChart Streaming Status:");
- fill_chart_status_for_host(wb, host);
}
rrd_unlock();
}
@@ -1010,8 +977,10 @@ char *ng_aclk_state(void)
ret = strdupz(buffer_tostring(wb));
buffer_free(wb);
return ret;
+#endif /* ENABLE_ACLK */
}
+#ifdef ENABLE_ACLK
static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
{
struct proto_alert_status status;
@@ -1038,45 +1007,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
json_object_object_add(obj, "last-submitted-seq-id", tmp);
}
-static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
-{
- struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
- if (!stats)
- return;
-
- json_object *tmp = json_object_new_int(stats->updates);
- json_object_object_add(obj, "updates", tmp);
-
- tmp = json_object_new_int(stats->batch_id);
- json_object_object_add(obj, "batch-id", tmp);
-
- tmp = json_object_new_int(stats->min_seqid);
- json_object_object_add(obj, "min-seq-id", tmp);
-
- tmp = json_object_new_int(stats->max_seqid);
- json_object_object_add(obj, "max-seq-id", tmp);
-
- tmp = json_object_new_int(stats->min_seqid_pend);
- json_object_object_add(obj, "pending-min-seq-id", tmp);
-
- tmp = json_object_new_int(stats->max_seqid_pend);
- json_object_object_add(obj, "pending-max-seq-id", tmp);
-
- tmp = json_object_new_int(stats->min_seqid_sent);
- json_object_object_add(obj, "sent-min-seq-id", tmp);
-
- tmp = json_object_new_int(stats->max_seqid_sent);
- json_object_object_add(obj, "sent-max-seq-id", tmp);
-
- tmp = json_object_new_int(stats->min_seqid_ack);
- json_object_object_add(obj, "acked-min-seq-id", tmp);
-
- tmp = json_object_new_int(stats->max_seqid_ack);
- json_object_object_add(obj, "acked-max-seq-id", tmp);
-
- freez(stats);
-}
-
static json_object *timestamp_to_json(const time_t *t)
{
struct tm *tmptr, tmbuf;
@@ -1087,9 +1017,13 @@ static json_object *timestamp_to_json(const time_t *t)
}
return NULL;
}
+#endif /* ENABLE_ACLK */
-char *ng_aclk_state_json(void)
+char *aclk_state_json(void)
{
+#ifndef ENABLE_ACLK
+ return strdupz("{\"aclk-available\":false}");
+#else
json_object *tmp, *grp, *msg = json_object_new_object();
tmp = json_object_new_boolean(1);
@@ -1124,7 +1058,7 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_string("Protobuf");
json_object_object_add(msg, "used-cloud-protocol", tmp);
- tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
+ tmp = json_object_new_int(5);
json_object_object_add(msg, "mqtt-version", tmp);
tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
@@ -1155,7 +1089,7 @@ char *ng_aclk_state_json(void)
rrdhost_foreach_read(host) {
json_object *nodeinstance = json_object_new_object();
- tmp = json_object_new_string(host->hostname);
+ tmp = json_object_new_string(rrdhost_hostname(host));
json_object_object_add(nodeinstance, "hostname", tmp);
tmp = json_object_new_string(host->machine_guid);
@@ -1191,10 +1125,6 @@ char *ng_aclk_state_json(void)
fill_alert_status_for_host_json(tmp, host);
json_object_object_add(nodeinstance, "alert-sync-status", tmp);
- tmp = json_object_new_object();
- fill_chart_status_for_host_json(tmp, host);
- json_object_object_add(nodeinstance, "chart-sync-status", tmp);
-
json_object_array_add(grp, nodeinstance);
}
rrd_unlock();
@@ -1203,4 +1133,41 @@ char *ng_aclk_state_json(void)
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
return str;
+#endif /* ENABLE_ACLK */
+}
+
+void add_aclk_host_labels(void) {
+ DICTIONARY *labels = localhost->rrdlabels;
+
+#ifdef ENABLE_ACLK
+ rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+ ACLK_PROXY_TYPE aclk_proxy;
+ char *proxy_str;
+ aclk_get_proxy(&aclk_proxy);
+
+ switch(aclk_proxy) {
+ case PROXY_TYPE_SOCKS5:
+ proxy_str = "SOCKS5";
+ break;
+ case PROXY_TYPE_HTTP:
+ proxy_str = "HTTP";
+ break;
+ default:
+ proxy_str = "none";
+ break;
+ }
+
+ rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+#else
+ rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+#endif
+}
+
+void aclk_queue_node_info(RRDHOST *host) {
+ struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
+ if (likely(wc)) {
+ wc->node_info_send = 1;
+ }
}