summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_stats.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_stats.c')
-rw-r--r--aclk/aclk_stats.c48
1 files changed, 32 insertions, 16 deletions
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index ca0532638..241e9b724 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -8,11 +8,9 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
struct {
int query_thread_count;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
unsigned int proto_hdl_cnt;
uint32_t *aclk_proto_rx_msgs_sample;
RRDDIM **rx_msg_dims;
-#endif
} aclk_stats_cfg; // there is only 1 stats thread at a time
// data ACLK stats need per query thread
@@ -237,7 +235,6 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
const char *rx_handler_get_name(size_t i);
static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
{
@@ -259,35 +256,53 @@ static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
rrdset_done(st);
}
-#endif
-void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
+static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
{
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
- UNUSED(proto_hdl_cnt);
-#endif
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_sent = NULL;
+ static RRDDIM *rd_recvd = NULL;
+ static uint64_t sent = 0;
+ static uint64_t recvd = 0;
+
+ sent += stats->bytes_tx;
+ recvd += stats->bytes_rx;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_openssl_bytes", NULL, "aclk", NULL, "Received and Sent bytes.", "B/s",
+ "netdata", "stats", 200011, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_sent = rrddim_add(st, "sent", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ } else
+ rrdset_next(st);
+
+ rrddim_set_by_pointer(st, rd_sent, sent);
+ rrddim_set_by_pointer(st, rd_recvd, recvd);
+ rrdset_done(st);
+}
+
+void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
+{
aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt;
aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*));
-#endif
}
void aclk_stats_thread_cleanup()
{
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
freez(aclk_stats_cfg.rx_msg_dims);
freez(aclk_proto_rx_msgs_sample);
freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
-#endif
freez(aclk_qt_data);
freez(aclk_queries_per_thread);
freez(aclk_queries_per_thread_sample);
@@ -318,10 +333,10 @@ void *aclk_stats_main_thread(void *ptr)
// to not hold lock longer than necessary, especially not to hold it
// during database rrd* operations
memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+
memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
-#endif
+
memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
@@ -343,9 +358,10 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_query_time(&per_sample);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ struct mqtt_wss_stats mqtt_wss_stats = mqtt_wss_get_stats(args->client);
+ aclk_stats_mqtt_wss(&mqtt_wss_stats);
+
aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
-#endif
}
return 0;