summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_stats.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_stats.c')
-rw-r--r--aclk/aclk_stats.c78
1 files changed, 75 insertions, 3 deletions
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 215313ff..2b4d5e48 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -1,5 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
+#define MQTT_WSS_CPUSTATS
+
#include "aclk_stats.h"
#include "aclk_query.h"
@@ -143,7 +145,9 @@ static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = {
"alarms",
"alarm_log",
"chart",
- "charts"
+ "charts",
+ "function",
+ "functions"
// if you change then update `ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT`.
};
@@ -257,6 +261,23 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
static uint64_t sent = 0;
static uint64_t recvd = 0;
+ static RRDSET *st_txbuf_perc = NULL;
+ static RRDDIM *rd_txbuf_perc = NULL;
+
+ static RRDSET *st_txbuf = NULL;
+ static RRDDIM *rd_tx_buffer_usable = NULL;
+ static RRDDIM *rd_tx_buffer_reclaimable = NULL;
+ static RRDDIM *rd_tx_buffer_used = NULL;
+ static RRDDIM *rd_tx_buffer_free = NULL;
+ static RRDDIM *rd_tx_buffer_size = NULL;
+
+ static RRDSET *st_timing = NULL;
+ static RRDDIM *rd_keepalive = NULL;
+ static RRDDIM *rd_read_socket = NULL;
+ static RRDDIM *rd_write_socket = NULL;
+ static RRDDIM *rd_process_websocket = NULL;
+ static RRDDIM *rd_process_mqtt = NULL;
+
sent += stats->bytes_tx;
recvd += stats->bytes_rx;
@@ -269,10 +290,61 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
+ if (unlikely(!st_txbuf_perc)) {
+ st_txbuf_perc = rrdset_create_localhost(
+ "netdata", "aclk_mqtt_tx_perc", NULL, "aclk", NULL, "Actively used percentage of MQTT Tx Buffer,", "%",
+ "netdata", "stats", 200012, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_txbuf_perc = rrddim_add(st_txbuf_perc, "used", NULL, 1, 100, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ if (unlikely(!st_txbuf)) {
+ st_txbuf = rrdset_create_localhost(
+ "netdata", "aclk_mqtt_tx_queue", NULL, "aclk", NULL, "State of transmit MQTT queue.", "B",
+ "netdata", "stats", 200013, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_tx_buffer_usable = rrddim_add(st_txbuf, "usable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_tx_buffer_reclaimable = rrddim_add(st_txbuf, "reclaimable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_tx_buffer_used = rrddim_add(st_txbuf, "used", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_tx_buffer_free = rrddim_add(st_txbuf, "free", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_tx_buffer_size = rrddim_add(st_txbuf, "size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ if (unlikely(!st_timing)) {
+ st_timing = rrdset_create_localhost(
+ "netdata", "aclk_mqtt_wss_time", NULL, "aclk", NULL, "Time spent handling MQTT, WSS, SSL and network communication.", "us",
+ "netdata", "stats", 200014, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_keepalive = rrddim_add(st_timing, "keep-alive", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_read_socket = rrddim_add(st_timing, "socket_read_ssl", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_write_socket = rrddim_add(st_timing, "socket_write_ssl", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_process_websocket = rrddim_add(st_timing, "process_websocket", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_process_mqtt = rrddim_add(st_timing, "process_mqtt", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
rrddim_set_by_pointer(st, rd_sent, sent);
rrddim_set_by_pointer(st, rd_recvd, recvd);
+ float usage = ((float)stats->mqtt.tx_buffer_free + stats->mqtt.tx_buffer_reclaimable) / stats->mqtt.tx_buffer_size;
+ usage = (1 - usage) * 10000;
+ rrddim_set_by_pointer(st_txbuf_perc, rd_txbuf_perc, usage);
+
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_usable, stats->mqtt.tx_buffer_reclaimable + stats->mqtt.tx_buffer_free);
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_reclaimable, stats->mqtt.tx_buffer_reclaimable);
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_used, stats->mqtt.tx_buffer_used);
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_free, stats->mqtt.tx_buffer_free);
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_size, stats->mqtt.tx_buffer_size);
+
+ rrddim_set_by_pointer(st_timing, rd_keepalive, stats->time_keepalive);
+ rrddim_set_by_pointer(st_timing, rd_read_socket, stats->time_read_socket);
+ rrddim_set_by_pointer(st_timing, rd_write_socket, stats->time_write_socket);
+ rrddim_set_by_pointer(st_timing, rd_process_websocket, stats->time_process_websocket);
+ rrddim_set_by_pointer(st_timing, rd_process_mqtt, stats->time_process_mqtt);
+
rrdset_done(st);
+ rrdset_done(st_txbuf_perc);
+ rrdset_done(st_txbuf);
+ rrdset_done(st_timing);
}
void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
@@ -312,13 +384,13 @@ void *aclk_stats_main_thread(void *ptr)
struct aclk_metrics_per_sample per_sample;
struct aclk_metrics permanent;
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) {
netdata_thread_testcancel();
// ------------------------------------------------------------------------
// Wait for the next iteration point.
heartbeat_next(&hb, step_ut);
- if (netdata_exit) break;
+ if (!service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) break;
ACLK_STATS_LOCK;
// to not hold lock longer than necessary, especially not to hold it