summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:30 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:30 +0000
commitaa2fe8ccbfcb117efa207d10229eeeac5d0f97c7 (patch)
tree941cbdd387b41c1a81587c20a6df9f0e5e0ff7ab /aclk
parentAdding upstream version 1.37.1. (diff)
downloadnetdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.tar.xz
netdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.zip
Adding upstream version 1.38.0.upstream/1.38.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk')
-rw-r--r--aclk/README.md28
-rw-r--r--aclk/aclk.c133
-rw-r--r--aclk/aclk.h2
-rw-r--r--aclk/aclk_capas.c2
-rw-r--r--aclk/aclk_otp.c33
-rw-r--r--aclk/aclk_query.c28
-rw-r--r--aclk/aclk_query_queue.c4
-rw-r--r--aclk/aclk_rx_msgs.c4
-rw-r--r--aclk/aclk_stats.c78
-rw-r--r--aclk/aclk_stats.h4
-rw-r--r--aclk/aclk_util.c120
-rw-r--r--aclk/aclk_util.h8
-rw-r--r--aclk/https_client.c26
-rw-r--r--aclk/https_client.h2
14 files changed, 355 insertions, 117 deletions
diff --git a/aclk/README.md b/aclk/README.md
index af0f5fdde..5b338dc2e 100644
--- a/aclk/README.md
+++ b/aclk/README.md
@@ -1,8 +1,12 @@
<!--
title: "Agent-Cloud link (ACLK)"
description: "The Agent-Cloud link (ACLK) is the mechanism responsible for connecting a Netdata agent to Netdata Cloud."
-date: 2020-05-11
-custom_edit_url: https://github.com/netdata/netdata/edit/master/aclk/README.md
+date: "2020-05-11"
+custom_edit_url: "https://github.com/netdata/netdata/edit/master/aclk/README.md"
+sidebar_label: "Agent-Cloud link (ACLK)"
+learn_status: "Published"
+learn_topic_type: "Tasks"
+learn_rel_path: "Setup"
-->
# Agent-cloud link (ACLK)
@@ -25,8 +29,8 @@ this is not an option in your case always verify the current domain resolution (
:::
For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [get
-started with Cloud](https://learn.netdata.cloud/docs/cloud/get-started) guide or the full [connect to Cloud
-documentation](/claim/README.md).
+started with Cloud](https://github.com/netdata/netdata/blob/master/docs/cloud/get-started.mdx) guide or the full [connect to Cloud
+documentation](https://github.com/netdata/netdata/blob/master/claim/README.md).
## Data privacy
[Data privacy](https://netdata.cloud/privacy/) is very important to us. We firmly believe that your data belongs to
@@ -37,7 +41,7 @@ The data passes through our systems, but it isn't stored.
However, to be able to offer the stunning visualizations and advanced functionality of Netdata Cloud, it does store a limited number of _metadata_.
-Read more about [Data privacy in the Netdata Cloud](https://learn.netdata.cloud/docs/cloud/data-privacy) in the documentation.
+Read more about [Data privacy in the Netdata Cloud](https://github.com/netdata/netdata/blob/master/docs/cloud/data-privacy.mdx) in the documentation.
## Enable and configure the ACLK
@@ -53,7 +57,7 @@ configuration uses two settings:
```
If your Agent needs to use a proxy to access the internet, you must [set up a proxy for
-connecting to cloud](/claim/README.md#connect-through-a-proxy).
+connecting to cloud](https://github.com/netdata/netdata/blob/master/claim/README.md#connect-through-a-proxy).
You can configure following keys in the `netdata.conf` section `[cloud]`:
```
@@ -72,8 +76,8 @@ You have two options if you prefer to disable the ACLK and not use Netdata Cloud
### Disable at installation
You can pass the `--disable-cloud` parameter to the Agent installation when using a kickstart script
-([kickstart.sh](/packaging/installer/methods/kickstart.md), or a [manual installation from
-Git](/packaging/installer/methods/manual.md).
+([kickstart.sh](https://github.com/netdata/netdata/blob/master/packaging/installer/methods/kickstart.md), or a [manual installation from
+Git](https://github.com/netdata/netdata/blob/master/packaging/installer/methods/manual.md).
When you pass this parameter, the installer does not download or compile any extra libraries. Once running, the Agent
kills the thread responsible for the ACLK and connecting behavior, and behaves as though the ACLK, and thus Netdata Cloud,
@@ -127,12 +131,12 @@ Restart your Agent to disable the ACLK.
### Re-enable the ACLK
If you first disable the ACLK and any Cloud functionality and then decide you would like to use Cloud, you must either
-[reinstall Netdata](/packaging/installer/REINSTALL.md) with Cloud enabled or change the runtime setting in your
+[reinstall Netdata](https://github.com/netdata/netdata/blob/master/packaging/installer/REINSTALL.md) with Cloud enabled or change the runtime setting in your
`cloud.conf` file.
If you passed `--disable-cloud` to `netdata-installer.sh` during installation, you must
-[reinstall](/packaging/installer/REINSTALL.md) your Agent. Use the same method as before, but pass `--require-cloud` to
-the installer. When installation finishes you can [connect your node](/claim/README.md#how-to-connect-a-node).
+[reinstall](https://github.com/netdata/netdata/blob/master/packaging/installer/REINSTALL.md) your Agent. Use the same method as before, but pass `--require-cloud` to
+the installer. When installation finishes you can [connect your node](https://github.com/netdata/netdata/blob/master/claim/README.md#how-to-connect-a-node).
If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf` file, edit the file again and change
`enabled` to `yes`:
@@ -142,6 +146,6 @@ If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf`
enabled = yes
```
-Restart your Agent and [connect your node](/claim/README.md#how-to-connect-a-node).
+Restart your Agent and [connect your node](https://github.com/netdata/netdata/blob/master/claim/README.md#how-to-connect-a-node).
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 3b035b849..e80897221 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -49,6 +49,8 @@ float last_backoff_value = 0;
time_t aclk_block_until = 0;
+int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
+
#ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client;
@@ -61,6 +63,26 @@ struct aclk_shared_state aclk_shared_state = {
.mqtt_shutdown_msg_rcvd = 0
};
+#ifdef MQTT_WSS_DEBUG
+#include <openssl/ssl.h>
+#define DEFAULT_SSKEYLOGFILE_NAME "SSLKEYLOGFILE"
+const char *ssl_log_filename = NULL;
+FILE *ssl_log_file = NULL;
+static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line)
+{
+ (void)ssl;
+ if (!ssl_log_file)
+ ssl_log_file = fopen(ssl_log_filename, "a");
+ if (!ssl_log_file) {
+ error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename);
+ return;
+ }
+ fputs(line, ssl_log_file);
+ putc('\n', ssl_log_file);
+ fflush(ssl_log_file);
+}
+#endif
+
#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
OSSL_DECODER_CTX *aclk_dctx = NULL;
EVP_PKEY *aclk_private_key = NULL;
@@ -137,7 +159,7 @@ static int wait_till_cloud_enabled()
info("Waiting for Cloud to be enabled");
while (!netdata_cloud_setting) {
sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
}
return 0;
@@ -156,7 +178,7 @@ static int wait_till_agent_claimed(void)
char *agent_id = get_agent_claimid();
while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
agent_id = get_agent_claimid();
}
@@ -176,7 +198,7 @@ static int wait_till_agent_claimed(void)
static int wait_till_agent_claim_ready()
{
url_t url;
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
if (wait_till_agent_claimed())
return 1;
@@ -288,7 +310,7 @@ static void puback_callback(uint16_t packet_id)
static int read_query_thread_count()
{
- int threads = MIN(processors/2, 6);
+ int threads = MIN(get_netdata_cpus()/2, 6);
threads = MAX(threads, 2);
threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
if(threads < 1) {
@@ -310,7 +332,7 @@ void aclk_graceful_disconnect(mqtt_wss_client client);
static int handle_connection(mqtt_wss_client client)
{
time_t last_periodic_query_wakeup = now_monotonic_sec();
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
// timeout 1000 to check at least once a second
// for netdata_exit
if (mqtt_wss_service(client, 1000) < 0){
@@ -365,6 +387,10 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_rcvd_cloud_msgs = 0;
aclk_connection_counter++;
+ aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
+ while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
+ mqtt_wss_set_topic_alias(client, topic);
+
aclk_send_agent_connection_update(client, 1);
}
@@ -435,11 +461,11 @@ static int aclk_block_till_recon_allowed() {
next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
- info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
+ info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
{
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
if (recon_delay > NETDATA_EXIT_POLL_MS) {
sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
@@ -449,7 +475,7 @@ static int aclk_block_till_recon_allowed() {
sleep_usec(recon_delay * USEC_PER_MS);
recon_delay = 0;
}
- return netdata_exit;
+ return !service_running(SERVICE_ACLK);
}
#ifndef ACLK_DISABLE_CHALLENGE
@@ -492,7 +518,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK)) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
error_report("Do not move the cloud base url out of post_conf_load!!");
@@ -512,7 +538,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
- aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, (char**)&proxy_conf.username, (char**)&proxy_conf.password, &proxy_conf.type);
struct mqtt_connect_params mqtt_conn_params = {
.clientid = "anon",
@@ -540,7 +566,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
continue;
}
- if (netdata_exit)
+ if (!service_running(SERVICE_ACLK))
return 1;
if (aclk_env->encoding != ACLK_ENC_PROTO) {
@@ -610,7 +636,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
- freez((char *)mqtt_conn_params.will_msg);
+ freez((char*)mqtt_conn_params.will_msg);
+ freez((char*)proxy_conf.host);
+ freez((char*)proxy_conf.username);
+ freez((char*)proxy_conf.password);
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
@@ -672,11 +701,23 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
+#ifdef MQTT_WSS_DEBUG
+ size_t default_ssl_log_filename_size = strlen(netdata_configured_log_dir) + strlen(DEFAULT_SSKEYLOGFILE_NAME) + 2;
+ char *default_ssl_log_filename = mallocz(default_ssl_log_filename_size);
+ snprintfz(default_ssl_log_filename, default_ssl_log_filename_size, "%s/%s", netdata_configured_log_dir, DEFAULT_SSKEYLOGFILE_NAME);
+ ssl_log_filename = config_get(CONFIG_SECTION_CLOUD, "aclk ssl keylog file", default_ssl_log_filename);
+ freez(default_ssl_log_filename);
+ if (ssl_log_filename) {
+ error_report("SSLKEYLOGFILE active (path:\"%s\")!", ssl_log_filename);
+ mqtt_wss_set_SSL_CTX_keylog_cb(mqttwss_client, aclk_ssl_keylog_cb);
+ }
+#endif
+
// Enable MQTT buffer growth if necessary
// e.g. old cloud architecture clients with huge nodes
// that send JSON payloads of 10 MB as single messages
@@ -690,8 +731,8 @@ void *aclk_main(void *ptr)
stats_thread->client = mqttwss_client;
aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
netdata_thread_create(
- stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
- stats_thread);
+ stats_thread->thread, "ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
+ stats_thread);
}
// Keep reconnecting and talking until our time has come
@@ -709,10 +750,15 @@ void *aclk_main(void *ptr)
aclk_connected = 0;
log_access("ACLK DISCONNECTED");
}
- } while (!netdata_exit);
+ } while (service_running(SERVICE_ACLK));
aclk_graceful_disconnect(mqttwss_client);
+#ifdef MQTT_WSS_DEBUG
+ if (ssl_log_file)
+ fclose(ssl_log_file);
+#endif
+
exit_full:
// Tear Down
QUERY_THREAD_WAKEUP_ALL;
@@ -739,35 +785,40 @@ exit:
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
- int ret;
+ int ret = 0;
if (!aclk_connected)
return;
- ret = get_node_id(&host->host_uuid, &node_id);
- if (ret > 0) {
- // this means we were not able to check if node_id already present
- error("Unable to check for node_id. Ignoring the host state update.");
- return;
+ if (host->node_id && !uuid_is_null(*host->node_id)) {
+ uuid_copy(node_id, *host->node_id);
}
- if (ret < 0) {
- // node_id not found
- aclk_query_t create_query;
- create_query = aclk_query_new(REGISTER_NODE);
- rrdhost_aclk_state_lock(localhost);
- node_instance_creation_t node_instance_creation = {
- .claim_id = localhost->aclk_state.claimed_id,
- .hops = host->system_info->hops,
- .hostname = rrdhost_hostname(host),
- .machine_guid = host->machine_guid
- };
- create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
- rrdhost_aclk_state_unlock(localhost);
- create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
- create_query->data.bin_payload.msg_name = "CreateNodeInstance";
- info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
- aclk_queue_query(create_query);
- return;
+ else {
+ ret = get_node_id(&host->host_uuid, &node_id);
+ if (ret > 0) {
+ // this means we were not able to check if node_id already present
+ error("Unable to check for node_id. Ignoring the host state update.");
+ return;
+ }
+ if (ret < 0) {
+ // node_id not found
+ aclk_query_t create_query;
+ create_query = aclk_query_new(REGISTER_NODE);
+ rrdhost_aclk_state_lock(localhost);
+ node_instance_creation_t node_instance_creation = {
+ .claim_id = localhost->aclk_state.claimed_id,
+ .hops = host->system_info->hops,
+ .hostname = rrdhost_hostname(host),
+ .machine_guid = host->machine_guid};
+ create_query->data.bin_payload.payload =
+ generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+ rrdhost_aclk_state_unlock(localhost);
+ create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
+ create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+ info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+ aclk_queue_query(create_query);
+ return;
+ }
}
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
@@ -896,7 +947,7 @@ char *aclk_state(void)
#ifndef ENABLE_ACLK
return strdupz("ACLK Available: No");
#else
- BUFFER *wb = buffer_create(1024);
+ BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
struct tm *tmptr, tmbuf;
char *ret;
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 6aed548b7..56b24add9 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -26,6 +26,8 @@ extern time_t aclk_block_until;
extern int disconnect_req;
+extern int aclk_alert_reloaded;
+
#ifdef ENABLE_ACLK
void *aclk_main(void *ptr);
diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c
index df9d18f63..290e7d8f4 100644
--- a/aclk/aclk_capas.c
+++ b/aclk/aclk_capas.c
@@ -36,7 +36,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host)
{ .name = "funcs", .version = 0, .enabled = 0 },
{ .name = NULL, .version = 0, .enabled = 0 }
};
- if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) {
+ if (host == localhost || (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS))) {
ni_caps[4].version = 1;
ni_caps[4].enabled = 1;
}
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index 2bdbb70fb..391313ffe 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -14,15 +14,19 @@ static int aclk_https_request(https_req_t *request, https_req_response_t *respon
// wrapper for ACLK only which loads ACLK specific proxy settings
// then only calls https_request
struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
- aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
+ aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, (char**)&proxy_conf.username, (char**)&proxy_conf.password, &proxy_conf.type);
if (proxy_conf.type == MQTT_WSS_PROXY_HTTP) {
request->proxy_host = (char*)proxy_conf.host; // TODO make it const as well
request->proxy_port = proxy_conf.port;
+ request->proxy_username = proxy_conf.username;
+ request->proxy_password = proxy_conf.password;
}
rc = https_request(request, response);
freez((char*)proxy_conf.host);
+ freez((char*)proxy_conf.username);
+ freez((char*)proxy_conf.password);
return rc;
}
@@ -303,25 +307,6 @@ inline static int base64_decode_helper(unsigned char *out, int *outl, const unsi
return 0;
}
-inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
-{
- int len;
- unsigned char *str = out;
- EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
- EVP_EncodeInit(ctx);
- EVP_EncodeUpdate(ctx, str, outl, in, in_len);
- str += *outl;
- EVP_EncodeFinal(ctx, str, &len);
- *outl += len;
- // if we ever expect longer output than what OpenSSL would pack into single line
- // we would have to skip the endlines, until then we can just cut the string short
- str = (unsigned char*)strchr((char*)out, '\n');
- if (str)
- *str = 0;
- EVP_ENCODE_CTX_free(ctx);
- return 0;
-}
-
#define OTP_URL_PREFIX "/api/v1/auth/node/"
int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **challenge, int *challenge_bytes)
{
@@ -329,7 +314,7 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **
https_req_t req = HTTPS_REQ_T_INITIALIZER;
https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
- BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
+ BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20, &netdata_buffers_statistics.buffers_aclk);
req.host = target->host;
req.port = target->port;
@@ -409,8 +394,8 @@ int aclk_send_otp_response(const char *agent_id, const unsigned char *response,
base64_encode_helper(base64, &len, response, response_bytes);
- BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
- BUFFER *resp_json = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20);
+ BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20, &netdata_buffers_statistics.buffers_aclk);
+ BUFFER *resp_json = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20, &netdata_buffers_statistics.buffers_aclk);
buffer_sprintf(url, "%s/node/%s/password", target->path, agent_id);
buffer_sprintf(resp_json, "{\"response\":\"%s\"}", base64);
@@ -829,7 +814,7 @@ exit:
}
int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
- BUFFER *buf = buffer_create(1024);
+ BUFFER *buf = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
https_req_t req = HTTPS_REQ_T_INITIALIZER;
https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 5301c281f..9eced0811 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -4,8 +4,6 @@
#include "aclk_stats.h"
#include "aclk_tx_msgs.h"
-#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
-
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
@@ -64,19 +62,19 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
int retval = 0;
usec_t t;
BUFFER *local_buffer = NULL;
- BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE);
+ BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE, &netdata_buffers_statistics.buffers_aclk);
RRDHOST *query_host = localhost;
#ifdef NETDATA_WITH_ZLIB
int z_ret;
- BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
char *start, *end;
#endif
struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
- w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
- w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
+ w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
+ w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE, &netdata_buffers_statistics.buffers_aclk);
+ w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE, &netdata_buffers_statistics.buffers_aclk);
strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
@@ -193,7 +191,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w);
- local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+ local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
local_buffer->contenttype = CT_APPLICATION_JSON;
buffer_strcat(local_buffer, w->response.header_output->buffer);
@@ -327,6 +325,11 @@ static void worker_aclk_register(void) {
}
}
+static void aclk_query_request_cancel(void *data)
+{
+ pthread_cond_broadcast((pthread_cond_t *) data);
+}
+
/**
* Main query processing thread
*/
@@ -336,7 +339,9 @@ void *aclk_query_main_thread(void *ptr)
struct aclk_query_thread *query_thr = ptr;
- while (!netdata_exit) {
+ service_register(SERVICE_THREAD_TYPE_NETDATA, aclk_query_request_cancel, NULL, &query_cond_wait, false);
+
+ while (service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES)) {
aclk_query_process_msgs(query_thr);
worker_is_idle();
@@ -359,14 +364,13 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss
query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
for (int i = 0; i < query_threads->count; i++) {
query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
+ query_threads->thread_list[i].client = client;
- if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
+ if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "ACLK_QRY[%d]", i) < 0))
error("snprintf encoding error");
netdata_thread_create(
&query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
&query_threads->thread_list[i]);
-
- query_threads->thread_list[i].client = client;
}
}
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 9a450571e..e7cad5ded 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -26,7 +26,7 @@ static inline int _aclk_queue_query(aclk_query_t query)
ACLK_QUEUE_LOCK;
if (aclk_query_queue.block_push) {
ACLK_QUEUE_UNLOCK;
- if(!netdata_exit)
+ if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
aclk_query_free(query);
return 1;
@@ -66,7 +66,7 @@ aclk_query_t aclk_queue_pop(void)
ACLK_QUEUE_LOCK;
if (aclk_query_queue.block_push) {
ACLK_QUEUE_UNLOCK;
- if(!netdata_exit)
+ if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
return NULL;
}
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 83bc5508b..104fbcb3e 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -283,9 +283,7 @@ int create_node_instance_result(const char *msg, size_t msg_len)
node_state_update.live = 1;
node_state_update.hops = 0;
} else {
- netdata_mutex_lock(&host->receiver_lock);
- node_state_update.live = (host->receiver != NULL);
- netdata_mutex_unlock(&host->receiver_lock);
+ node_state_update.live = (!rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN));
node_state_update.hops = host->system_info->hops;
}
}
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 215313ff9..2b4d5e48a 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -1,5 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
+#define MQTT_WSS_CPUSTATS
+
#include "aclk_stats.h"
#include "aclk_query.h"
@@ -143,7 +145,9 @@ static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = {
"alarms",
"alarm_log",
"chart",
- "charts"
+ "charts",
+ "function",
+ "functions"
// if you change then update `ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT`.
};
@@ -257,6 +261,23 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
static uint64_t sent = 0;
static uint64_t recvd = 0;
+ static RRDSET *st_txbuf_perc = NULL;
+ static RRDDIM *rd_txbuf_perc = NULL;
+
+ static RRDSET *st_txbuf = NULL;
+ static RRDDIM *rd_tx_buffer_usable = NULL;
+ static RRDDIM *rd_tx_buffer_reclaimable = NULL;
+ static RRDDIM *rd_tx_buffer_used = NULL;
+ static RRDDIM *rd_tx_buffer_free = NULL;
+ static RRDDIM *rd_tx_buffer_size = NULL;
+
+ static RRDSET *st_timing = NULL;
+ static RRDDIM *rd_keepalive = NULL;
+ static RRDDIM *rd_read_socket = NULL;
+ static RRDDIM *rd_write_socket = NULL;
+ static RRDDIM *rd_process_websocket = NULL;
+ static RRDDIM *rd_process_mqtt = NULL;
+
sent += stats->bytes_tx;
recvd += stats->bytes_rx;
@@ -269,10 +290,61 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
+ if (unlikely(!st_txbuf_perc)) {
+ st_txbuf_perc = rrdset_create_localhost(
+ "netdata", "aclk_mqtt_tx_perc", NULL, "aclk", NULL, "Actively used percentage of MQTT Tx Buffer,", "%",
+ "netdata", "stats", 200012, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_txbuf_perc = rrddim_add(st_txbuf_perc, "used", NULL, 1, 100, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ if (unlikely(!st_txbuf)) {
+ st_txbuf = rrdset_create_localhost(
+ "netdata", "aclk_mqtt_tx_queue", NULL, "aclk", NULL, "State of transmit MQTT queue.", "B",
+ "netdata", "stats", 200013, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ rd_tx_buffer_usable = rrddim_add(st_txbuf, "usable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_tx_buffer_reclaimable = rrddim_add(st_txbuf, "reclaimable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_tx_buffer_used = rrddim_add(st_txbuf, "used", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_tx_buffer_free = rrddim_add(st_txbuf, "free", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_tx_buffer_size = rrddim_add(st_txbuf, "size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ if (unlikely(!st_timing)) {
+ st_timing = rrdset_create_localhost(
+ "netdata", "aclk_mqtt_wss_time", NULL, "aclk", NULL, "Time spent handling MQTT, WSS, SSL and network communication.", "us",
+ "netdata", "stats", 200014, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_keepalive = rrddim_add(st_timing, "keep-alive", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_read_socket = rrddim_add(st_timing, "socket_read_ssl", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_write_socket = rrddim_add(st_timing, "socket_write_ssl", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_process_websocket = rrddim_add(st_timing, "process_websocket", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rd_process_mqtt = rrddim_add(st_timing, "process_mqtt", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
rrddim_set_by_pointer(st, rd_sent, sent);
rrddim_set_by_pointer(st, rd_recvd, recvd);
+ float usage = ((float)stats->mqtt.tx_buffer_free + stats->mqtt.tx_buffer_reclaimable) / stats->mqtt.tx_buffer_size;
+ usage = (1 - usage) * 10000;
+ rrddim_set_by_pointer(st_txbuf_perc, rd_txbuf_perc, usage);
+
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_usable, stats->mqtt.tx_buffer_reclaimable + stats->mqtt.tx_buffer_free);
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_reclaimable, stats->mqtt.tx_buffer_reclaimable);
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_used, stats->mqtt.tx_buffer_used);
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_free, stats->mqtt.tx_buffer_free);
+ rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_size, stats->mqtt.tx_buffer_size);
+
+ rrddim_set_by_pointer(st_timing, rd_keepalive, stats->time_keepalive);
+ rrddim_set_by_pointer(st_timing, rd_read_socket, stats->time_read_socket);
+ rrddim_set_by_pointer(st_timing, rd_write_socket, stats->time_write_socket);
+ rrddim_set_by_pointer(st_timing, rd_process_websocket, stats->time_process_websocket);
+ rrddim_set_by_pointer(st_timing, rd_process_mqtt, stats->time_process_mqtt);
+
rrdset_done(st);
+ rrdset_done(st_txbuf_perc);
+ rrdset_done(st_txbuf);
+ rrdset_done(st_timing);
}
void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
@@ -312,13 +384,13 @@ void *aclk_stats_main_thread(void *ptr)
struct aclk_metrics_per_sample per_sample;
struct aclk_metrics permanent;
- while (!netdata_exit) {
+ while (service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) {
netdata_thread_testcancel();
// ------------------------------------------------------------------------
// Wait for the next iteration point.
heartbeat_next(&hb, step_ut);
- if (netdata_exit) break;
+ if (!service_running(SERVICE_ACLK | SERVICE_COLLECTORS)) break;
ACLK_STATS_LOCK;
// to not hold lock longer than necessary, especially not to hold it
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index bec9ac247..002ebcfa6 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -8,15 +8,13 @@
#include "aclk_query_queue.h"
#include "mqtt_wss_client.h"
-#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
-
extern netdata_mutex_t aclk_stats_mutex;
#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
// if you change update `cloud_req_http_type_names`.
-#define ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT 7
+#define ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT 9
int aclk_cloud_req_http_type_to_idx(const char *name);
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index 01eaedc8e..ebf428ff9 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -308,6 +308,24 @@ const char *aclk_get_topic(enum aclk_topics topic)
}
/*
+ * Allows iterating all topics in topic cache without
+ * having to resort to callbacks.
+ */
+
+const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter)
+{
+ if (!aclk_topic_cache) {
+ error("Topic cache not initialized when %s was called.", __FUNCTION__);
+ return NULL;
+ }
+
+ if (*iter >= aclk_topic_cache_items)
+ return NULL;
+
+ return aclk_topic_cache[(*iter)++]->topic;
+}
+
+/*
* TBEB with randomness
*
* @param reset 1 - to reset the delay,
@@ -346,43 +364,117 @@ unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, un
return delay;
}
+static inline int aclk_parse_pair(const char *src, const char c, char **a, char **b)
+{
+ const char *ptr = strchr(src, c);
+ if (ptr == NULL)
+ return 1;
+
+// allow empty string
+/* if (!*(ptr+1))
+ return 1;*/
+
+ *a = callocz(1, ptr - src + 1);
+ memcpy (*a, src, ptr - src);
+
+ *b = strdupz(ptr+1);
+
+ return 0;
+}
#define HTTP_PROXY_PREFIX "http://"
-void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type)
+void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt_wss_proxy_type *type)
{
ACLK_PROXY_TYPE pt;
const char *ptr = aclk_get_proxy(&pt);
char *tmp;
- char *host;
+
if (pt != PROXY_TYPE_HTTP)
return;
+ *uname = NULL;
+ *pwd = NULL;
*port = 0;
+ char *proxy = strdupz(ptr);
+ ptr = proxy;
+
if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
ptr += strlen(HTTP_PROXY_PREFIX);
- if ((tmp = strchr(ptr, '@')))
- ptr = tmp;
+ if ((tmp = strchr(ptr, '@'))) {
+ *tmp = 0;
+ if(aclk_parse_pair(ptr, ':', uname, pwd)) {
+ error_report("Failed to get username and password for proxy. Will attempt connection without authentication");
+ }
+ ptr = tmp+1;
+ }
- if ((tmp = strchr(ptr, '/'))) {
- host = mallocz((tmp - ptr) + 1);
- memcpy(host, ptr, (tmp - ptr));
- host[tmp - ptr] = 0;
- } else
- host = strdupz(ptr);
+ if (!*ptr) {
+ freez(proxy);
+ freez(*uname);
+ freez(*pwd);
+ return;
+ }
- if ((tmp = strchr(host, ':'))) {
+ if ((tmp = strchr(ptr, ':'))) {
*tmp = 0;
tmp++;
- *port = atoi(tmp);
+ if(*tmp)
+ *port = atoi(tmp);
}
+ *ohost = strdupz(ptr);
if (*port <= 0 || *port > 65535)
*port = 8080;
- *ohost = host;
-
if (type)
*type = MQTT_WSS_PROXY_HTTP;
+ else {
+ freez(*uname);
+ freez(*pwd);
+ }
+
+ freez(proxy);
+}
+
+#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
+static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
+{
+ EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));
+
+ if (ctx != NULL) {
+ memset(ctx, 0, sizeof(*ctx));
+ }
+ return ctx;
+}
+static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
+{
+ OPENSSL_free(ctx);
+ return;
+}
+#endif
+
+int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
+{
+ int len;
+ unsigned char *str = out;
+ EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
+ EVP_EncodeInit(ctx);
+ EVP_EncodeUpdate(ctx, str, outl, in, in_len);
+ str += *outl;
+ EVP_EncodeFinal(ctx, str, &len);
+ *outl += len;
+
+ str = out;
+ while(*str) {
+ if (*str != 0x0D && *str != 0x0A)
+ *out++ = *str++;
+ else
+ str++;
+ }
+ *out = 0;
+
+ EVP_ENCODE_CTX_free(ctx);
+ return 0;
}
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index ed715e046..76dc8cad9 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -93,9 +93,13 @@ enum aclk_topics {
ACLK_TOPICID_CTXS_UPDATED = 20
};
+typedef size_t aclk_topic_cache_iter_t;
+#define ACLK_TOPIC_CACHE_ITER_T_INITIALIZER (0)
+
const char *aclk_get_topic(enum aclk_topics topic);
int aclk_generate_topic_cache(struct json_object *json);
void free_topic_cache(void);
+const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter);
// TODO
// aclk_topics_reload //when claim id changes
@@ -107,6 +111,8 @@ extern volatile int aclk_conversation_log_counter;
unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max);
#define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0)
-void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type);
+void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt_wss_proxy_type *type);
+
+int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len);
#endif /* ACLK_UTIL_H */
diff --git a/aclk/https_client.c b/aclk/https_client.c
index 1a32f833f..e2a42eef3 100644
--- a/aclk/https_client.c
+++ b/aclk/https_client.c
@@ -6,6 +6,10 @@
#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
+#include "aclk_util.h"
+
+#include "daemon/global_statistics.h"
+
enum http_parse_state {
HTTP_PARSE_INITIAL = 0,
HTTP_PARSE_HEADERS,
@@ -352,7 +356,7 @@ static int read_parse_response(https_req_ctx_t *ctx) {
#define TX_BUFFER_SIZE 8192
#define RX_BUFFER_SIZE (TX_BUFFER_SIZE*2)
static int handle_http_request(https_req_ctx_t *ctx) {
- BUFFER *hdr = buffer_create(TX_BUFFER_SIZE);
+ BUFFER *hdr = buffer_create(TX_BUFFER_SIZE, &netdata_buffers_statistics.buffers_aclk);
int rc = 0;
http_parse_ctx_clear(&ctx->parse_ctx);
@@ -392,6 +396,24 @@ static int handle_http_request(https_req_ctx_t *ctx) {
if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) {
buffer_sprintf(hdr, "Content-Length: %zu\x0D\x0A", ctx->request->payload_size);
}
+ if (ctx->request->proxy_username) {
+ size_t creds_plain_len = strlen(ctx->request->proxy_username) + strlen(ctx->request->proxy_password) + 1 /* ':' */;
+ char *creds_plain = callocz(1, creds_plain_len + 1);
+ char *ptr = creds_plain;
+ strcpy(ptr, ctx->request->proxy_username);
+ ptr += strlen(ctx->request->proxy_username);
+ *ptr++ = ':';
+ strcpy(ptr, ctx->request->proxy_password);
+
+ int creds_base64_len = (((4 * creds_plain_len / 3) + 3) & ~3);
+ // OpenSSL encoder puts newline every 64 output bytes
+ // we remove those but during encoding we need that space in the buffer
+ creds_base64_len += (1+(creds_base64_len/64)) * strlen("\n");
+ char *creds_base64 = callocz(1, creds_base64_len + 1);
+ base64_encode_helper((unsigned char*)creds_base64, &creds_base64_len, (unsigned char*)creds_plain, creds_plain_len);
+ buffer_sprintf(hdr, "Proxy-Authorization: Basic %s\x0D\x0A", creds_base64);
+ freez(creds_plain);
+ }
buffer_strcat(hdr, "\x0D\x0A");
@@ -491,6 +513,8 @@ int https_request(https_req_t *request, https_req_response_t *response) {
req.host = request->host;
req.port = request->port;
req.url = request->url;
+ req.proxy_username = request->proxy_username;
+ req.proxy_password = request->proxy_password;
ctx->request = &req;
if (handle_http_request(ctx)) {
error("Failed to CONNECT with proxy");
diff --git a/aclk/https_client.h b/aclk/https_client.h
index f7bc3d43d..daf4766f8 100644
--- a/aclk/https_client.h
+++ b/aclk/https_client.h
@@ -25,6 +25,8 @@ typedef struct {
char *proxy_host;
int proxy_port;
+ const char *proxy_username;
+ const char *proxy_password;
} https_req_t;
typedef struct {