diff options
Diffstat (limited to 'aclk/aclk_stats.c')
-rw-r--r-- | aclk/aclk_stats.c | 78 |
1 files changed, 75 insertions, 3 deletions
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index 215313ff9..2b4d5e48a 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -1,5 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later +#define MQTT_WSS_CPUSTATS + #include "aclk_stats.h" #include "aclk_query.h" @@ -143,7 +145,9 @@ static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = { "alarms", "alarm_log", "chart", - "charts" + "charts", + "function", + "functions" // if you change then update `ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT`. }; @@ -257,6 +261,23 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats) static uint64_t sent = 0; static uint64_t recvd = 0; + static RRDSET *st_txbuf_perc = NULL; + static RRDDIM *rd_txbuf_perc = NULL; + + static RRDSET *st_txbuf = NULL; + static RRDDIM *rd_tx_buffer_usable = NULL; + static RRDDIM *rd_tx_buffer_reclaimable = NULL; + static RRDDIM *rd_tx_buffer_used = NULL; + static RRDDIM *rd_tx_buffer_free = NULL; + static RRDDIM *rd_tx_buffer_size = NULL; + + static RRDSET *st_timing = NULL; + static RRDDIM *rd_keepalive = NULL; + static RRDDIM *rd_read_socket = NULL; + static RRDDIM *rd_write_socket = NULL; + static RRDDIM *rd_process_websocket = NULL; + static RRDDIM *rd_process_mqtt = NULL; + sent += stats->bytes_tx; recvd += stats->bytes_rx; @@ -269,10 +290,61 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats) rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); } + if (unlikely(!st_txbuf_perc)) { + st_txbuf_perc = rrdset_create_localhost( + "netdata", "aclk_mqtt_tx_perc", NULL, "aclk", NULL, "Actively used percentage of MQTT Tx Buffer,", "%", + "netdata", "stats", 200012, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_txbuf_perc = rrddim_add(st_txbuf_perc, "used", NULL, 1, 100, RRD_ALGORITHM_ABSOLUTE); + } + + if (unlikely(!st_txbuf)) { + st_txbuf = rrdset_create_localhost( + "netdata", "aclk_mqtt_tx_queue", NULL, "aclk", NULL, "State of transmit MQTT queue.", "B", + "netdata", "stats", 200013, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_tx_buffer_usable = rrddim_add(st_txbuf, "usable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_tx_buffer_reclaimable = rrddim_add(st_txbuf, "reclaimable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_tx_buffer_used = rrddim_add(st_txbuf, "used", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_tx_buffer_free = rrddim_add(st_txbuf, "free", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_tx_buffer_size = rrddim_add(st_txbuf, "size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + if (unlikely(!st_timing)) { + st_timing = rrdset_create_localhost( + "netdata", "aclk_mqtt_wss_time", NULL, "aclk", NULL, "Time spent handling MQTT, WSS, SSL and network communication.", "us", + "netdata", "stats", 200014, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_keepalive = rrddim_add(st_timing, "keep-alive", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_read_socket = rrddim_add(st_timing, "socket_read_ssl", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_write_socket = rrddim_add(st_timing, "socket_write_ssl", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_process_websocket = rrddim_add(st_timing, "process_websocket", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_process_mqtt = rrddim_add(st_timing, "process_mqtt", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + rrddim_set_by_pointer(st, rd_sent, sent); rrddim_set_by_pointer(st, rd_recvd, recvd); + float usage = ((float)stats->mqtt.tx_buffer_free + stats->mqtt.tx_buffer_reclaimable) / stats->mqtt.tx_buffer_size; + usage = (1 - usage) * 10000; + rrddim_set_by_pointer(st_txbuf_perc, rd_txbuf_perc, usage); + + rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_usable, stats->mqtt.tx_buffer_reclaimable + stats->mqtt.tx_buffer_free); + rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_reclaimable, stats->mqtt.tx_buffer_reclaimable); + rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_used, stats->mqtt.tx_buffer_used); + rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_free, stats->mqtt.tx_buffer_free); + rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_size, stats->mqtt.tx_buffer_size); + + rrddim_set_by_pointer(st_timing, rd_keepalive, stats->time_keepalive); + rrddim_set_by_pointer(st_timing, rd_read_socket, stats->time_read_socket); + rrddim_set_by_pointer(st_timing, rd_write_socket, stats->time_write_socket); + rrddim_set_by_pointer(st_timing, rd_process_websocket, stats->time_process_websocket); + rrddim_set_by_pointer(st_timing, rd_process_mqtt, stats->time_process_mqtt); + rrdset_done(st); + rrdset_done(st_txbuf_perc); + rrdset_done(st_txbuf); + rrdset_done(st_timing); } void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) @@ -312,13 +384,13 @@ void *aclk_stats_main_thread(void *ptr) struct aclk_metrics_per_sample per_sample; struct aclk_metrics permanent; - while (!netdata_exit) { + while (service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) { netdata_thread_testcancel(); // ------------------------------------------------------------------------ // Wait for the next iteration point. heartbeat_next(&hb, step_ut); - if (netdata_exit) break; + if (!service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) break; ACLK_STATS_LOCK; // to not hold lock longer than necessary, especially not to hold it |