diff options
Diffstat (limited to 'aclk/aclk.c')
-rw-r--r-- | aclk/aclk.c | 209 |
1 files changed, 88 insertions, 121 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 7b3641b1e..3b035b849 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -2,6 +2,7 @@ #include "aclk.h" +#ifdef ENABLE_ACLK #include "aclk_stats.h" #include "mqtt_wss_client.h" #include "aclk_otp.h" @@ -12,6 +13,7 @@ #include "aclk_rx_msgs.h" #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" +#include "aclk_capas.h" #include "aclk_proxy.h" @@ -23,21 +25,31 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable +#endif /* ENABLE_ACLK */ + int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. int aclk_rcvd_cloud_msgs = 0; int aclk_connection_counter = 0; int disconnect_req = 0; +int aclk_connected = 0; +int aclk_ctx_based = 0; +int aclk_disable_runtime = 0; +int aclk_stats_enabled; +int aclk_kill_link = 0; + +usec_t aclk_session_us = 0; +time_t aclk_session_sec = 0; + time_t last_conn_time_mqtt = 0; time_t last_conn_time_appl = 0; time_t last_disconnect_time = 0; time_t next_connection_attempt = 0; float last_backoff_value = 0; -int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload - time_t aclk_block_until = 0; +#ifdef ENABLE_ACLK mqtt_wss_client mqttwss_client; netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; @@ -447,9 +459,9 @@ static int aclk_block_till_recon_allowed() { */ static int aclk_get_transport_idx(aclk_env_t *env) { for (size_t i = 0; i < env->transport_count; i++) { - // currently we support only MQTT 3 + // currently we support only MQTT 5 // therefore select first transport that matches - if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) { + if (env->transports[i]->type == ACLK_TRP_MQTT_5) { return i; } } @@ -483,7 +495,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { - error("Do not move the cloud base url out of post_conf_load!!"); + error_report("Do not move the cloud base url out of post_conf_load!!"); return -1; } @@ -493,13 +505,13 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) info("Attempting connection now"); memset(&base_url, 0, sizeof(url_t)); if (url_parse(cloud_base_url, &base_url)) { - error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); + error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); url_t_destroy(&base_url); continue; } - struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT }; + struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT }; aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); struct mqtt_connect_params mqtt_conn_params = { @@ -523,7 +535,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_env(aclk_env, base_url.host, base_url.port); url_t_destroy(&base_url); if (ret) { - error("Failed to Get ACLK environment"); + error_report("Failed to Get ACLK environment"); // delay handled by aclk_block_till_recon_allowed continue; } @@ -537,14 +549,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) } if (!aclk_env_has_capa("proto")) { - error ("Can't use encoding=proto without at least \"proto\" capability."); + error_report("Can't use encoding=proto without at least \"proto\" capability."); continue; } info("New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { - error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); + error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); url_t_destroy(&auth_url); continue; } @@ -552,7 +564,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); url_t_destroy(&auth_url); if (ret) { - error("Error passing Challenge/Response to get OTP"); + error_report("Error passing Challenge/Response to get OTP"); continue; } @@ -561,20 +573,20 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { - error("Couldn't get LWT topic. Will not send LWT."); + error_report("Couldn't get LWT topic. Will not send LWT."); continue; } // Do the MQTT connection ret = aclk_get_transport_idx(aclk_env); if (ret < 0) { - error("Cloud /env endpoint didn't return any transport usable by this Agent."); + error_report("Cloud /env endpoint didn't return any transport usable by this Agent."); continue; } memset(&mqtt_url, 0, sizeof(url_t)); if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ - error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); + error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); url_t_destroy(&mqtt_url); continue; } @@ -660,9 +672,7 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); - - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) { error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -672,7 +682,7 @@ void *aclk_main(void *ptr) // that send JSON payloads of 10 MB as single messages mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024); - aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled); if (aclk_stats_enabled) { stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); @@ -748,7 +758,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) node_instance_creation_t node_instance_creation = { .claim_id = localhost->aclk_state.claimed_id, .hops = host->system_info->hops, - .hostname = host->hostname, + .hostname = rrdhost_hostname(host), .machine_guid = host->machine_guid }; create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); @@ -770,14 +780,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(node_id, (char*)node_state_update.node_id); - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + node_state_update.capabilities = aclk_get_agent_capas(); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; @@ -815,15 +818,8 @@ void aclk_send_node_instances() char host_id[UUID_STR_LEN]; uuid_unparse_lower(list->host_id, host_id); - RRDHOST *host = rrdhost_find_by_guid(host_id, 0); - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + RRDHOST *host = rrdhost_find_by_guid(host_id); + node_state_update.capabilities = aclk_get_node_instance_capas(host); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; @@ -832,6 +828,8 @@ void aclk_send_node_instances() info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); + + freez((void*)node_state_update.capabilities); freez((void*)node_state_update.node_id); query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; @@ -853,7 +851,7 @@ void aclk_send_node_instances() rrdhost_aclk_state_unlock(localhost); info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); - freez(node_instance_creation.machine_guid); + freez((void *)node_instance_creation.machine_guid); aclk_queue_query(create_query); } freez(list->hostname); @@ -891,41 +889,13 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) status.last_submitted_sequence_id ); } +#endif /* ENABLE_ACLK */ -static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) -{ - struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); - if (!stats) { - buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host"); - return; - } - buffer_sprintf(wb, - "\n\t\tUpdates: %d" - "\n\t\tBatch ID: %"PRIu64 - "\n\t\tMin Seq ID: %"PRIu64 - "\n\t\tMax Seq ID: %"PRIu64 - "\n\t\tPending Min Seq ID: %"PRIu64 - "\n\t\tPending Max Seq ID: %"PRIu64 - "\n\t\tSent Min Seq ID: %"PRIu64 - "\n\t\tSent Max Seq ID: %"PRIu64 - "\n\t\tAcked Min Seq ID: %"PRIu64 - "\n\t\tAcked Max Seq ID: %"PRIu64, - stats->updates, - stats->batch_id, - stats->min_seqid, - stats->max_seqid, - stats->min_seqid_pend, - stats->max_seqid_pend, - stats->min_seqid_sent, - stats->max_seqid_sent, - stats->min_seqid_ack, - stats->max_seqid_ack - ); - freez(stats); -} - -char *ng_aclk_state(void) +char *aclk_state(void) { +#ifndef ENABLE_ACLK + return strdupz("ACLK Available: No"); +#else BUFFER *wb = buffer_create(1024); struct tm *tmptr, tmbuf; char *ret; @@ -935,7 +905,7 @@ char *ng_aclk_state(void) "ACLK Version: 2\n" "Protocols Supported: Protobuf\n" ); - buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5); char *agent_id = get_agent_claimid(); if (agent_id == NULL) @@ -974,7 +944,7 @@ char *ng_aclk_state(void) RRDHOST *host; rrd_rdlock(); rrdhost_foreach_read(host) { - buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname); + buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host)); buffer_strcat(wb, "\tClaimed ID: "); rrdhost_aclk_state_lock(host); @@ -1000,9 +970,6 @@ char *ng_aclk_state(void) buffer_strcat(wb, "\n\tAlert Streaming Status:"); fill_alert_status_for_host(wb, host); - - buffer_strcat(wb, "\n\tChart Streaming Status:"); - fill_chart_status_for_host(wb, host); } rrd_unlock(); } @@ -1010,8 +977,10 @@ char *ng_aclk_state(void) ret = strdupz(buffer_tostring(wb)); buffer_free(wb); return ret; +#endif /* ENABLE_ACLK */ } +#ifdef ENABLE_ACLK static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { struct proto_alert_status status; @@ -1038,45 +1007,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) json_object_object_add(obj, "last-submitted-seq-id", tmp); } -static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) -{ - struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); - if (!stats) - return; - - json_object *tmp = json_object_new_int(stats->updates); - json_object_object_add(obj, "updates", tmp); - - tmp = json_object_new_int(stats->batch_id); - json_object_object_add(obj, "batch-id", tmp); - - tmp = json_object_new_int(stats->min_seqid); - json_object_object_add(obj, "min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid); - json_object_object_add(obj, "max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_pend); - json_object_object_add(obj, "pending-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_pend); - json_object_object_add(obj, "pending-max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_sent); - json_object_object_add(obj, "sent-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_sent); - json_object_object_add(obj, "sent-max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_ack); - json_object_object_add(obj, "acked-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_ack); - json_object_object_add(obj, "acked-max-seq-id", tmp); - - freez(stats); -} - static json_object *timestamp_to_json(const time_t *t) { struct tm *tmptr, tmbuf; @@ -1087,9 +1017,13 @@ static json_object *timestamp_to_json(const time_t *t) } return NULL; } +#endif /* ENABLE_ACLK */ -char *ng_aclk_state_json(void) +char *aclk_state_json(void) { +#ifndef ENABLE_ACLK + return strdupz("{\"aclk-available\":false}"); +#else json_object *tmp, *grp, *msg = json_object_new_object(); tmp = json_object_new_boolean(1); @@ -1124,7 +1058,7 @@ char *ng_aclk_state_json(void) tmp = json_object_new_string("Protobuf"); json_object_object_add(msg, "used-cloud-protocol", tmp); - tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); + tmp = json_object_new_int(5); json_object_object_add(msg, "mqtt-version", tmp); tmp = json_object_new_int(aclk_rcvd_cloud_msgs); @@ -1155,7 +1089,7 @@ char *ng_aclk_state_json(void) rrdhost_foreach_read(host) { json_object *nodeinstance = json_object_new_object(); - tmp = json_object_new_string(host->hostname); + tmp = json_object_new_string(rrdhost_hostname(host)); json_object_object_add(nodeinstance, "hostname", tmp); tmp = json_object_new_string(host->machine_guid); @@ -1191,10 +1125,6 @@ char *ng_aclk_state_json(void) fill_alert_status_for_host_json(tmp, host); json_object_object_add(nodeinstance, "alert-sync-status", tmp); - tmp = json_object_new_object(); - fill_chart_status_for_host_json(tmp, host); - json_object_object_add(nodeinstance, "chart-sync-status", tmp); - json_object_array_add(grp, nodeinstance); } rrd_unlock(); @@ -1203,4 +1133,41 @@ char *ng_aclk_state_json(void) char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); return str; +#endif /* ENABLE_ACLK */ +} + +void add_aclk_host_labels(void) { + DICTIONARY *labels = localhost->rrdlabels; + +#ifdef ENABLE_ACLK + rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); + ACLK_PROXY_TYPE aclk_proxy; + char *proxy_str; + aclk_get_proxy(&aclk_proxy); + + switch(aclk_proxy) { + case PROXY_TYPE_SOCKS5: + proxy_str = "SOCKS5"; + break; + case PROXY_TYPE_HTTP: + proxy_str = "HTTP"; + break; + default: + proxy_str = "none"; + break; + } + + rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); +#else + rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); +#endif +} + +void aclk_queue_node_info(RRDHOST *host) { + struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker; + if (likely(wc)) { + wc->node_info_send = 1; + } } |