diff options
Diffstat (limited to 'aclk/aclk_stats.c')
-rw-r--r-- | aclk/aclk_stats.c | 48 |
1 files changed, 32 insertions, 16 deletions
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index ca053263..241e9b72 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -8,11 +8,9 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; struct { int query_thread_count; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL unsigned int proto_hdl_cnt; uint32_t *aclk_proto_rx_msgs_sample; RRDDIM **rx_msg_dims; -#endif } aclk_stats_cfg; // there is only 1 stats thread at a time // data ACLK stats need per query thread @@ -237,7 +235,6 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL const char *rx_handler_get_name(size_t i); static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) { @@ -259,35 +256,53 @@ static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) rrdset_done(st); } -#endif -void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) +static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL - UNUSED(proto_hdl_cnt); -#endif + static RRDSET *st = NULL; + static RRDDIM *rd_sent = NULL; + static RRDDIM *rd_recvd = NULL; + static uint64_t sent = 0; + static uint64_t recvd = 0; + + sent += stats->bytes_tx; + recvd += stats->bytes_rx; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_openssl_bytes", NULL, "aclk", NULL, "Received and Sent bytes.", "B/s", + "netdata", "stats", 200011, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_sent = rrddim_add(st, "sent", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_sent, sent); + rrddim_set_by_pointer(st, rd_recvd, recvd); + rrdset_done(st); +} + +void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) +{ aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt; aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*)); -#endif } void aclk_stats_thread_cleanup() { -#ifdef ENABLE_NEW_CLOUD_PROTOCOL freez(aclk_stats_cfg.rx_msg_dims); freez(aclk_proto_rx_msgs_sample); freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample); -#endif freez(aclk_qt_data); freez(aclk_queries_per_thread); freez(aclk_queries_per_thread_sample); @@ -318,10 +333,10 @@ void *aclk_stats_main_thread(void *ptr) // to not hold lock longer than necessary, especially not to hold it // during database rrd* operations memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL + memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); -#endif + memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); @@ -343,9 +358,10 @@ void *aclk_stats_main_thread(void *ptr) aclk_stats_query_time(&per_sample); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL + struct mqtt_wss_stats mqtt_wss_stats = mqtt_wss_get_stats(args->client); + aclk_stats_mqtt_wss(&mqtt_wss_stats); + aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample); -#endif } return 0; |