summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to 'aclk')
-rw-r--r--aclk/README.md2
-rw-r--r--aclk/aclk.c361
-rw-r--r--aclk/aclk_api.c5
-rw-r--r--aclk/aclk_query.c81
-rw-r--r--aclk/aclk_query.h4
-rw-r--r--aclk/aclk_query_queue.c35
-rw-r--r--aclk/aclk_query_queue.h5
-rw-r--r--aclk/aclk_rx_msgs.c16
-rw-r--r--aclk/aclk_rx_msgs.h3
-rw-r--r--aclk/aclk_stats.c114
-rw-r--r--aclk/aclk_stats.h15
-rw-r--r--aclk/aclk_tx_msgs.c68
-rw-r--r--aclk/aclk_tx_msgs.h1
-rw-r--r--aclk/aclk_util.c3
-rw-r--r--aclk/aclk_util.h15
-rw-r--r--aclk/https_client.c2
16 files changed, 569 insertions, 161 deletions
diff --git a/aclk/README.md b/aclk/README.md
index 870314be4..09c0d2920 100644
--- a/aclk/README.md
+++ b/aclk/README.md
@@ -134,4 +134,4 @@ If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf`
Restart your Agent and [connect your node](/claim/README.md#how-to-connect-a-node).
-[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Faclk%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)
+
diff --git a/aclk/aclk.c b/aclk/aclk.c
index c25b7df68..599b9a093 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -24,8 +24,16 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
+int aclk_rcvd_cloud_msgs = 0;
+int aclk_connection_counter = 0;
int disconnect_req = 0;
+time_t last_conn_time_mqtt = 0;
+time_t last_conn_time_appl = 0;
+time_t last_disconnect_time = 0;
+time_t next_connection_attempt = 0;
+float last_backoff_value = 0;
+
int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
time_t aclk_block_until = 0;
@@ -43,8 +51,6 @@ struct aclk_shared_state aclk_shared_state = {
.mqtt_shutdown_msg_rcvd = 0
};
-//ENDTODO
-
static RSA *aclk_private_key = NULL;
static int load_private_key()
{
@@ -123,7 +129,7 @@ static int wait_till_agent_claimed(void)
* @param aclk_hostname points to location where string pointer to hostname will be set
* @param aclk_port port to int where port will be saved
*
- * @return If non 0 returned irrecoverable error happened and ACLK should be terminated
+ * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
*/
static int wait_till_agent_claim_ready()
{
@@ -144,20 +150,20 @@ static int wait_till_agent_claim_ready()
// TODO make it without malloc/free
memset(&url, 0, sizeof(url_t));
if (url_parse(cloud_base_url, &url)) {
- error("Agent is claimed but the configuration is invalid, please fix");
+ error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
url_t_destroy(&url);
sleep(5);
continue;
}
url_t_destroy(&url);
- if (!load_private_key()) {
- sleep(5);
- break;
- }
+ if (!load_private_key())
+ return 0;
+
+ sleep(5);
}
- return 0;
+ return 1;
}
void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
@@ -266,6 +272,7 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t
}
static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
+ aclk_rcvd_cloud_msgs++;
if (aclk_use_new_cloud_arch)
msg_callback_new_protocol(topic, msg, msglen, qos);
else
@@ -275,8 +282,10 @@ static inline void msg_callback(const char *topic, const void *msg, size_t msgle
static void puback_callback(uint16_t packet_id)
{
- if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
+ if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
+ last_conn_time_appl = now_realtime_sec();
aclk_tbeb_reset();
+ }
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_puback(packet_id);
@@ -402,6 +411,8 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
+ aclk_rcvd_cloud_msgs = 0;
+ aclk_connection_counter++;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch) {
@@ -480,6 +491,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
info("ACLK link is down");
log_access("ACLK DISCONNECTED");
aclk_stats_upd_online(0);
+ last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
info("Attempting to gracefully shutdown the MQTT/WSS connection");
@@ -521,6 +533,9 @@ static unsigned long aclk_reconnect_delay() {
static int aclk_block_till_recon_allowed() {
unsigned long recon_delay = aclk_reconnect_delay();
+ next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
+ last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
+
info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
@@ -613,12 +628,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.drop_on_publish_fail = 1
};
-#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE)
- aclk_use_new_cloud_arch = 1;
- info("Switching ACLK to new protobuf protocol. Due to #define ACLK_NEWARCH_DEVMODE.");
-#else
aclk_use_new_cloud_arch = 0;
-#endif
#ifndef ACLK_DISABLE_CHALLENGE
if (aclk_env) {
@@ -638,20 +648,19 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (netdata_exit)
return 1;
-#ifndef ACLK_NEWARCH_DEVMODE
if (aclk_env->encoding == ACLK_ENC_PROTO) {
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
continue;
-#endif
+#else
if (!aclk_env_has_capa("proto")) {
error ("Can't encoding=proto without at least \"proto\" capability.");
continue;
}
info("Switching ACLK to new protobuf protocol. Due to /env response.");
aclk_use_new_cloud_arch = 1;
- }
#endif
+ }
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
@@ -728,6 +737,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
json_object_put(lwt);
if (!ret) {
+ last_conn_time_mqtt = now_realtime_sec();
info("ACLK connection successfully established");
log_access("ACLK CONNECTED");
mqtt_connected_actions(client);
@@ -767,8 +777,9 @@ void *aclk_main(void *ptr)
return NULL;
}
+ unsigned int proto_hdl_cnt;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- aclk_init_rx_msg_handlers();
+ proto_hdl_cnt = aclk_init_rx_msg_handlers();
#endif
// This thread is unusual in that it cannot be cancelled by cancel_main_threads()
@@ -808,6 +819,7 @@ void *aclk_main(void *ptr)
stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
stats_thread->query_thread_count = query_threads.count;
+ aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
netdata_thread_create(
stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
stats_thread);
@@ -843,6 +855,7 @@ void *aclk_main(void *ptr)
if (handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
+ last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
log_access("ACLK DISCONNECTED");
}
@@ -1092,6 +1105,7 @@ void aclk_send_node_instances()
uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
query->data.node_update.queryable = 1;
query->data.node_update.session_id = aclk_session_newarch;
+ freez(list->hostname);
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id,
list->live,
list->hops);
@@ -1121,53 +1135,257 @@ void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, con
aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status)) {
+ buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
+ return;
+ }
+ buffer_sprintf(wb,
+ "\n\t\tUpdates: %d"
+ "\n\t\tBatch ID: %"PRIu64
+ "\n\t\tLast Acked Seq ID: %"PRIu64
+ "\n\t\tPending Min Seq ID: %"PRIu64
+ "\n\t\tPending Max Seq ID: %"PRIu64
+ "\n\t\tLast Submitted Seq ID: %"PRIu64,
+ status.alert_updates,
+ status.alerts_batch_id,
+ status.last_acked_sequence_id,
+ status.pending_min_sequence_id,
+ status.pending_max_sequence_id,
+ status.last_submitted_sequence_id
+ );
+}
+
+static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
+{
+ struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
+ if (!stats) {
+ buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host");
+ return;
+ }
+ buffer_sprintf(wb,
+ "\n\t\tUpdates: %d"
+ "\n\t\tBatch ID: %"PRIu64
+ "\n\t\tMin Seq ID: %"PRIu64
+ "\n\t\tMax Seq ID: %"PRIu64
+ "\n\t\tPending Min Seq ID: %"PRIu64
+ "\n\t\tPending Max Seq ID: %"PRIu64
+ "\n\t\tSent Min Seq ID: %"PRIu64
+ "\n\t\tSent Max Seq ID: %"PRIu64
+ "\n\t\tAcked Min Seq ID: %"PRIu64
+ "\n\t\tAcked Max Seq ID: %"PRIu64,
+ stats->updates,
+ stats->batch_id,
+ stats->min_seqid,
+ stats->max_seqid,
+ stats->min_seqid_pend,
+ stats->max_seqid_pend,
+ stats->min_seqid_sent,
+ stats->max_seqid_sent,
+ stats->min_seqid_ack,
+ stats->max_seqid_ack
+ );
+ freez(stats);
+}
+#endif
+
char *ng_aclk_state(void)
{
BUFFER *wb = buffer_create(1024);
+ struct tm *tmptr, tmbuf;
char *ret;
buffer_strcat(wb,
"ACLK Available: Yes\n"
- "ACLK Implementation: Next Generation\n"
+ "ACLK Version: 2\n"
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- "New Cloud Protocol Support: Yes\n"
+ "Protocols Supported: Legacy, Protobuf\n"
#else
- "New Cloud Protocol Support: No\n"
+ "Protocols Supported: Legacy\n"
#endif
- "Claimed: "
);
+ buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
buffer_strcat(wb, "No\n");
else {
- buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id);
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
freez(agent_id);
}
- buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy");
+ buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
+ if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
+ }
+ if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
+ }
+ if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
+ }
+ if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
+ }
+
+ if (aclk_connected) {
+ buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname);
+
+ buffer_strcat(wb, "\tClaimed ID: ");
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id)
+ buffer_strcat(wb, host->aclk_state.claimed_id);
+ else
+ buffer_strcat(wb, "null");
+ rrdhost_aclk_state_unlock(host);
+
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ buffer_strcat(wb, "\n\tNode ID: null\n");
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
+ }
+
+ buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
+
+ if (host != localhost)
+ buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");
+
+ buffer_strcat(wb, "\n\tAlert Streaming Status:");
+ fill_alert_status_for_host(wb, host);
+
+ buffer_strcat(wb, "\n\tChart Streaming Status:");
+ fill_chart_status_for_host(wb, host);
+ }
+ rrd_unlock();
+#endif
+ }
ret = strdupz(buffer_tostring(wb));
buffer_free(wb);
return ret;
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
+{
+ struct proto_alert_status status;
+ memset(&status, 0, sizeof(status));
+ if (get_proto_alert_status(host, &status))
+ return;
+
+ json_object *tmp = json_object_new_int(status.alert_updates);
+ json_object_object_add(obj, "updates", tmp);
+
+ tmp = json_object_new_int(status.alerts_batch_id);
+ json_object_object_add(obj, "batch-id", tmp);
+
+ tmp = json_object_new_int(status.last_acked_sequence_id);
+ json_object_object_add(obj, "last-acked-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_min_sequence_id);
+ json_object_object_add(obj, "pending-min-seq-id", tmp);
+
+ tmp = json_object_new_int(status.pending_max_sequence_id);
+ json_object_object_add(obj, "pending-max-seq-id", tmp);
+
+ tmp = json_object_new_int(status.last_submitted_sequence_id);
+ json_object_object_add(obj, "last-submitted-seq-id", tmp);
+}
+
+static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
+{
+ struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
+ if (!stats)
+ return;
+
+ json_object *tmp = json_object_new_int(stats->updates);
+ json_object_object_add(obj, "updates", tmp);
+
+ tmp = json_object_new_int(stats->batch_id);
+ json_object_object_add(obj, "batch-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid);
+ json_object_object_add(obj, "min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid);
+ json_object_object_add(obj, "max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_pend);
+ json_object_object_add(obj, "pending-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_pend);
+ json_object_object_add(obj, "pending-max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_sent);
+ json_object_object_add(obj, "sent-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_sent);
+ json_object_object_add(obj, "sent-max-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->min_seqid_ack);
+ json_object_object_add(obj, "acked-min-seq-id", tmp);
+
+ tmp = json_object_new_int(stats->max_seqid_ack);
+ json_object_object_add(obj, "acked-max-seq-id", tmp);
+
+ freez(stats);
+}
+#endif
+
+static json_object *timestamp_to_json(const time_t *t)
+{
+ struct tm *tmptr, tmbuf;
+ if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
+ char timebuf[26];
+ strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
+ return json_object_new_string(timebuf);
+ }
+ return NULL;
+}
+
char *ng_aclk_state_json(void)
{
- json_object *tmp, *msg = json_object_new_object();
+ json_object *tmp, *grp, *msg = json_object_new_object();
tmp = json_object_new_boolean(1);
json_object_object_add(msg, "aclk-available", tmp);
- tmp = json_object_new_string("Next Generation");
- json_object_object_add(msg, "aclk-implementation", tmp);
+ tmp = json_object_new_int(2);
+ json_object_object_add(msg, "aclk-version", tmp);
+ grp = json_object_new_array();
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- tmp = json_object_new_boolean(1);
+ tmp = json_object_new_string("Legacy");
+ json_object_array_add(grp, tmp);
+ tmp = json_object_new_string("Protobuf");
+ json_object_array_add(grp, tmp);
#else
- tmp = json_object_new_boolean(0);
+ tmp = json_object_new_string("Legacy");
+ json_object_array_add(grp, tmp);
#endif
- json_object_object_add(msg, "new-cloud-protocol-supported", tmp);
+ json_object_object_add(msg, "protocols-supported", grp);
char *agent_id = is_agent_claimed();
tmp = json_object_new_boolean(agent_id != NULL);
@@ -1180,12 +1398,91 @@ char *ng_aclk_state_json(void)
tmp = NULL;
json_object_object_add(msg, "claimed-id", tmp);
+ char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
+ tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
+ json_object_object_add(msg, "cloud-url", tmp);
+
tmp = json_object_new_boolean(aclk_connected);
json_object_object_add(msg, "online", tmp);
- tmp = json_object_new_string(aclk_use_new_cloud_arch ? "New" : "Legacy");
+ tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
json_object_object_add(msg, "used-cloud-protocol", tmp);
+ tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
+ json_object_object_add(msg, "received-app-layer-msgs", tmp);
+
+ tmp = json_object_new_int(aclk_pubacks_per_conn);
+ json_object_object_add(msg, "received-mqtt-pubacks", tmp);
+
+ tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
+ json_object_object_add(msg, "reconnect-count", tmp);
+
+ json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
+ json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
+ json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
+ json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);
+ tmp = NULL;
+ if (!aclk_connected && last_backoff_value)
+ tmp = json_object_new_double(last_backoff_value);
+ json_object_object_add(msg, "last-backoff-value", tmp);
+
+ tmp = json_object_new_boolean(aclk_disable_runtime);
+ json_object_object_add(msg, "banned-by-cloud", tmp);
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ grp = json_object_new_array();
+
+ RRDHOST *host;
+ rrd_rdlock();
+ rrdhost_foreach_read(host) {
+ json_object *nodeinstance = json_object_new_object();
+
+ tmp = json_object_new_string(host->hostname);
+ json_object_object_add(nodeinstance, "hostname", tmp);
+
+ tmp = json_object_new_string(host->machine_guid);
+ json_object_object_add(nodeinstance, "mguid", tmp);
+
+ rrdhost_aclk_state_lock(host);
+ if (host->aclk_state.claimed_id) {
+ tmp = json_object_new_string(host->aclk_state.claimed_id);
+ json_object_object_add(nodeinstance, "claimed_id", tmp);
+ } else
+ json_object_object_add(nodeinstance, "claimed_id", NULL);
+ rrdhost_aclk_state_unlock(host);
+
+ if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
+ json_object_object_add(nodeinstance, "node-id", NULL);
+ } else {
+ char node_id[GUID_LEN + 1];
+ uuid_unparse_lower(*host->node_id, node_id);
+ tmp = json_object_new_string(node_id);
+ json_object_object_add(nodeinstance, "node-id", tmp);
+ }
+
+ tmp = json_object_new_int(host->system_info->hops);
+ json_object_object_add(nodeinstance, "streaming-hops", tmp);
+
+ tmp = json_object_new_string(host == localhost ? "self" : "child");
+ json_object_object_add(nodeinstance, "relationship", tmp);
+
+ tmp = json_object_new_boolean((host->receiver || host == localhost));
+ json_object_object_add(nodeinstance, "streaming-online", tmp);
+
+ tmp = json_object_new_object();
+ fill_alert_status_for_host_json(tmp, host);
+ json_object_object_add(nodeinstance, "alert-sync-status", tmp);
+
+ tmp = json_object_new_object();
+ fill_chart_status_for_host_json(tmp, host);
+ json_object_object_add(nodeinstance, "chart-sync-status", tmp);
+
+ json_object_array_add(grp, nodeinstance);
+ }
+ rrd_unlock();
+ json_object_object_add(msg, "node-instances", grp);
+#endif
+
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
return str;
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index 172cf2982..766f78053 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -70,6 +70,11 @@ struct label *add_aclk_host_labels(struct label *label) {
label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO);
label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ label = add_label_to_list(label, "_aclk_ng_new_cloud_protocol", "true", LABEL_SOURCE_AUTO);
+#else
+ label = add_label_to_list(label, "_aclk_ng_new_cloud_protocol", "false", LABEL_SOURCE_AUTO);
+#endif
#endif
return label;
}
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 001c1ba02..ae5659310 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -2,7 +2,6 @@
#include "aclk_query.h"
#include "aclk_stats.h"
-#include "aclk_query_queue.h"
#include "aclk_tx_msgs.h"
#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
@@ -59,6 +58,13 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id)
{
int res;
uuid_t node_id_bin, host_id_bin;
+
+ rrd_rdlock();
+ RRDHOST *host = find_host_by_node_id((char *) node_id);
+ rrd_unlock();
+ if (host)
+ return host;
+
char host_id[UUID_STR_LEN];
if (uuid_parse(node_id, node_id_bin)) {
error("Couldn't parse UUID %s", node_id);
@@ -99,26 +105,34 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
w->acl = 0x1f;
+ buffer_strcat(log_buffer, query->data.http_api_v2.query);
+ size_t size = 0;
+ size_t sent = 0;
+ w->tv_in = query->created_tv;
+ now_realtime_timeval(&w->tv_ready);
+
if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
char nodeid[UUID_STR_LEN];
if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
- error("URL requests node_id but there is not enough chars following");
+ error_report(CLOUD_EMSG_MALFORMED_NODE_ID);
retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_MALFORMED_NODE_ID, CLOUD_EMSG_MALFORMED_NODE_ID, NULL, 0);
goto cleanup;
}
strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1);
query_host = node_id_2_rrdhost(nodeid);
if (!query_host) {
- error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid);
+ error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
goto cleanup;
}
}
- buffer_strcat(log_buffer, query->data.http_api_v2.query);
-
char *mysep = strchr(query->data.http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
@@ -136,11 +150,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
// execute the query
- w->tv_in = query->created_tv;
- now_realtime_timeval(&w->tv_ready);
t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
- size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
- size_t sent = size;
+ size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
+ sent = size;
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
@@ -174,6 +186,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
else
error("Unknown error during zlib compression.");
retval = 1;
+ w->response.code = 500;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_ZLIB_ERROR, CLOUD_EMSG_ZLIB_ERROR, NULL, 0);
goto cleanup;
}
int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
@@ -214,8 +228,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
// send msg.
aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
- // log.
struct timeval tv;
+
+cleanup:
now_realtime_timeval(&tv);
log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
w->id
@@ -232,7 +247,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
, strip_control_characters((char *)buffer_tostring(log_buffer))
);
-cleanup:
#ifdef NETDATA_WITH_ZLIB
if(w->response.zinitialized)
deflateEnd(&w->response.zstream);
@@ -287,27 +301,37 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
#endif
aclk_query_handler aclk_query_handlers[] = {
- { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
- { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
- { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata },
- { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
- { .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
- { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
+ { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 },
+ { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query },
+ { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata },
+ { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata },
+ { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query },
+ { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata },
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- { .type = REGISTER_NODE, .name = "register node", .fnc = register_node },
- { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update },
- { .type = CHART_DIMS_UPDATE, .name = "chart and dim update bin", .fnc = send_bin_msg },
- { .type = CHART_CONFIG_UPDATED, .name = "chart config updated", .fnc = send_bin_msg },
- { .type = CHART_RESET, .name = "reset chart messages", .fnc = send_bin_msg },
- { .type = RETENTION_UPDATED, .name = "update retention info", .fnc = send_bin_msg },
- { .type = UPDATE_NODE_INFO, .name = "update node info", .fnc = send_bin_msg },
- { .type = ALARM_LOG_HEALTH, .name = "alarm log health", .fnc = send_bin_msg },
- { .type = ALARM_PROVIDE_CFG, .name = "provide alarm config", .fnc = send_bin_msg },
- { .type = ALARM_SNAPSHOT, .name = "alarm snapshot", .fnc = send_bin_msg },
+ { .type = REGISTER_NODE, .name = "register_node", .fnc = register_node },
+ { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = node_state_update },
+ { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg },
+ { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg },
+ { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg },
+ { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg },
+ { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg },
+ { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg },
+ { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg },
+ { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg },
#endif
{ .type = UNKNOWN, .name = NULL, .fnc = NULL }
};
+const char *aclk_query_get_name(aclk_query_type_t qt)
+{
+ aclk_query_handler *ptr = aclk_query_handlers;
+ while (ptr->type != UNKNOWN) {
+ if (ptr->type == qt)
+ return ptr->name;
+ ptr++;
+ }
+ return "unknown";
+}
static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
@@ -315,13 +339,14 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que
if (aclk_query_handlers[i].type == query->type) {
debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
aclk_query_handlers[i].fnc(query_thr, query);
- aclk_query_free(query);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[query_thr->idx]++;
+ aclk_metrics_per_sample.queries_per_type[query->type]++;
ACLK_STATS_UNLOCK;
}
+ aclk_query_free(query);
return;
}
}
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
index 43741fb32..f86754a2a 100644
--- a/aclk/aclk_query.h
+++ b/aclk/aclk_query.h
@@ -7,6 +7,8 @@
#include "mqtt_wss_client.h"
+#include "aclk_query_queue.h"
+
extern pthread_cond_t query_cond_wait;
extern pthread_mutex_t query_lock_wait;
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
@@ -29,4 +31,6 @@ struct aclk_query_threads {
void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client);
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
+const char *aclk_query_get_name(aclk_query_type_t qt);
+
#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index fe7ee123c..74a899226 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -45,49 +45,14 @@ static inline int _aclk_queue_query(aclk_query_t query)
}
-// Gets a pointer to the metric associated with a particular query type.
-// NULL if the query type has no associated metric.
-static inline volatile uint32_t *aclk_stats_qmetric_for_qtype(aclk_query_type_t qtype) {
- switch (qtype) {
- case HTTP_API_V2:
- return &aclk_metrics_per_sample.query_type_http;
- case ALARM_STATE_UPDATE:
- return &aclk_metrics_per_sample.query_type_alarm_upd;
- case METADATA_INFO:
- return &aclk_metrics_per_sample.query_type_metadata_info;
- case METADATA_ALARMS:
- return &aclk_metrics_per_sample.query_type_metadata_alarms;
- case CHART_NEW:
- return &aclk_metrics_per_sample.query_type_chart_new;
- case CHART_DEL:
- return &aclk_metrics_per_sample.query_type_chart_del;
- case REGISTER_NODE:
- return &aclk_metrics_per_sample.query_type_register_node;
- case NODE_STATE_UPDATE:
- return &aclk_metrics_per_sample.query_type_node_upd;
- default:
- return NULL;
- }
-}
-
int aclk_queue_query(aclk_query_t query)
{
int ret = _aclk_queue_query(query);
if (!ret) {
- // local cache of query type before we wake up query thread, which may
- // free the query in a race.
- aclk_query_type_t qtype = query->type;
QUERY_THREAD_WAKEUP;
-
if (aclk_stats_enabled) {
- // get target query type metric before lock so we keep lock for
- // minimal time.
- volatile uint32_t *metric = aclk_stats_qmetric_for_qtype(qtype);
-
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_queued++;
- if (metric)
- *metric += 1;
ACLK_STATS_UNLOCK;
}
}
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index db6354433..88976f9eb 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -10,7 +10,7 @@
#include "aclk_util.h"
typedef enum {
- UNKNOWN,
+ UNKNOWN = 0,
METADATA_INFO,
METADATA_ALARMS,
HTTP_API_V2,
@@ -26,7 +26,8 @@ typedef enum {
UPDATE_NODE_INFO,
ALARM_LOG_HEALTH,
ALARM_PROVIDE_CFG,
- ALARM_SNAPSHOT
+ ALARM_SNAPSHOT,
+ ACLK_QUERY_TYPE_COUNT // always keep this as last
} aclk_query_type_t;
struct aclk_query_metadata {
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index ecb2b4179..1f2cb27ef 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -457,9 +457,15 @@ new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash)
return NULL;
}
-void aclk_init_rx_msg_handlers(void)
+const char *rx_handler_get_name(size_t i)
{
- for (int i = 0; rx_msgs[i].fnc; i++) {
+ return rx_msgs[i].name;
+}
+
+unsigned int aclk_init_rx_msg_handlers(void)
+{
+ int i;
+ for (i = 0; rx_msgs[i].fnc; i++) {
simple_hash_t hash = simple_hash(rx_msgs[i].name);
new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash);
if (unlikely(hdl)) {
@@ -469,6 +475,7 @@ void aclk_init_rx_msg_handlers(void)
}
rx_msgs[i].name_hash = hash;
}
+ return i;
}
void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
@@ -489,6 +496,11 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
}
return;
}
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++;
+ ACLK_STATS_UNLOCK;
+ }
if (msg_descriptor->fnc(msg, msg_len)) {
error("Error processing message of type '%s'", message_type);
if (aclk_stats_enabled) {
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index 38243a4c9..00f88c6a8 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -11,7 +11,8 @@
int aclk_handle_cloud_cmd_message(char *payload);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-void aclk_init_rx_msg_handlers(void);
+const char *rx_handler_get_name(size_t i);
+unsigned int aclk_init_rx_msg_handlers(void);
void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len);
#endif
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index a7d4a4709..a9f0a923c 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -2,9 +2,18 @@
#include "aclk_stats.h"
+#include "aclk_query.h"
+
netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
-int query_thread_count;
+struct {
+ int query_thread_count;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ unsigned int proto_hdl_cnt;
+ uint32_t *aclk_proto_rx_msgs_sample;
+ RRDDIM **rx_msg_dims;
+#endif
+} aclk_stats_cfg; // there is only 1 stats thread at a time
// data ACLK stats need per query thread
struct aclk_qt_data {
@@ -13,6 +22,7 @@ struct aclk_qt_data {
uint32_t *aclk_queries_per_thread = NULL;
uint32_t *aclk_queries_per_thread_sample = NULL;
+uint32_t *aclk_proto_rx_msgs_sample = NULL;
struct aclk_metrics aclk_metrics = {
.online = 0,
@@ -113,39 +123,21 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
- static RRDDIM *rd_type_http = NULL;
- static RRDDIM *rd_type_alarm_upd = NULL;
- static RRDDIM *rd_type_metadata_info = NULL;
- static RRDDIM *rd_type_metadata_alarms = NULL;
- static RRDDIM *rd_type_chart_new = NULL;
- static RRDDIM *rd_type_chart_del = NULL;
- static RRDDIM *rd_type_register_node = NULL;
- static RRDDIM *rd_type_node_upd = NULL;
+ static RRDDIM *dims[ACLK_QUERY_TYPE_COUNT];
if (unlikely(!st)) {
st = rrdset_create_localhost(
"netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s",
"netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
- rd_type_http = rrddim_add(st, "http", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_type_alarm_upd = rrddim_add(st, "alarm update", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_type_metadata_info = rrddim_add(st, "info metadata", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_type_metadata_alarms = rrddim_add(st, "alarms metadata", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_type_chart_new = rrddim_add(st, "chart new", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_type_chart_del = rrddim_add(st, "chart delete", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_type_register_node = rrddim_add(st, "register node", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_type_node_upd = rrddim_add(st, "node update", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
+ dims[i] = rrddim_add(st, aclk_query_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+
} else
rrdset_next(st);
- rrddim_set_by_pointer(st, rd_type_http, per_sample->query_type_http);
- rrddim_set_by_pointer(st, rd_type_alarm_upd, per_sample->query_type_alarm_upd);
- rrddim_set_by_pointer(st, rd_type_metadata_info, per_sample->query_type_metadata_info);
- rrddim_set_by_pointer(st, rd_type_metadata_alarms, per_sample->query_type_metadata_alarms);
- rrddim_set_by_pointer(st, rd_type_chart_new, per_sample->query_type_chart_new);
- rrddim_set_by_pointer(st, rd_type_chart_del, per_sample->query_type_chart_del);
- rrddim_set_by_pointer(st, rd_type_register_node, per_sample->query_type_register_node);
- rrddim_set_by_pointer(st, rd_type_node_upd, per_sample->query_type_node_upd);
+ for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
+ rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]);
rrdset_done(st);
}
@@ -202,7 +194,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
"netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
"netdata", "stats", 200009, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
- for (int i = 0; i < query_thread_count; i++) {
+ for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) {
if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
error("snprintf encoding error");
aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
@@ -210,7 +202,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
} else
rrdset_next(st);
- for (int i = 0; i < query_thread_count; i++) {
+ for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) {
rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
}
@@ -245,8 +237,57 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+const char *rx_handler_get_name(size_t i);
+static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
+{
+ static RRDSET *st = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_protobuf_rx_types", NULL, "aclk", NULL, "Received new cloud architecture messages by their type.", "msg/s",
+ "netdata", "stats", 200010, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) {
+ aclk_stats_cfg.rx_msg_dims[i] = rrddim_add(st, rx_handler_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ }
+ } else
+ rrdset_next(st);
+
+ for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++)
+ rrddim_set_by_pointer(st, aclk_stats_cfg.rx_msg_dims[i], rx_msgs_sample[i]);
+
+ rrdset_done(st);
+}
+#endif
+
+void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
+{
+#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+ UNUSED(proto_hdl_cnt);
+#endif
+
+ aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
+ aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
+ aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
+
+ memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt;
+ aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
+ aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
+ aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*));
+#endif
+}
+
void aclk_stats_thread_cleanup()
{
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ freez(aclk_stats_cfg.rx_msg_dims);
+ freez(aclk_proto_rx_msgs_sample);
+ freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
+#endif
freez(aclk_qt_data);
freez(aclk_queries_per_thread);
freez(aclk_queries_per_thread_sample);
@@ -256,17 +297,12 @@ void *aclk_stats_main_thread(void *ptr)
{
struct aclk_stats_thread *args = ptr;
- query_thread_count = args->query_thread_count;
- aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
- aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
- aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
+ aclk_stats_cfg.query_thread_count = args->query_thread_count;
heartbeat_t hb;
heartbeat_init(&hb);
usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
- memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
-
struct aclk_metrics_per_sample per_sample;
struct aclk_metrics permanent;
@@ -282,11 +318,15 @@ void *aclk_stats_main_thread(void *ptr)
// to not hold lock longer than necessary, especially not to hold it
// during database rrd* operations
memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
+ memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
+#endif
memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
- memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
- memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
+ memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count);
+ memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count);
ACLK_STATS_UNLOCK;
aclk_stats_collect(&per_sample, &permanent);
@@ -302,6 +342,10 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_query_threads(aclk_queries_per_thread_sample);
aclk_stats_query_time(&per_sample);
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
+#endif
}
return 0;
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index 3cc6a0cb0..4f2894798 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -5,6 +5,7 @@
#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
+#include "aclk_query_queue.h"
#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
@@ -49,14 +50,7 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t cloud_req_err;
// query types.
- volatile uint32_t query_type_http;
- volatile uint32_t query_type_alarm_upd;
- volatile uint32_t query_type_metadata_info;
- volatile uint32_t query_type_metadata_alarms;
- volatile uint32_t query_type_chart_new;
- volatile uint32_t query_type_chart_del;
- volatile uint32_t query_type_register_node;
- volatile uint32_t query_type_node_upd;
+ volatile uint32_t queries_per_type[ACLK_QUERY_TYPE_COUNT];
// HTTP-specific request types.
volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT];
@@ -66,9 +60,14 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t cloud_q_process_max;
} aclk_metrics_per_sample;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+extern uint32_t *aclk_proto_rx_msgs_sample;
+#endif
+
extern uint32_t *aclk_queries_per_thread;
void *aclk_stats_main_thread(void *ptr);
+void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt);
void aclk_stats_thread_cleanup();
void aclk_stats_upd_online(int online);
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 74fc19c72..185f5d796 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -116,28 +116,30 @@ static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, co
#define TOPIC_MAX_LEN 512
#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
-static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
+static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
{
uint16_t packet_id;
const char *str;
- char *full_msg;
- int len;
+ char *full_msg = NULL;
+ int len, rc;
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
- return;
+ return 500;
}
str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
len = strlen(str);
- full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
+ if (payload_len) {
+ full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
- memcpy(full_msg, str, len);
- memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
- len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
- memcpy(&full_msg[len], payload, payload_len);
- len += payload_len;
+ memcpy(full_msg, str, len);
+ memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
+ len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
+ memcpy(&full_msg[len], payload, payload_len);
+ len += payload_len;
+ }
/* TODO
#ifdef ACLK_LOG_CONVERSATION_DIR
@@ -147,15 +149,22 @@ static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_obje
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif */
- int rc = mqtt_wss_publish_pid_block(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT)
+ rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
+ if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
error("Timeout sending binpacked message");
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL)
+ freez(full_msg);
+ return 503;
+ }
+ if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
error("Message is bigger than allowed maximum");
+ freez(full_msg);
+ return 403;
+ }
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
freez(full_msg);
+ return 0;
}
/*
@@ -316,6 +325,25 @@ void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
buffer_free(local_buffer);
}
+void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
+{
+ json_object *tmp, *msg;
+ msg = create_hdr("http", msg_id, 0, 0, 2);
+ tmp = json_object_new_int(http_code);
+ json_object_object_add(msg, "http-code", tmp);
+
+ tmp = json_object_new_int(ec);
+ json_object_object_add(msg, "error-code", tmp);
+
+ tmp = json_object_new_string(emsg);
+ json_object_object_add(msg, "error-description", tmp);
+
+ if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
+ error("Failed to send cancelation message for http reply");
+ }
+ json_object_put(msg);
+}
+
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@@ -331,8 +359,20 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
tmp = json_object_new_int(http_code);
json_object_object_add(msg, "http-code", tmp);
- aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
+ int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
json_object_put(msg);
+
+ switch (rc) {
+ case 403:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
+ break;
+ case 500:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
+ break;
+ case 503:
+ aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
+ break;
+ }
}
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index da29a4a32..402f13fb6 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -14,6 +14,7 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
+void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len);
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart);
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index ee8fcaf94..5576a865a 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -41,6 +41,7 @@ void aclk_env_t_destroy(aclk_env_t *env) {
for (size_t i = 0; i < env->transport_count; i++) {
if(env->transports[i]) {
aclk_transport_desc_t_destroy(env->transports[i]);
+ freez(env->transports[i]);
env->transports[i] = NULL;
}
}
@@ -64,7 +65,7 @@ int aclk_env_has_capa(const char *capa)
#ifdef ACLK_LOG_CONVERSATION_DIR
volatile int aclk_conversation_log_counter = 0;
-#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
+#if !defined(HAVE_C___ATOMIC)
netdata_mutex_t aclk_conversation_log_mutex = NETDATA_MUTEX_INITIALIZER;
int aclk_get_conv_log_next()
{
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 4d8744e7f..7a7202076 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -5,6 +5,19 @@
#include "libnetdata/libnetdata.h"
#include "mqtt_wss_client.h"
+#define CLOUD_EC_MALFORMED_NODE_ID 1
+#define CLOUD_EMSG_MALFORMED_NODE_ID "URL requests node_id but there is not enough chars following (for it to be valid uuid)."
+#define CLOUD_EC_NODE_NOT_FOUND 2
+#define CLOUD_EMSG_NODE_NOT_FOUND "Node with requested node_id not found"
+#define CLOUD_EC_ZLIB_ERROR 3
+#define CLOUD_EMSG_ZLIB_ERROR "Error during zlib compression"
+#define CLOUD_EC_REQ_REPLY_TOO_BIG 4
+#define CLOUD_EMSG_REQ_REPLY_TOO_BIG "Request reply produces message bigger than allowed maximum"
+#define CLOUD_EC_FAIL_TOPIC 5
+#define CLOUD_EMSG_FAIL_TOPIC "Internal Topic Error"
+#define CLOUD_EC_SND_TIMEOUT 6
+#define CLOUD_EMSG_SND_TIMEOUT "Timeout sending binpacked message"
+
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK
extern int aclk_use_new_cloud_arch;
@@ -86,7 +99,7 @@ void free_topic_cache(void);
#ifdef ACLK_LOG_CONVERSATION_DIR
extern volatile int aclk_conversation_log_counter;
-#if defined(HAVE_C___ATOMIC) && !defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
+#if defined(HAVE_C___ATOMIC)
#define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST)
#else
extern netdata_mutex_t aclk_conversation_log_mutex;
diff --git a/aclk/https_client.c b/aclk/https_client.c
index 470c3fdf3..1a32f833f 100644
--- a/aclk/https_client.c
+++ b/aclk/https_client.c
@@ -587,7 +587,7 @@ void https_req_response_init(https_req_response_t *res) {
res->payload_size = 0;
}
-static inline char *min_non_null(char *a, char *b) {
+static inline char *UNUSED_FUNCTION(min_non_null)(char *a, char *b) {
if (!a)
return b;
if (!b)