From b5321aff06d6ea8d730d62aec2ffd8e9271c1ffc Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 14 Apr 2022 20:12:10 +0200 Subject: Adding upstream version 1.34.0. Signed-off-by: Daniel Baumann --- aclk/README.md | 2 +- aclk/aclk.c | 361 +++++++++++++++++++++++++++++++++++++++++++----- aclk/aclk_api.c | 5 + aclk/aclk_query.c | 81 +++++++---- aclk/aclk_query.h | 4 + aclk/aclk_query_queue.c | 35 ----- aclk/aclk_query_queue.h | 5 +- aclk/aclk_rx_msgs.c | 16 ++- aclk/aclk_rx_msgs.h | 3 +- aclk/aclk_stats.c | 114 ++++++++++----- aclk/aclk_stats.h | 15 +- aclk/aclk_tx_msgs.c | 68 +++++++-- aclk/aclk_tx_msgs.h | 1 + aclk/aclk_util.c | 3 +- aclk/aclk_util.h | 15 +- aclk/https_client.c | 2 +- 16 files changed, 569 insertions(+), 161 deletions(-) (limited to 'aclk') diff --git a/aclk/README.md b/aclk/README.md index 870314be4..09c0d2920 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -134,4 +134,4 @@ If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf` Restart your Agent and [connect your node](/claim/README.md#how-to-connect-a-node). -[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Faclk%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) + diff --git a/aclk/aclk.c b/aclk/aclk.c index c25b7df68..599b9a093 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -24,8 +24,16 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +int aclk_rcvd_cloud_msgs = 0; +int aclk_connection_counter = 0; int disconnect_req = 0; +time_t last_conn_time_mqtt = 0; +time_t last_conn_time_appl = 0; +time_t last_disconnect_time = 0; +time_t next_connection_attempt = 0; +float last_backoff_value = 0; + int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload time_t aclk_block_until = 0; @@ -43,8 +51,6 @@ struct aclk_shared_state aclk_shared_state = { .mqtt_shutdown_msg_rcvd = 0 }; -//ENDTODO - static RSA *aclk_private_key = NULL; static int load_private_key() { @@ -123,7 +129,7 @@ static int wait_till_agent_claimed(void) * @param aclk_hostname points to location where string pointer to hostname will be set * @param aclk_port port to int where port will be saved * - * @return If non 0 returned irrecoverable error happened and ACLK should be terminated + * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated */ static int wait_till_agent_claim_ready() { @@ -144,20 +150,20 @@ static int wait_till_agent_claim_ready() // TODO make it without malloc/free memset(&url, 0, sizeof(url_t)); if (url_parse(cloud_base_url, &url)) { - error("Agent is claimed but the configuration is invalid, please fix"); + error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix"); url_t_destroy(&url); sleep(5); continue; } url_t_destroy(&url); - if (!load_private_key()) { - sleep(5); - break; - } + if (!load_private_key()) + return 0; + + sleep(5); } - return 0; + return 1; } void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) @@ -266,6 +272,7 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t } static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { + aclk_rcvd_cloud_msgs++; if (aclk_use_new_cloud_arch) msg_callback_new_protocol(topic, msg, msglen, qos); else @@ -275,8 +282,10 @@ static inline void msg_callback(const char *topic, const void *msg, size_t msgle static void puback_callback(uint16_t packet_id) { - if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) + if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) { + last_conn_time_appl = now_realtime_sec(); aclk_tbeb_reset(); + } #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_puback(packet_id); @@ -402,6 +411,8 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; + aclk_rcvd_cloud_msgs = 0; + aclk_connection_counter++; #ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!aclk_use_new_cloud_arch) { @@ -480,6 +491,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client) info("ACLK link is down"); log_access("ACLK DISCONNECTED"); aclk_stats_upd_online(0); + last_disconnect_time = now_realtime_sec(); aclk_connected = 0; info("Attempting to gracefully shutdown the MQTT/WSS connection"); @@ -521,6 +533,9 @@ static unsigned long aclk_reconnect_delay() { static int aclk_block_till_recon_allowed() { unsigned long recon_delay = aclk_reconnect_delay(); + next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC); + last_backoff_value = (float)recon_delay / MSEC_PER_SEC; + info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC); // we want to wake up from time to time to check netdata_exit while (recon_delay) @@ -613,12 +628,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .drop_on_publish_fail = 1 }; -#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE) - aclk_use_new_cloud_arch = 1; - info("Switching ACLK to new protobuf protocol. Due to #define ACLK_NEWARCH_DEVMODE."); -#else aclk_use_new_cloud_arch = 0; -#endif #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { @@ -638,20 +648,19 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (netdata_exit) return 1; -#ifndef ACLK_NEWARCH_DEVMODE if (aclk_env->encoding == ACLK_ENC_PROTO) { #ifndef ENABLE_NEW_CLOUD_PROTOCOL error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); continue; -#endif +#else if (!aclk_env_has_capa("proto")) { error ("Can't encoding=proto without at least \"proto\" capability."); continue; } info("Switching ACLK to new protobuf protocol. Due to /env response."); aclk_use_new_cloud_arch = 1; - } #endif + } memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { @@ -728,6 +737,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) json_object_put(lwt); if (!ret) { + last_conn_time_mqtt = now_realtime_sec(); info("ACLK connection successfully established"); log_access("ACLK CONNECTED"); mqtt_connected_actions(client); @@ -767,8 +777,9 @@ void *aclk_main(void *ptr) return NULL; } + unsigned int proto_hdl_cnt; #ifdef ENABLE_NEW_CLOUD_PROTOCOL - aclk_init_rx_msg_handlers(); + proto_hdl_cnt = aclk_init_rx_msg_handlers(); #endif // This thread is unusual in that it cannot be cancelled by cancel_main_threads() @@ -808,6 +819,7 @@ void *aclk_main(void *ptr) stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); stats_thread->query_thread_count = query_threads.count; + aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt); netdata_thread_create( stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, stats_thread); @@ -843,6 +855,7 @@ void *aclk_main(void *ptr) if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); + last_disconnect_time = now_realtime_sec(); aclk_connected = 0; log_access("ACLK DISCONNECTED"); } @@ -1092,6 +1105,7 @@ void aclk_send_node_instances() uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); query->data.node_update.queryable = 1; query->data.node_update.session_id = aclk_session_newarch; + freez(list->hostname); info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id, list->live, list->hops); @@ -1121,53 +1135,257 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) +{ + struct proto_alert_status status; + memset(&status, 0, sizeof(status)); + if (get_proto_alert_status(host, &status)) { + buffer_strcat(wb, "\nFailed to get alert streaming status for this host"); + return; + } + buffer_sprintf(wb, + "\n\t\tUpdates: %d" + "\n\t\tBatch ID: %"PRIu64 + "\n\t\tLast Acked Seq ID: %"PRIu64 + "\n\t\tPending Min Seq ID: %"PRIu64 + "\n\t\tPending Max Seq ID: %"PRIu64 + "\n\t\tLast Submitted Seq ID: %"PRIu64, + status.alert_updates, + status.alerts_batch_id, + status.last_acked_sequence_id, + status.pending_min_sequence_id, + status.pending_max_sequence_id, + status.last_submitted_sequence_id + ); +} + +static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) +{ + struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); + if (!stats) { + buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host"); + return; + } + buffer_sprintf(wb, + "\n\t\tUpdates: %d" + "\n\t\tBatch ID: %"PRIu64 + "\n\t\tMin Seq ID: %"PRIu64 + "\n\t\tMax Seq ID: %"PRIu64 + "\n\t\tPending Min Seq ID: %"PRIu64 + "\n\t\tPending Max Seq ID: %"PRIu64 + "\n\t\tSent Min Seq ID: %"PRIu64 + "\n\t\tSent Max Seq ID: %"PRIu64 + "\n\t\tAcked Min Seq ID: %"PRIu64 + "\n\t\tAcked Max Seq ID: %"PRIu64, + stats->updates, + stats->batch_id, + stats->min_seqid, + stats->max_seqid, + stats->min_seqid_pend, + stats->max_seqid_pend, + stats->min_seqid_sent, + stats->max_seqid_sent, + stats->min_seqid_ack, + stats->max_seqid_ack + ); + freez(stats); +} +#endif + char *ng_aclk_state(void) { BUFFER *wb = buffer_create(1024); + struct tm *tmptr, tmbuf; char *ret; buffer_strcat(wb, "ACLK Available: Yes\n" - "ACLK Implementation: Next Generation\n" + "ACLK Version: 2\n" #ifdef ENABLE_NEW_CLOUD_PROTOCOL - "New Cloud Protocol Support: Yes\n" + "Protocols Supported: Legacy, Protobuf\n" #else - "New Cloud Protocol Support: No\n" + "Protocols Supported: Legacy\n" #endif - "Claimed: " ); + buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); char *agent_id = is_agent_claimed(); if (agent_id == NULL) buffer_strcat(wb, "No\n"); else { - buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null"); freez(agent_id); } - buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy"); + buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No"); + if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf); + } + if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf); + } + if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf); + } + if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value); + } + + if (aclk_connected) { + buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + RRDHOST *host; + rrd_rdlock(); + rrdhost_foreach_read(host) { + buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname); + + buffer_strcat(wb, "\tClaimed ID: "); + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) + buffer_strcat(wb, host->aclk_state.claimed_id); + else + buffer_strcat(wb, "null"); + rrdhost_aclk_state_unlock(host); + + + if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + buffer_strcat(wb, "\n\tNode ID: null\n"); + } else { + char node_id[GUID_LEN + 1]; + uuid_unparse_lower(*host->node_id, node_id); + buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id); + } + + buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child"); + + if (host != localhost) + buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false"); + + buffer_strcat(wb, "\n\tAlert Streaming Status:"); + fill_alert_status_for_host(wb, host); + + buffer_strcat(wb, "\n\tChart Streaming Status:"); + fill_chart_status_for_host(wb, host); + } + rrd_unlock(); +#endif + } ret = strdupz(buffer_tostring(wb)); buffer_free(wb); return ret; } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) +{ + struct proto_alert_status status; + memset(&status, 0, sizeof(status)); + if (get_proto_alert_status(host, &status)) + return; + + json_object *tmp = json_object_new_int(status.alert_updates); + json_object_object_add(obj, "updates", tmp); + + tmp = json_object_new_int(status.alerts_batch_id); + json_object_object_add(obj, "batch-id", tmp); + + tmp = json_object_new_int(status.last_acked_sequence_id); + json_object_object_add(obj, "last-acked-seq-id", tmp); + + tmp = json_object_new_int(status.pending_min_sequence_id); + json_object_object_add(obj, "pending-min-seq-id", tmp); + + tmp = json_object_new_int(status.pending_max_sequence_id); + json_object_object_add(obj, "pending-max-seq-id", tmp); + + tmp = json_object_new_int(status.last_submitted_sequence_id); + json_object_object_add(obj, "last-submitted-seq-id", tmp); +} + +static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) +{ + struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); + if (!stats) + return; + + json_object *tmp = json_object_new_int(stats->updates); + json_object_object_add(obj, "updates", tmp); + + tmp = json_object_new_int(stats->batch_id); + json_object_object_add(obj, "batch-id", tmp); + + tmp = json_object_new_int(stats->min_seqid); + json_object_object_add(obj, "min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid); + json_object_object_add(obj, "max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_pend); + json_object_object_add(obj, "pending-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_pend); + json_object_object_add(obj, "pending-max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_sent); + json_object_object_add(obj, "sent-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_sent); + json_object_object_add(obj, "sent-max-seq-id", tmp); + + tmp = json_object_new_int(stats->min_seqid_ack); + json_object_object_add(obj, "acked-min-seq-id", tmp); + + tmp = json_object_new_int(stats->max_seqid_ack); + json_object_object_add(obj, "acked-max-seq-id", tmp); + + freez(stats); +} +#endif + +static json_object *timestamp_to_json(const time_t *t) +{ + struct tm *tmptr, tmbuf; + if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + return json_object_new_string(timebuf); + } + return NULL; +} + char *ng_aclk_state_json(void) { - json_object *tmp, *msg = json_object_new_object(); + json_object *tmp, *grp, *msg = json_object_new_object(); tmp = json_object_new_boolean(1); json_object_object_add(msg, "aclk-available", tmp); - tmp = json_object_new_string("Next Generation"); - json_object_object_add(msg, "aclk-implementation", tmp); + tmp = json_object_new_int(2); + json_object_object_add(msg, "aclk-version", tmp); + grp = json_object_new_array(); #ifdef ENABLE_NEW_CLOUD_PROTOCOL - tmp = json_object_new_boolean(1); + tmp = json_object_new_string("Legacy"); + json_object_array_add(grp, tmp); + tmp = json_object_new_string("Protobuf"); + json_object_array_add(grp, tmp); #else - tmp = json_object_new_boolean(0); + tmp = json_object_new_string("Legacy"); + json_object_array_add(grp, tmp); #endif - json_object_object_add(msg, "new-cloud-protocol-supported", tmp); + json_object_object_add(msg, "protocols-supported", grp); char *agent_id = is_agent_claimed(); tmp = json_object_new_boolean(agent_id != NULL); @@ -1180,12 +1398,91 @@ char *ng_aclk_state_json(void) tmp = NULL; json_object_object_add(msg, "claimed-id", tmp); + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL; + json_object_object_add(msg, "cloud-url", tmp); + tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); - tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy"); + tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); json_object_object_add(msg, "used-cloud-protocol", tmp); + tmp = json_object_new_int(aclk_rcvd_cloud_msgs); + json_object_object_add(msg, "received-app-layer-msgs", tmp); + + tmp = json_object_new_int(aclk_pubacks_per_conn); + json_object_object_add(msg, "received-mqtt-pubacks", tmp); + + tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0); + json_object_object_add(msg, "reconnect-count", tmp); + + json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt)); + json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl)); + json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time)); + json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL); + tmp = NULL; + if (!aclk_connected && last_backoff_value) + tmp = json_object_new_double(last_backoff_value); + json_object_object_add(msg, "last-backoff-value", tmp); + + tmp = json_object_new_boolean(aclk_disable_runtime); + json_object_object_add(msg, "banned-by-cloud", tmp); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + grp = json_object_new_array(); + + RRDHOST *host; + rrd_rdlock(); + rrdhost_foreach_read(host) { + json_object *nodeinstance = json_object_new_object(); + + tmp = json_object_new_string(host->hostname); + json_object_object_add(nodeinstance, "hostname", tmp); + + tmp = json_object_new_string(host->machine_guid); + json_object_object_add(nodeinstance, "mguid", tmp); + + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) { + tmp = json_object_new_string(host->aclk_state.claimed_id); + json_object_object_add(nodeinstance, "claimed_id", tmp); + } else + json_object_object_add(nodeinstance, "claimed_id", NULL); + rrdhost_aclk_state_unlock(host); + + if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + json_object_object_add(nodeinstance, "node-id", NULL); + } else { + char node_id[GUID_LEN + 1]; + uuid_unparse_lower(*host->node_id, node_id); + tmp = json_object_new_string(node_id); + json_object_object_add(nodeinstance, "node-id", tmp); + } + + tmp = json_object_new_int(host->system_info->hops); + json_object_object_add(nodeinstance, "streaming-hops", tmp); + + tmp = json_object_new_string(host == localhost ? "self" : "child"); + json_object_object_add(nodeinstance, "relationship", tmp); + + tmp = json_object_new_boolean((host->receiver || host == localhost)); + json_object_object_add(nodeinstance, "streaming-online", tmp); + + tmp = json_object_new_object(); + fill_alert_status_for_host_json(tmp, host); + json_object_object_add(nodeinstance, "alert-sync-status", tmp); + + tmp = json_object_new_object(); + fill_chart_status_for_host_json(tmp, host); + json_object_object_add(nodeinstance, "chart-sync-status", tmp); + + json_object_array_add(grp, nodeinstance); + } + rrd_unlock(); + json_object_object_add(msg, "node-instances", grp); +#endif + char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); return str; diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index 172cf2982..766f78053 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -70,6 +70,11 @@ struct label *add_aclk_host_labels(struct label *label) { label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO); label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + label = add_label_to_list(label, "_aclk_ng_new_cloud_protocol", "true", LABEL_SOURCE_AUTO); +#else + label = add_label_to_list(label, "_aclk_ng_new_cloud_protocol", "false", LABEL_SOURCE_AUTO); +#endif #endif return label; } diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 001c1ba02..ae5659310 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -2,7 +2,6 @@ #include "aclk_query.h" #include "aclk_stats.h" -#include "aclk_query_queue.h" #include "aclk_tx_msgs.h" #define ACLK_QUERY_THREAD_NAME "ACLK_Query" @@ -59,6 +58,13 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id) { int res; uuid_t node_id_bin, host_id_bin; + + rrd_rdlock(); + RRDHOST *host = find_host_by_node_id((char *) node_id); + rrd_unlock(); + if (host) + return host; + char host_id[UUID_STR_LEN]; if (uuid_parse(node_id, node_id_bin)) { error("Couldn't parse UUID %s", node_id); @@ -99,26 +105,34 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; + buffer_strcat(log_buffer, query->data.http_api_v2.query); + size_t size = 0; + size_t sent = 0; + w->tv_in = query->created_tv; + now_realtime_timeval(&w->tv_ready); + if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); char nodeid[UUID_STR_LEN]; if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { - error("URL requests node_id but there is not enough chars following"); + error_report(CLOUD_EMSG_MALFORMED_NODE_ID); retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_MALFORMED_NODE_ID, CLOUD_EMSG_MALFORMED_NODE_ID, NULL, 0); goto cleanup; } strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1); query_host = node_id_2_rrdhost(nodeid); if (!query_host) { - error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid); + error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); goto cleanup; } } - buffer_strcat(log_buffer, query->data.http_api_v2.query); - char *mysep = strchr(query->data.http_api_v2.query, '?'); if (mysep) { url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); @@ -136,11 +150,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } // execute the query - w->tv_in = query->created_tv; - now_realtime_timeval(&w->tv_ready); t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); - size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; - size_t sent = size; + size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; + sent = size; #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used @@ -174,6 +186,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) else error("Unknown error during zlib compression."); retval = 1; + w->response.code = 500; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_ZLIB_ERROR, CLOUD_EMSG_ZLIB_ERROR, NULL, 0); goto cleanup; } int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; @@ -214,8 +228,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) // send msg. aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); - // log. struct timeval tv; + +cleanup: now_realtime_timeval(&tv); log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", w->id @@ -232,7 +247,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) , strip_control_characters((char *)buffer_tostring(log_buffer)) ); -cleanup: #ifdef NETDATA_WITH_ZLIB if(w->response.zinitialized) deflateEnd(&w->response.zstream); @@ -287,27 +301,37 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) #endif aclk_query_handler aclk_query_handlers[] = { - { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 }, - { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query }, - { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata }, - { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata }, - { .type = CHART_NEW, .name = "chart new", .fnc = chart_query }, - { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata }, + { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 }, + { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query }, + { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata }, + { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata }, + { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query }, + { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata }, #ifdef ENABLE_NEW_CLOUD_PROTOCOL - { .type = REGISTER_NODE, .name = "register node", .fnc = register_node }, - { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update }, - { .type = CHART_DIMS_UPDATE, .name = "chart and dim update bin", .fnc = send_bin_msg }, - { .type = CHART_CONFIG_UPDATED, .name = "chart config updated", .fnc = send_bin_msg }, - { .type = CHART_RESET, .name = "reset chart messages", .fnc = send_bin_msg }, - { .type = RETENTION_UPDATED, .name = "update retention info", .fnc = send_bin_msg }, - { .type = UPDATE_NODE_INFO, .name = "update node info", .fnc = send_bin_msg }, - { .type = ALARM_LOG_HEALTH, .name = "alarm log health", .fnc = send_bin_msg }, - { .type = ALARM_PROVIDE_CFG, .name = "provide alarm config", .fnc = send_bin_msg }, - { .type = ALARM_SNAPSHOT, .name = "alarm snapshot", .fnc = send_bin_msg }, + { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node }, + { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update }, + { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg }, + { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg }, + { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg }, + { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg }, + { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg }, + { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg }, + { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg }, + { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg }, #endif { .type = UNKNOWN, .name = NULL, .fnc = NULL } }; +const char *aclk_query_get_name(aclk_query_type_t qt) +{ + aclk_query_handler *ptr = aclk_query_handlers; + while (ptr->type != UNKNOWN) { + if (ptr->type == qt) + return ptr->name; + ptr++; + } + return "unknown"; +} static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { @@ -315,13 +339,14 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que if (aclk_query_handlers[i].type == query->type) { debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); aclk_query_handlers[i].fnc(query_thr, query); - aclk_query_free(query); if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_dispatched++; aclk_queries_per_thread[query_thr->idx]++; + aclk_metrics_per_sample.queries_per_type[query->type]++; ACLK_STATS_UNLOCK; } + aclk_query_free(query); return; } } diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h index 43741fb32..f86754a2a 100644 --- a/aclk/aclk_query.h +++ b/aclk/aclk_query.h @@ -7,6 +7,8 @@ #include "mqtt_wss_client.h" +#include "aclk_query_queue.h" + extern pthread_cond_t query_cond_wait; extern pthread_mutex_t query_lock_wait; #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) @@ -29,4 +31,6 @@ struct aclk_query_threads { void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client); void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); +const char *aclk_query_get_name(aclk_query_type_t qt); + #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index fe7ee123c..74a899226 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -45,49 +45,14 @@ static inline int _aclk_queue_query(aclk_query_t query) } -// Gets a pointer to the metric associated with a particular query type. -// NULL if the query type has no associated metric. -static inline volatile uint32_t *aclk_stats_qmetric_for_qtype(aclk_query_type_t qtype) { - switch (qtype) { - case HTTP_API_V2: - return &aclk_metrics_per_sample.query_type_http; - case ALARM_STATE_UPDATE: - return &aclk_metrics_per_sample.query_type_alarm_upd; - case METADATA_INFO: - return &aclk_metrics_per_sample.query_type_metadata_info; - case METADATA_ALARMS: - return &aclk_metrics_per_sample.query_type_metadata_alarms; - case CHART_NEW: - return &aclk_metrics_per_sample.query_type_chart_new; - case CHART_DEL: - return &aclk_metrics_per_sample.query_type_chart_del; - case REGISTER_NODE: - return &aclk_metrics_per_sample.query_type_register_node; - case NODE_STATE_UPDATE: - return &aclk_metrics_per_sample.query_type_node_upd; - default: - return NULL; - } -} - int aclk_queue_query(aclk_query_t query) { int ret = _aclk_queue_query(query); if (!ret) { - // local cache of query type before we wake up query thread, which may - // free the query in a race. - aclk_query_type_t qtype = query->type; QUERY_THREAD_WAKEUP; - if (aclk_stats_enabled) { - // get target query type metric before lock so we keep lock for - // minimal time. - volatile uint32_t *metric = aclk_stats_qmetric_for_qtype(qtype); - ACLK_STATS_LOCK; aclk_metrics_per_sample.queries_queued++; - if (metric) - *metric += 1; ACLK_STATS_UNLOCK; } } diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index db6354433..88976f9eb 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -10,7 +10,7 @@ #include "aclk_util.h" typedef enum { - UNKNOWN, + UNKNOWN = 0, METADATA_INFO, METADATA_ALARMS, HTTP_API_V2, @@ -26,7 +26,8 @@ typedef enum { UPDATE_NODE_INFO, ALARM_LOG_HEALTH, ALARM_PROVIDE_CFG, - ALARM_SNAPSHOT + ALARM_SNAPSHOT, + ACLK_QUERY_TYPE_COUNT // always keep this as last } aclk_query_type_t; struct aclk_query_metadata { diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index ecb2b4179..1f2cb27ef 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -457,9 +457,15 @@ new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash) return NULL; } -void aclk_init_rx_msg_handlers(void) +const char *rx_handler_get_name(size_t i) { - for (int i = 0; rx_msgs[i].fnc; i++) { + return rx_msgs[i].name; +} + +unsigned int aclk_init_rx_msg_handlers(void) +{ + int i; + for (i = 0; rx_msgs[i].fnc; i++) { simple_hash_t hash = simple_hash(rx_msgs[i].name); new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash); if (unlikely(hdl)) { @@ -469,6 +475,7 @@ void aclk_init_rx_msg_handlers(void) } rx_msgs[i].name_hash = hash; } + return i; } void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) @@ -489,6 +496,11 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t } return; } + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++; + ACLK_STATS_UNLOCK; + } if (msg_descriptor->fnc(msg, msg_len)) { error("Error processing message of type '%s'", message_type); if (aclk_stats_enabled) { diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h index 38243a4c9..00f88c6a8 100644 --- a/aclk/aclk_rx_msgs.h +++ b/aclk/aclk_rx_msgs.h @@ -11,7 +11,8 @@ int aclk_handle_cloud_cmd_message(char *payload); #ifdef ENABLE_NEW_CLOUD_PROTOCOL -void aclk_init_rx_msg_handlers(void); +const char *rx_handler_get_name(size_t i); +unsigned int aclk_init_rx_msg_handlers(void); void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len); #endif diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index a7d4a4709..a9f0a923c 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -2,9 +2,18 @@ #include "aclk_stats.h" +#include "aclk_query.h" + netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; -int query_thread_count; +struct { + int query_thread_count; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + unsigned int proto_hdl_cnt; + uint32_t *aclk_proto_rx_msgs_sample; + RRDDIM **rx_msg_dims; +#endif +} aclk_stats_cfg; // there is only 1 stats thread at a time // data ACLK stats need per query thread struct aclk_qt_data { @@ -13,6 +22,7 @@ struct aclk_qt_data { uint32_t *aclk_queries_per_thread = NULL; uint32_t *aclk_queries_per_thread_sample = NULL; +uint32_t *aclk_proto_rx_msgs_sample = NULL; struct aclk_metrics aclk_metrics = { .online = 0, @@ -113,39 +123,21 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; - static RRDDIM *rd_type_http = NULL; - static RRDDIM *rd_type_alarm_upd = NULL; - static RRDDIM *rd_type_metadata_info = NULL; - static RRDDIM *rd_type_metadata_alarms = NULL; - static RRDDIM *rd_type_chart_new = NULL; - static RRDDIM *rd_type_chart_del = NULL; - static RRDDIM *rd_type_register_node = NULL; - static RRDDIM *rd_type_node_upd = NULL; + static RRDDIM *dims[ACLK_QUERY_TYPE_COUNT]; if (unlikely(!st)) { st = rrdset_create_localhost( "netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s", "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - rd_type_http = rrddim_add(st, "http", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_type_alarm_upd = rrddim_add(st, "alarm update", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_type_metadata_info = rrddim_add(st, "info metadata", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_type_metadata_alarms = rrddim_add(st, "alarms metadata", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_type_chart_new = rrddim_add(st, "chart new", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_type_chart_del = rrddim_add(st, "chart delete", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_type_register_node = rrddim_add(st, "register node", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_type_node_upd = rrddim_add(st, "node update", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) + dims[i] = rrddim_add(st, aclk_query_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else rrdset_next(st); - rrddim_set_by_pointer(st, rd_type_http, per_sample->query_type_http); - rrddim_set_by_pointer(st, rd_type_alarm_upd, per_sample->query_type_alarm_upd); - rrddim_set_by_pointer(st, rd_type_metadata_info, per_sample->query_type_metadata_info); - rrddim_set_by_pointer(st, rd_type_metadata_alarms, per_sample->query_type_metadata_alarms); - rrddim_set_by_pointer(st, rd_type_chart_new, per_sample->query_type_chart_new); - rrddim_set_by_pointer(st, rd_type_chart_del, per_sample->query_type_chart_del); - rrddim_set_by_pointer(st, rd_type_register_node, per_sample->query_type_register_node); - rrddim_set_by_pointer(st, rd_type_node_upd, per_sample->query_type_node_upd); + for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) + rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]); rrdset_done(st); } @@ -202,7 +194,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", "netdata", "stats", 200009, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - for (int i = 0; i < query_thread_count; i++) { + for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) { if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) error("snprintf encoding error"); aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); @@ -210,7 +202,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) } else rrdset_next(st); - for (int i = 0; i < query_thread_count; i++) { + for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) { rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); } @@ -245,8 +237,57 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +const char *rx_handler_get_name(size_t i); +static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) +{ + static RRDSET *st = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_protobuf_rx_types", NULL, "aclk", NULL, "Received new cloud architecture messages by their type.", "msg/s", + "netdata", "stats", 200010, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) { + aclk_stats_cfg.rx_msg_dims[i] = rrddim_add(st, rx_handler_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + } else + rrdset_next(st); + + for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) + rrddim_set_by_pointer(st, aclk_stats_cfg.rx_msg_dims[i], rx_msgs_sample[i]); + + rrdset_done(st); +} +#endif + +void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) +{ +#ifndef ENABLE_NEW_CLOUD_PROTOCOL + UNUSED(proto_hdl_cnt); +#endif + + aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); + aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); + aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt; + aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); + aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); + aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*)); +#endif +} + void aclk_stats_thread_cleanup() { +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + freez(aclk_stats_cfg.rx_msg_dims); + freez(aclk_proto_rx_msgs_sample); + freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample); +#endif freez(aclk_qt_data); freez(aclk_queries_per_thread); freez(aclk_queries_per_thread_sample); @@ -256,17 +297,12 @@ void *aclk_stats_main_thread(void *ptr) { struct aclk_stats_thread *args = ptr; - query_thread_count = args->query_thread_count; - aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); - aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); - aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + aclk_stats_cfg.query_thread_count = args->query_thread_count; heartbeat_t hb; heartbeat_init(&hb); usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; - memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); - struct aclk_metrics_per_sample per_sample; struct aclk_metrics permanent; @@ -282,11 +318,15 @@ void *aclk_stats_main_thread(void *ptr) // to not hold lock longer than necessary, especially not to hold it // during database rrd* operations memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); + memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); +#endif memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); - memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count); - memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count); + memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count); + memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count); ACLK_STATS_UNLOCK; aclk_stats_collect(&per_sample, &permanent); @@ -302,6 +342,10 @@ void *aclk_stats_main_thread(void *ptr) aclk_stats_query_threads(aclk_queries_per_thread_sample); aclk_stats_query_time(&per_sample); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample); +#endif } return 0; diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h index 3cc6a0cb0..4f2894798 100644 --- a/aclk/aclk_stats.h +++ b/aclk/aclk_stats.h @@ -5,6 +5,7 @@ #include "daemon/common.h" #include "libnetdata/libnetdata.h" +#include "aclk_query_queue.h" #define ACLK_STATS_THREAD_NAME "ACLK_Stats" @@ -49,14 +50,7 @@ extern struct aclk_metrics_per_sample { volatile uint32_t cloud_req_err; // query types. - volatile uint32_t query_type_http; - volatile uint32_t query_type_alarm_upd; - volatile uint32_t query_type_metadata_info; - volatile uint32_t query_type_metadata_alarms; - volatile uint32_t query_type_chart_new; - volatile uint32_t query_type_chart_del; - volatile uint32_t query_type_register_node; - volatile uint32_t query_type_node_upd; + volatile uint32_t queries_per_type[ACLK_QUERY_TYPE_COUNT]; // HTTP-specific request types. volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT]; @@ -66,9 +60,14 @@ extern struct aclk_metrics_per_sample { volatile uint32_t cloud_q_process_max; } aclk_metrics_per_sample; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +extern uint32_t *aclk_proto_rx_msgs_sample; +#endif + extern uint32_t *aclk_queries_per_thread; void *aclk_stats_main_thread(void *ptr); +void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt); void aclk_stats_thread_cleanup(); void aclk_stats_upd_online(int online); diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 74fc19c72..185f5d796 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -116,28 +116,30 @@ static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, co #define TOPIC_MAX_LEN 512 #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" -static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len) +static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len) { uint16_t packet_id; const char *str; - char *full_msg; - int len; + char *full_msg = NULL; + int len, rc; if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); - return; + return 500; } str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); len = strlen(str); - full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); + if (payload_len) { + full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); - memcpy(full_msg, str, len); - memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); - len += strlen(V2_BIN_PAYLOAD_SEPARATOR); - memcpy(&full_msg[len], payload, payload_len); - len += payload_len; + memcpy(full_msg, str, len); + memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); + len += strlen(V2_BIN_PAYLOAD_SEPARATOR); + memcpy(&full_msg[len], payload, payload_len); + len += payload_len; + } /* TODO #ifdef ACLK_LOG_CONVERSATION_DIR @@ -147,15 +149,22 @@ static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_obje json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); #endif */ - int rc = mqtt_wss_publish_pid_block(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); - if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) + rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); + if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { error("Timeout sending binpacked message"); - if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) + freez(full_msg); + return 503; + } + if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { error("Message is bigger than allowed maximum"); + freez(full_msg); + return 403; + } #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif freez(full_msg); + return 0; } /* @@ -316,6 +325,25 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) buffer_free(local_buffer); } +void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len) +{ + json_object *tmp, *msg; + msg = create_hdr("http", msg_id, 0, 0, 2); + tmp = json_object_new_int(http_code); + json_object_object_add(msg, "http-code", tmp); + + tmp = json_object_new_int(ec); + json_object_object_add(msg, "error-code", tmp); + + tmp = json_object_new_string(emsg); + json_object_object_add(msg, "error-description", tmp); + + if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) { + error("Failed to send cancelation message for http reply"); + } + json_object_put(msg); +} + void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -331,8 +359,20 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg tmp = json_object_new_int(http_code); json_object_object_add(msg, "http-code", tmp); - aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); + int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); json_object_put(msg); + + switch (rc) { + case 403: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len); + break; + case 500: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len); + break; + case 503: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len); + break; + } } void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index da29a4a32..402f13fb6 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -14,6 +14,7 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host); void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted); +void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len); void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart); diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index ee8fcaf94..5576a865a 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -41,6 +41,7 @@ void aclk_env_t_destroy(aclk_env_t *env) { for (size_t i = 0; i < env->transport_count; i++) { if(env->transports[i]) { aclk_transport_desc_t_destroy(env->transports[i]); + freez(env->transports[i]); env->transports[i] = NULL; } } @@ -64,7 +65,7 @@ int aclk_env_has_capa(const char *capa) #ifdef ACLK_LOG_CONVERSATION_DIR volatile int aclk_conversation_log_counter = 0; -#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS) +#if !defined(HAVE_C___ATOMIC) netdata_mutex_t aclk_conversation_log_mutex = NETDATA_MUTEX_INITIALIZER; int aclk_get_conv_log_next() { diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index 4d8744e7f..7a7202076 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -5,6 +5,19 @@ #include "libnetdata/libnetdata.h" #include "mqtt_wss_client.h" +#define CLOUD_EC_MALFORMED_NODE_ID 1 +#define CLOUD_EMSG_MALFORMED_NODE_ID "URL requests node_id but there is not enough chars following (for it to be valid uuid)." +#define CLOUD_EC_NODE_NOT_FOUND 2 +#define CLOUD_EMSG_NODE_NOT_FOUND "Node with requested node_id not found" +#define CLOUD_EC_ZLIB_ERROR 3 +#define CLOUD_EMSG_ZLIB_ERROR "Error during zlib compression" +#define CLOUD_EC_REQ_REPLY_TOO_BIG 4 +#define CLOUD_EMSG_REQ_REPLY_TOO_BIG "Request reply produces message bigger than allowed maximum" +#define CLOUD_EC_FAIL_TOPIC 5 +#define CLOUD_EMSG_FAIL_TOPIC "Internal Topic Error" +#define CLOUD_EC_SND_TIMEOUT 6 +#define CLOUD_EMSG_SND_TIMEOUT "Timeout sending binpacked message" + // Helper stuff which should not have any further inside ACLK dependency // and are supposed not to be needed outside of ACLK extern int aclk_use_new_cloud_arch; @@ -86,7 +99,7 @@ void free_topic_cache(void); #ifdef ACLK_LOG_CONVERSATION_DIR extern volatile int aclk_conversation_log_counter; -#if defined(HAVE_C___ATOMIC) && !defined(NETDATA_NO_ATOMIC_INSTRUCTIONS) +#if defined(HAVE_C___ATOMIC) #define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST) #else extern netdata_mutex_t aclk_conversation_log_mutex; diff --git a/aclk/https_client.c b/aclk/https_client.c index 470c3fdf3..1a32f833f 100644 --- a/aclk/https_client.c +++ b/aclk/https_client.c @@ -587,7 +587,7 @@ void https_req_response_init(https_req_response_t *res) { res->payload_size = 0; } -static inline char *min_non_null(char *a, char *b) { +static inline char *UNUSED_FUNCTION(min_non_null)(char *a, char *b) { if (!a) return b; if (!b) -- cgit v1.2.3