summary refs log tree commit diff stats
path: root/aclk/legacy
diff options
context:
space:
mode:
author Daniel Baumann <daniel.baumann@progress-linux.org> 2021-03-31 12:58:11 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2021-03-31 12:58:11 +0000
commit f99c4526d94d3e04124c5c48ab4a3da6ca53a458 (patch)
tree a2ed8860030cc49f492b09b3222d593c65619800 /aclk/legacy
parent Adding upstream version 1.29.3. (diff)
download netdata-f99c4526d94d3e04124c5c48ab4a3da6ca53a458.tar.xz
netdata-f99c4526d94d3e04124c5c48ab4a3da6ca53a458.zip
Adding upstream version 1.30.0. [upstream/1.30.0]
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk/legacy')
-rw-r--r-- aclk/legacy/aclk_common.c 1
-rw-r--r-- aclk/legacy/aclk_lws_https_client.c 4
-rw-r--r-- aclk/legacy/aclk_lws_wss_client.c 4
-rw-r--r-- aclk/legacy/aclk_query.c 60
-rw-r--r-- aclk/legacy/aclk_query.h 4
-rw-r--r-- aclk/legacy/aclk_rx_msgs.c 36
-rw-r--r-- aclk/legacy/aclk_stats.c 125
-rw-r--r-- aclk/legacy/aclk_stats.h 13
-rw-r--r-- aclk/legacy/agent_cloud_link.c 3
9 files changed, 231 insertions, 19 deletions
diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c
index d7188b1f0..43455393a 100644
--- a/aclk/legacy/aclk_common.c
+++ b/aclk/legacy/aclk_common.c
@@ -252,6 +252,7 @@ struct label *add_aclk_host_labels(struct label *label) {
proxy_str = "none";
break;
}
+ label = add_label_to_list(label, "_aclk_impl", "Legacy", LABEL_SOURCE_AUTO);
return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#else
return label;
diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c
index c1856ed2c..f41a230db 100644
--- a/aclk/legacy/aclk_lws_https_client.c
+++ b/aclk/legacy/aclk_lws_https_client.c
@@ -3,7 +3,11 @@
#define ACLK_LWS_HTTPS_CLIENT_INTERNAL
#include "aclk_lws_https_client.h"
+#ifndef ACLK_NG
#include "aclk_common.h"
+#else
+#include "../aclk.h"
+#endif
#include "aclk_lws_wss_client.h"
diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c
index f06df3f42..df221dd60 100644
--- a/aclk/legacy/aclk_lws_wss_client.c
+++ b/aclk/legacy/aclk_lws_wss_client.c
@@ -348,6 +348,7 @@ static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data,
return 1;
}
+#ifdef ACLK_TRP_DEBUG_VERBOSE
static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
{
switch (reason) {
@@ -377,12 +378,11 @@ static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
return "LWS_CALLBACK_EVENT_WAIT_CANCELLED";
default:
// Not using an internal buffer here for thread-safety with unknown calling context.
-#ifdef ACLK_TRP_DEBUG_VERBOSE
error("Unknown LWS callback %u", reason);
-#endif
return "unknown";
}
}
+#endif
void aclk_lws_wss_fail_report()
{
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c
index 7ab534f16..27ad9ac16 100644
--- a/aclk/legacy/aclk_query.c
+++ b/aclk/legacy/aclk_query.c
@@ -22,6 +22,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
struct aclk_query {
usec_t created;
+ struct timeval tv_in;
usec_t created_boot_time;
time_t run_after; // Delay run until after this time
ACLK_CMD cmd; // What command is this
@@ -30,6 +31,7 @@ struct aclk_query {
char *msg_id; // msg_id generated by the cloud (NULL if internal)
char *query; // The actual query
u_char deleted; // Mark deleted for garbage collect
+ int idx; // index of query thread
struct aclk_query *next;
};
@@ -62,6 +64,7 @@ static void aclk_query_free(struct aclk_query *this_query)
freez(this_query->query);
if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) {
struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data;
+ freez(del->query_endpoint);
freez(del->data);
freez(del);
}
@@ -236,7 +239,8 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
new_query->data = data;
new_query->next = NULL;
- new_query->created = now_realtime_usec();
+ now_realtime_timeval(&new_query->tv_in);
+ new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec;
new_query->created_boot_time = now_boottime_usec();
new_query->run_after = run_after;
@@ -324,6 +328,7 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
#pragma region ACLK_QUERY
#endif
+
static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
{
usec_t t = now_boottime_usec();
@@ -359,8 +364,11 @@ static int aclk_execute_query(struct aclk_query *this_query)
mysep = strrchr(this_query->query, '/');
// TODO: handle bad response perhaps in a different way. For now it does to the payload
- aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
+ w->tv_in = this_query->tv_in;
now_realtime_timeval(&w->tv_ready);
+ aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
+ size_t size = w->response.data->len;
+ size_t sent = size;
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -382,6 +390,24 @@ static int aclk_execute_query(struct aclk_query *this_query)
aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
+ struct timeval tv;
+ now_realtime_timeval(&tv);
+
+ log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
+ w->id
+ , gettid()
+ , this_query->idx
+ , "DATA"
+ , sent
+ , size
+ , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
+ , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
+ , dt_usec(&tv, &w->tv_ready) / 1000.0
+ , dt_usec(&tv, &w->tv_in) / 1000.0
+ , w->response.code
+ , strip_control_characters(this_query->query)
+ );
+
buffer_free(w->response.data);
buffer_free(w->response.header);
buffer_free(w->response.header_output);
@@ -426,7 +452,11 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
mysep = strrchr(this_query->query, '/');
// execute the query
+ w->tv_in = this_query->tv_in;
+ now_realtime_timeval(&w->tv_ready);
t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
+ size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
+ size_t sent = size;
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
@@ -475,7 +505,6 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
}
#endif
- now_realtime_timeval(&w->tv_ready);
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w);
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -492,6 +521,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
buffer_need_bytes(local_buffer, w->response.data->len);
memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
local_buffer->len += w->response.data->len;
+ sent = sent - size + w->response.data->len;
} else {
#endif
buffer_strcat(local_buffer, w->response.data->buffer);
@@ -502,6 +532,23 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id);
+ struct timeval tv;
+ now_realtime_timeval(&tv);
+
+ log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
+ w->id
+ , gettid()
+ , this_query->idx
+ , "DATA"
+ , sent
+ , size
+ , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
+ , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
+ , dt_usec(&tv, &w->tv_ready) / 1000.0
+ , dt_usec(&tv, &w->tv_in) / 1000.0
+ , w->response.code
+ , strip_control_characters(this_query->query)
+ );
cleanup:
#ifdef NETDATA_WITH_ZLIB
if(w->response.zinitialized)
@@ -550,6 +597,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
query_count++;
host = (RRDHOST*)this_query->data;
+ this_query->idx = t_info->idx;
debug(
D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic,
@@ -629,6 +677,12 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[t_info->idx]++;
ACLK_STATS_UNLOCK;
+
+ if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) {
+ getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]);
+ getrusage_called_this_tick[t_info->idx]++;
+ }
+
}
aclk_query_free(this_query);
diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h
index 53eef1392..026985c8d 100644
--- a/aclk/legacy/aclk_query.h
+++ b/aclk/legacy/aclk_query.h
@@ -8,8 +8,11 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+#define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread.
+
extern pthread_cond_t query_cond_wait;
extern pthread_mutex_t query_lock_wait;
+extern uint8_t *getrusage_called_this_tick;
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
@@ -28,6 +31,7 @@ struct aclk_query_threads {
struct aclk_cloud_req_v2 {
char *data;
RRDHOST *host;
+ char *query_endpoint;
};
void *aclk_query_main_thread(void *ptr);
diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c
index 99fa9d987..2681445b4 100644
--- a/aclk/legacy/aclk_rx_msgs.c
+++ b/aclk/legacy/aclk_rx_msgs.c
@@ -25,7 +25,7 @@ static inline int aclk_extract_v2_data(char *payload, char **data)
#define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref))
static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req)
{
- const char *start, *end, *ptr;
+ const char *start, *end, *ptr, *query_type;
char uuid_str[UUID_STR_LEN];
uuid_t uuid;
@@ -66,6 +66,8 @@ static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req,
error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
return 1;
}
+ ptr += strlen(ACLK_CLOUD_REQ_V2_PREFIX);
+ query_type = ptr;
if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) {
errno = 0;
@@ -73,6 +75,11 @@ static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req,
return 1;
}
+ if(!(ptr = strchr(ptr, '?')) || ptr > end)
+ ptr = end;
+ cloud_req->query_endpoint = mallocz((ptr - query_type) + 1);
+ strncpyz(cloud_req->query_endpoint, query_type, ptr - query_type);
+
req->payload = mallocz((end - start) + 1);
strncpyz(req->payload, start, end - start);
@@ -122,6 +129,13 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_v1++;
+ aclk_metrics_per_sample.cloud_req_ok++;
+ ACLK_STATS_UNLOCK;
+ }
+
return 0;
}
@@ -131,6 +145,7 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
struct aclk_cloud_req_v2 *cloud_req;
char *data;
+ int stat_idx;
errno = 0;
if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
@@ -165,6 +180,10 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
goto cleanup;
}
+ // we do this here due to cloud_req being taken over by query thread
+ // which if crazy quick can free it after aclk_queue_query
+ stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint);
+
// aclk_queue_query takes ownership of data pointer
if (unlikely(aclk_queue_query(
cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
@@ -173,8 +192,17 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
goto cleanup;
}
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.cloud_req_v2++;
+ aclk_metrics_per_sample.cloud_req_ok++;
+ aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
+ ACLK_STATS_UNLOCK;
+ }
+
return 0;
cleanup:
+ freez(cloud_req->query_endpoint);
freez(cloud_req->data);
freez(cloud_req);
return 1;
@@ -289,12 +317,6 @@ int aclk_handle_cloud_message(char *payload)
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_req_recvd++;
- ACLK_STATS_UNLOCK;
- }
-
if (unlikely(!payload)) {
errno = 0;
error("ACLK incoming message is empty");
diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c
index 2a57cd6f0..7124380a2 100644
--- a/aclk/legacy/aclk_stats.c
+++ b/aclk/legacy/aclk_stats.c
@@ -11,8 +11,17 @@ struct aclk_qt_data {
RRDDIM *dim;
} *aclk_qt_data = NULL;
+// ACLK per query thread cpu stats
+struct aclk_cpu_data {
+ RRDDIM *user;
+ RRDDIM *system;
+ RRDSET *st;
+} *aclk_cpu_data = NULL;
+
uint32_t *aclk_queries_per_thread = NULL;
uint32_t *aclk_queries_per_thread_sample = NULL;
+struct rusage *rusage_per_thread;
+uint8_t *getrusage_called_this_tick = NULL;
struct aclk_metrics aclk_metrics = {
.online = 0,
@@ -153,7 +162,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
- static RRDDIM *rd_rq_rcvd = NULL;
+ static RRDDIM *rd_rq_ok = NULL;
static RRDDIM *rd_rq_err = NULL;
if (unlikely(!st)) {
@@ -161,17 +170,82 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
"netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s",
"netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
- rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_ok = rrddim_add(st, "accepted", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_err = rrddim_add(st, "rejected", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(st);
- rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err);
+ rrddim_set_by_pointer(st, rd_rq_ok, per_sample->cloud_req_ok);
rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err);
rrdset_done(st);
}
+static void aclk_stats_cloud_req_version(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_rq_v1 = NULL;
+ static RRDDIM *rd_rq_v2 = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_cloud_req_version", NULL, "aclk", NULL, "Requests received from cloud by their version", "req/s",
+ "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_rq_v1 = rrddim_add(st, "v1", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ rd_rq_v2 = rrddim_add(st, "v2+", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ rrddim_set_by_pointer(st, rd_rq_v1, per_sample->cloud_req_v1);
+ rrddim_set_by_pointer(st, rd_rq_v2, per_sample->cloud_req_v2);
+
+ rrdset_done(st);
+}
+
+static char *cloud_req_type_names[ACLK_STATS_CLOUD_REQ_TYPE_CNT] = {
+ "other",
+ "info",
+ "data",
+ "alarms",
+ "alarm_log",
+ "chart",
+ "charts"
+ // if you change update:
+ // #define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
+};
+
+int aclk_cloud_req_type_to_idx(const char *name)
+{
+ for (int i = 1; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
+ if (!strcmp(cloud_req_type_names[i], name))
+ return i;
+ return 0;
+}
+
+static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample)
+{
+ static RRDSET *st;
+ static int initialized = 0;
+ static RRDDIM *rd_rq_types[ACLK_STATS_CLOUD_REQ_TYPE_CNT];
+
+ if (unlikely(!initialized)) {
+ initialized = 1;
+ st = rrdset_create_localhost(
+ "netdata", "aclk_cloud_req_cmd", NULL, "aclk", NULL, "Requests received from cloud by their type (api endpoint queried)", "req/s",
+ "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
+ rd_rq_types[i] = rrddim_add(st, cloud_req_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(st);
+
+ for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++)
+ rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_by_type[i]);
+
+ rrdset_done(st);
+}
+
#define MAX_DIM_NAME 16
static void aclk_stats_query_threads(uint32_t *queries_per_thread)
{
@@ -182,7 +256,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
if (unlikely(!st)) {
st = rrdset_create_localhost(
"netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
- "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+ "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
for (int i = 0; i < query_thread_count; i++) {
if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
@@ -222,11 +296,42 @@ static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct
rrdset_done(metric->st);
}
+static void aclk_stats_cpu_threads(void)
+{
+ char id[100 + 1];
+ char title[100 + 1];
+
+ for (int i = 0; i < query_thread_count; i++) {
+ if (unlikely(!aclk_cpu_data[i].st)) {
+
+ snprintfz(id, 100, "aclk_thread%d_cpu", i);
+ snprintfz(title, 100, "Cpu Usage For Thread No %d", i);
+
+ aclk_cpu_data[i].st = rrdset_create_localhost(
+ "netdata", id, NULL, "aclk", NULL, title, "milliseconds/s",
+ "netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ aclk_cpu_data[i].user = rrddim_add(aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ aclk_cpu_data[i].system = rrddim_add(aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+
+ } else
+ rrdset_next(aclk_cpu_data[i].st);
+ }
+
+ for (int i = 0; i < query_thread_count; i++) {
+ rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec);
+ rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec);
+ rrdset_done(aclk_cpu_data[i].st);
+ }
+}
+
void aclk_stats_thread_cleanup()
{
freez(aclk_qt_data);
freez(aclk_queries_per_thread);
freez(aclk_queries_per_thread_sample);
+ freez(aclk_cpu_data);
+ freez(rusage_per_thread);
}
void *aclk_stats_main_thread(void *ptr)
@@ -235,8 +340,11 @@ void *aclk_stats_main_thread(void *ptr)
query_thread_count = args->query_thread_count;
aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
+ aclk_cpu_data = callocz(query_thread_count, sizeof(struct aclk_cpu_data));
aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
+ rusage_per_thread = callocz(query_thread_count, sizeof(struct rusage));
+ getrusage_called_this_tick = callocz(query_thread_count, sizeof(uint8_t));
heartbeat_t hb;
heartbeat_init(&hb);
@@ -264,6 +372,7 @@ void *aclk_stats_main_thread(void *ptr)
memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
+ memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * query_thread_count);
ACLK_STATS_UNLOCK;
aclk_stats_collect(&per_sample, &permanent);
@@ -273,8 +382,14 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_read_q(&per_sample);
aclk_stats_cloud_req(&per_sample);
+ aclk_stats_cloud_req_version(&per_sample);
+
+ aclk_stats_cloud_req_cmd(&per_sample);
+
aclk_stats_query_threads(aclk_queries_per_thread_sample);
+ aclk_stats_cpu_threads();
+
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency);
#endif
diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h
index 7e74fdf88..5e50a2272 100644
--- a/aclk/legacy/aclk_stats.h
+++ b/aclk/legacy/aclk_stats.h
@@ -55,6 +55,11 @@ extern struct aclk_mat_metrics {
void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
+#define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
+// if you change update cloud_req_type_names
+
+int aclk_cloud_req_type_to_idx(const char *name);
+
// reset to 0 on every sample
extern struct aclk_metrics_per_sample {
/* in the unlikely event of ACLK disconnecting
@@ -72,9 +77,14 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t read_q_added;
volatile uint32_t read_q_consumed;
- volatile uint32_t cloud_req_recvd;
+ volatile uint32_t cloud_req_ok;
volatile uint32_t cloud_req_err;
+ volatile uint16_t cloud_req_v1;
+ volatile uint16_t cloud_req_v2;
+
+ volatile uint16_t cloud_req_by_type[ACLK_STATS_CLOUD_REQ_TYPE_CNT];
+
#ifdef NETDATA_INTERNAL_CHECKS
struct aclk_metric_mat_data latency;
#endif
@@ -83,6 +93,7 @@ extern struct aclk_metrics_per_sample {
} aclk_metrics_per_sample;
extern uint32_t *aclk_queries_per_thread;
+extern struct rusage *rusage_per_thread;
void *aclk_stats_main_thread(void *ptr);
void aclk_stats_thread_cleanup();
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c
index e51a01308..5767df3a7 100644
--- a/aclk/legacy/agent_cloud_link.c
+++ b/aclk/legacy/agent_cloud_link.c
@@ -189,7 +189,8 @@ unsigned long int aclk_reconnect_delay(int mode)
delay = ACLK_MAX_BACKOFF_DELAY * 1000;
} else {
fail++;
- delay = (delay * 1000) + (random() % 1000);
+ delay *= 1000;
+ delay += (random() % (MAX(1000, delay/2)));
}
return delay;