summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-11-30 18:47:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-11-30 18:47:00 +0000
commit03bf87dcb06f7021bfb2df2fa8691593c6148aff (patch)
treee16b06711a2ed77cafb4b7754be0220c3d14a9d7 /aclk
parentAdding upstream version 1.36.1. (diff)
downloadnetdata-03bf87dcb06f7021bfb2df2fa8691593c6148aff.tar.xz
netdata-03bf87dcb06f7021bfb2df2fa8691593c6148aff.zip
Adding upstream version 1.37.0.upstream/1.37.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--aclk/README.md8
-rw-r--r--aclk/aclk.c209
-rw-r--r--aclk/aclk.h21
-rw-r--r--aclk/aclk_alarm_api.c3
-rw-r--r--aclk/aclk_api.c88
-rw-r--r--aclk/aclk_api.h45
-rw-r--r--aclk/aclk_capas.c47
-rw-r--r--aclk/aclk_capas.h14
-rw-r--r--aclk/aclk_charts_api.c77
-rw-r--r--aclk/aclk_charts_api.h22
-rw-r--r--aclk/aclk_contexts_api.c18
-rw-r--r--aclk/aclk_contexts_api.h2
-rw-r--r--aclk/aclk_otp.c11
-rw-r--r--aclk/aclk_query.c32
-rw-r--r--aclk/aclk_query.h2
-rw-r--r--aclk/aclk_query_queue.c16
-rw-r--r--aclk/aclk_rx_msgs.c70
-rw-r--r--aclk/aclk_stats.c33
-rw-r--r--aclk/aclk_tx_msgs.c94
-rw-r--r--aclk/aclk_util.c1
-rw-r--r--aclk/helpers/mqtt_wss_pal.h19
-rw-r--r--aclk/helpers/ringbuffer_pal.h11
-rw-r--r--aclk/schema-wrappers/capability.cc2
-rw-r--r--aclk/schema-wrappers/capability.h2
-rw-r--r--aclk/schema-wrappers/chart_config.cc105
-rw-r--r--aclk/schema-wrappers/chart_config.h50
-rw-r--r--aclk/schema-wrappers/chart_stream.cc337
-rw-r--r--aclk/schema-wrappers/chart_stream.h121
-rw-r--r--aclk/schema-wrappers/connection.cc10
-rw-r--r--aclk/schema-wrappers/connection.h2
-rw-r--r--aclk/schema-wrappers/node_connection.cc4
-rw-r--r--aclk/schema-wrappers/node_connection.h2
-rw-r--r--aclk/schema-wrappers/node_creation.cc6
-rw-r--r--aclk/schema-wrappers/node_creation.h6
-rw-r--r--aclk/schema-wrappers/node_info.cc4
-rw-r--r--aclk/schema-wrappers/node_info.h54
-rw-r--r--aclk/schema-wrappers/proto_2_json.cc16
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h2
38 files changed, 341 insertions, 1225 deletions
diff --git a/aclk/README.md b/aclk/README.md
index 6f541c38e..af0f5fdde 100644
--- a/aclk/README.md
+++ b/aclk/README.md
@@ -19,7 +19,7 @@ The Cloud App lives at app.netdata.cloud which currently resolves to the followi
:::caution
-This list of IPs can change without notice, we strongly advise you to whitelist the domain `app.netdata.cloud`, if
+This list of IPs can change without notice, we strongly advise you to whitelist following domains `api.netdata.cloud`, `mqtt.netdata.cloud`, if
this is not an option in your case always verify the current domain resolution (e.g via the `host` command).
:::
@@ -49,7 +49,7 @@ configuration uses two settings:
```conf
[global]
enabled = yes
- cloud base url = https://app.netdata.cloud
+ cloud base url = https://api.netdata.cloud
```
If your Agent needs to use a proxy to access the internet, you must [set up a proxy for
@@ -60,12 +60,10 @@ You can configure following keys in the `netdata.conf` section `[cloud]`:
[cloud]
statistics = yes
query thread count = 2
- mqtt5 = yes
```
- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
-- `mqtt5` allows disabling the new MQTT5 implementation which is used now by default in case of issues. This option will be removed in future stable release.
## Disable the ACLK
@@ -112,7 +110,7 @@ must contain only `EOF`.
```bash
[global]
enabled = no
- cloud base url = https://app.netdata.cloud
+ cloud base url = https://api.netdata.cloud
EOF
```
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 7b3641b1e..3b035b849 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -2,6 +2,7 @@
#include "aclk.h"
+#ifdef ENABLE_ACLK
#include "aclk_stats.h"
#include "mqtt_wss_client.h"
#include "aclk_otp.h"
@@ -12,6 +13,7 @@
#include "aclk_rx_msgs.h"
#include "https_client.h"
#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_capas.h"
#include "aclk_proxy.h"
@@ -23,21 +25,31 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
+#endif /* ENABLE_ACLK */
+
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
int aclk_rcvd_cloud_msgs = 0;
int aclk_connection_counter = 0;
int disconnect_req = 0;
+int aclk_connected = 0;
+int aclk_ctx_based = 0;
+int aclk_disable_runtime = 0;
+int aclk_stats_enabled;
+int aclk_kill_link = 0;
+
+usec_t aclk_session_us = 0;
+time_t aclk_session_sec = 0;
+
time_t last_conn_time_mqtt = 0;
time_t last_conn_time_appl = 0;
time_t last_disconnect_time = 0;
time_t next_connection_attempt = 0;
float last_backoff_value = 0;
-int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
-
time_t aclk_block_until = 0;
+#ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client;
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -447,9 +459,9 @@ static int aclk_block_till_recon_allowed() {
*/
static int aclk_get_transport_idx(aclk_env_t *env) {
for (size_t i = 0; i < env->transport_count; i++) {
- // currently we support only MQTT 3
+ // currently we support only MQTT 5
// therefore select first transport that matches
- if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
+ if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
return i;
}
}
@@ -483,7 +495,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
- error("Do not move the cloud base url out of post_conf_load!!");
+ error_report("Do not move the cloud base url out of post_conf_load!!");
return -1;
}
@@ -493,13 +505,13 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
info("Attempting connection now");
memset(&base_url, 0, sizeof(url_t));
if (url_parse(cloud_base_url, &base_url)) {
- error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
+ error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
url_t_destroy(&base_url);
continue;
}
- struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT };
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
struct mqtt_connect_params mqtt_conn_params = {
@@ -523,7 +535,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
url_t_destroy(&base_url);
if (ret) {
- error("Failed to Get ACLK environment");
+ error_report("Failed to Get ACLK environment");
// delay handled by aclk_block_till_recon_allowed
continue;
}
@@ -537,14 +549,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
}
if (!aclk_env_has_capa("proto")) {
- error ("Can't use encoding=proto without at least \"proto\" capability.");
+ error_report("Can't use encoding=proto without at least \"proto\" capability.");
continue;
}
info("New ACLK protobuf protocol negotiated successfully (/env response).");
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
- error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+ error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
url_t_destroy(&auth_url);
continue;
}
@@ -552,7 +564,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
url_t_destroy(&auth_url);
if (ret) {
- error("Error passing Challenge/Response to get OTP");
+ error_report("Error passing Challenge/Response to get OTP");
continue;
}
@@ -561,20 +573,20 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
if (!mqtt_conn_params.will_topic) {
- error("Couldn't get LWT topic. Will not send LWT.");
+ error_report("Couldn't get LWT topic. Will not send LWT.");
continue;
}
// Do the MQTT connection
ret = aclk_get_transport_idx(aclk_env);
if (ret < 0) {
- error("Cloud /env endpoint didn't return any transport usable by this Agent.");
+ error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
continue;
}
memset(&mqtt_url, 0, sizeof(url_t));
if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
- error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+ error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
url_t_destroy(&mqtt_url);
continue;
}
@@ -660,9 +672,7 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
+ if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -672,7 +682,7 @@ void *aclk_main(void *ptr)
// that send JSON payloads of 10 MB as single messages
mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
- aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
+ aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
if (aclk_stats_enabled) {
stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
@@ -748,7 +758,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
node_instance_creation_t node_instance_creation = {
.claim_id = localhost->aclk_state.claimed_id,
.hops = host->system_info->hops,
- .hostname = host->hostname,
+ .hostname = rrdhost_hostname(host),
.machine_guid = host->machine_guid
};
create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
@@ -770,14 +780,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_agent_capas();
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -815,15 +818,8 @@ void aclk_send_node_instances()
char host_id[UUID_STR_LEN];
uuid_unparse_lower(list->host_id, host_id);
- RRDHOST *host = rrdhost_find_by_guid(host_id, 0);
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ RRDHOST *host = rrdhost_find_by_guid(host_id);
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -832,6 +828,8 @@ void aclk_send_node_instances()
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
list->live,
list->hops);
+
+ freez((void*)node_state_update.capabilities);
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
@@ -853,7 +851,7 @@ void aclk_send_node_instances()
rrdhost_aclk_state_unlock(localhost);
info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
list->hops);
- freez(node_instance_creation.machine_guid);
+ freez((void *)node_instance_creation.machine_guid);
aclk_queue_query(create_query);
}
freez(list->hostname);
@@ -891,41 +889,13 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
status.last_submitted_sequence_id
);
}
+#endif /* ENABLE_ACLK */
-static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
-{
- struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
- if (!stats) {
- buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host");
- return;
- }
- buffer_sprintf(wb,
- "\n\t\tUpdates: %d"
- "\n\t\tBatch ID: %"PRIu64
- "\n\t\tMin Seq ID: %"PRIu64
- "\n\t\tMax Seq ID: %"PRIu64
- "\n\t\tPending Min Seq ID: %"PRIu64
- "\n\t\tPending Max Seq ID: %"PRIu64
- "\n\t\tSent Min Seq ID: %"PRIu64
- "\n\t\tSent Max Seq ID: %"PRIu64
- "\n\t\tAcked Min Seq ID: %"PRIu64
- "\n\t\tAcked Max Seq ID: %"PRIu64,
- stats->updates,
- stats->batch_id,
- stats->min_seqid,
- stats->max_seqid,
- stats->min_seqid_pend,
- stats->max_seqid_pend,
- stats->min_seqid_sent,
- stats->max_seqid_sent,
- stats->min_seqid_ack,
- stats->max_seqid_ack
- );
- freez(stats);
-}
-
-char *ng_aclk_state(void)
+char *aclk_state(void)
{
+#ifndef ENABLE_ACLK
+ return strdupz("ACLK Available: No");
+#else
BUFFER *wb = buffer_create(1024);
struct tm *tmptr, tmbuf;
char *ret;
@@ -935,7 +905,7 @@ char *ng_aclk_state(void)
"ACLK Version: 2\n"
"Protocols Supported: Protobuf\n"
);
- buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3);
+ buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);
char *agent_id = get_agent_claimid();
if (agent_id == NULL)
@@ -974,7 +944,7 @@ char *ng_aclk_state(void)
RRDHOST *host;
rrd_rdlock();
rrdhost_foreach_read(host) {
- buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname);
+ buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host));
buffer_strcat(wb, "\tClaimed ID: ");
rrdhost_aclk_state_lock(host);
@@ -1000,9 +970,6 @@ char *ng_aclk_state(void)
buffer_strcat(wb, "\n\tAlert Streaming Status:");
fill_alert_status_for_host(wb, host);
-
- buffer_strcat(wb, "\n\tChart Streaming Status:");
- fill_chart_status_for_host(wb, host);
}
rrd_unlock();
}
@@ -1010,8 +977,10 @@ char *ng_aclk_state(void)
ret = strdupz(buffer_tostring(wb));
buffer_free(wb);
return ret;
+#endif /* ENABLE_ACLK */
}
+#ifdef ENABLE_ACLK
static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
{
struct proto_alert_status status;
@@ -1038,45 +1007,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
json_object_object_add(obj, "last-submitted-seq-id", tmp);
}
-static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
-{
- struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
- if (!stats)
- return;
-
- json_object *tmp = json_object_new_int(stats->updates);
- json_object_object_add(obj, "updates", tmp);
-
- tmp = json_object_new_int(stats->batch_id);
- json_object_object_add(obj, "batch-id", tmp);
-
- tmp = json_object_new_int(stats->min_seqid);
- json_object_object_add(obj, "min-seq-id", tmp);
-
- tmp = json_object_new_int(stats->max_seqid);
- json_object_object_add(obj, "max-seq-id", tmp);
-
- tmp = json_object_new_int(stats->min_seqid_pend);
- json_object_object_add(obj, "pending-min-seq-id", tmp);
-
- tmp = json_object_new_int(stats->max_seqid_pend);
- json_object_object_add(obj, "pending-max-seq-id", tmp);
-
- tmp = json_object_new_int(stats->min_seqid_sent);
- json_object_object_add(obj, "sent-min-seq-id", tmp);
-
- tmp = json_object_new_int(stats->max_seqid_sent);
- json_object_object_add(obj, "sent-max-seq-id", tmp);
-
- tmp = json_object_new_int(stats->min_seqid_ack);
- json_object_object_add(obj, "acked-min-seq-id", tmp);
-
- tmp = json_object_new_int(stats->max_seqid_ack);
- json_object_object_add(obj, "acked-max-seq-id", tmp);
-
- freez(stats);
-}
-
static json_object *timestamp_to_json(const time_t *t)
{
struct tm *tmptr, tmbuf;
@@ -1087,9 +1017,13 @@ static json_object *timestamp_to_json(const time_t *t)
}
return NULL;
}
+#endif /* ENABLE_ACLK */
-char *ng_aclk_state_json(void)
+char *aclk_state_json(void)
{
+#ifndef ENABLE_ACLK
+ return strdupz("{\"aclk-available\":false}");
+#else
json_object *tmp, *grp, *msg = json_object_new_object();
tmp = json_object_new_boolean(1);
@@ -1124,7 +1058,7 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_string("Protobuf");
json_object_object_add(msg, "used-cloud-protocol", tmp);
- tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
+ tmp = json_object_new_int(5);
json_object_object_add(msg, "mqtt-version", tmp);
tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
@@ -1155,7 +1089,7 @@ char *ng_aclk_state_json(void)
rrdhost_foreach_read(host) {
json_object *nodeinstance = json_object_new_object();
- tmp = json_object_new_string(host->hostname);
+ tmp = json_object_new_string(rrdhost_hostname(host));
json_object_object_add(nodeinstance, "hostname", tmp);
tmp = json_object_new_string(host->machine_guid);
@@ -1191,10 +1125,6 @@ char *ng_aclk_state_json(void)
fill_alert_status_for_host_json(tmp, host);
json_object_object_add(nodeinstance, "alert-sync-status", tmp);
- tmp = json_object_new_object();
- fill_chart_status_for_host_json(tmp, host);
- json_object_object_add(nodeinstance, "chart-sync-status", tmp);
-
json_object_array_add(grp, nodeinstance);
}
rrd_unlock();
@@ -1203,4 +1133,41 @@ char *ng_aclk_state_json(void)
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
return str;
+#endif /* ENABLE_ACLK */
+}
+
+void add_aclk_host_labels(void) {
+ DICTIONARY *labels = localhost->rrdlabels;
+
+#ifdef ENABLE_ACLK
+ rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+ ACLK_PROXY_TYPE aclk_proxy;
+ char *proxy_str;
+ aclk_get_proxy(&aclk_proxy);
+
+ switch(aclk_proxy) {
+ case PROXY_TYPE_SOCKS5:
+ proxy_str = "SOCKS5";
+ break;
+ case PROXY_TYPE_HTTP:
+ proxy_str = "HTTP";
+ break;
+ default:
+ proxy_str = "none";
+ break;
+ }
+
+ rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+#else
+ rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
+#endif
+}
+
+void aclk_queue_node_info(RRDHOST *host) {
+ struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
+ if (likely(wc)) {
+ wc->node_info_send = 1;
+ }
}
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 5065ac2bf..6aed548b7 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -3,17 +3,30 @@
#define ACLK_H
#include "daemon/common.h"
+
+#ifdef ENABLE_ACLK
#include "aclk_util.h"
#include "aclk_rrdhost_state.h"
// How many MQTT PUBACKs we need to get to consider connection
// stable for the purposes of TBEB (truncated binary exponential backoff)
#define ACLK_PUBACKS_CONN_STABLE 3
+#endif /* ENABLE_ACLK */
+
+extern int aclk_connected;
+extern int aclk_ctx_based;
+extern int aclk_disable_runtime;
+extern int aclk_stats_enabled;
+extern int aclk_kill_link;
+
+extern usec_t aclk_session_us;
+extern time_t aclk_session_sec;
extern time_t aclk_block_until;
extern int disconnect_req;
+#ifdef ENABLE_ACLK
void *aclk_main(void *ptr);
extern netdata_mutex_t aclk_shared_state_mutex;
@@ -34,7 +47,11 @@ void aclk_send_node_instances(void);
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
-char *ng_aclk_state(void);
-char *ng_aclk_state_json(void);
+#endif /* ENABLE_ACLK */
+
+char *aclk_state(void);
+char *aclk_state_json(void);
+void add_aclk_host_labels(void);
+void aclk_queue_node_info(RRDHOST *host);
#endif /* ACLK_H */
diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c
index a181eb291..7df51a7b5 100644
--- a/aclk/aclk_alarm_api.c
+++ b/aclk/aclk_alarm_api.c
@@ -23,9 +23,6 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
char *payload = generate_alarm_log_entry(&payload_size, log_entry);
aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
-
- if (!use_mqtt_5)
- freez(payload);
}
void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
deleted file mode 100644
index 141d267af..000000000
--- a/aclk/aclk_api.c
+++ /dev/null
@@ -1,88 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#include "libnetdata/libnetdata.h"
-#include "database/rrd.h"
-
-#ifdef ENABLE_ACLK
-#include "aclk.h"
-#endif
-
-int aclk_connected = 0;
-int aclk_kill_link = 0;
-
-usec_t aclk_session_us = 0;
-time_t aclk_session_sec = 0;
-
-int aclk_disable_runtime = 0;
-
-int aclk_stats_enabled;
-int use_mqtt_5 = 0;
-int aclk_ctx_based = 0;
-
-#define ACLK_IMPL_KEY_NAME "aclk implementation"
-
-#ifdef ENABLE_ACLK
-void *aclk_starter(void *ptr) {
- char *aclk_impl_req = config_get(CONFIG_SECTION_CLOUD, ACLK_IMPL_KEY_NAME, "ng");
-
- if (!strcasecmp(aclk_impl_req, "ng")) {
- return aclk_main(ptr);
- } else if (!strcasecmp(aclk_impl_req, "legacy")) {
- error("Legacy ACLK is not supported anymore key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\" ignored. Using ACLK-NG.");
- } else {
- error("Unknown value \"%s\" of key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\". Using ACLK-NG. This config key will be deprecated.", aclk_impl_req);
- }
- return aclk_main(ptr);
-}
-#endif /* ENABLE_ACLK */
-
-void add_aclk_host_labels(void) {
- DICTIONARY *labels = localhost->host_labels;
-
-#ifdef ENABLE_ACLK
- rrdlabels_add(labels, "_aclk_ng_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
-#else
- rrdlabels_add(labels, "_aclk_ng_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
-#endif
- rrdlabels_add(labels, "_aclk_legacy_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
-#ifdef ENABLE_ACLK
- ACLK_PROXY_TYPE aclk_proxy;
- char *proxy_str;
- aclk_get_proxy(&aclk_proxy);
-
- switch(aclk_proxy) {
- case PROXY_TYPE_SOCKS5:
- proxy_str = "SOCKS5";
- break;
- case PROXY_TYPE_HTTP:
- proxy_str = "HTTP";
- break;
- default:
- proxy_str = "none";
- break;
- }
-
-
- int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-
- rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO);
- rrdlabels_add(labels, "_aclk_impl", "Next Generation", RRDLABEL_SRC_AUTO);
- rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
- rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
-#endif
-}
-
-char *aclk_state(void) {
-#ifndef ENABLE_ACLK
- return strdupz("ACLK Available: No");
-#else
- return ng_aclk_state();
-#endif
-}
-
-char *aclk_state_json(void) {
-#ifndef ENABLE_ACLK
- return strdupz("{\"aclk-available\":false}");
-#else
- return ng_aclk_state_json();
-#endif
-}
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
deleted file mode 100644
index 36a6d603f..000000000
--- a/aclk/aclk_api.h
+++ /dev/null
@@ -1,45 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#ifndef ACLK_API_H
-#define ACLK_API_H
-
-#include "libnetdata/libnetdata.h"
-
-#include "aclk_proxy.h"
-
-// TODO get rid global vars as soon as
-// ACLK Legacy is removed
-extern int aclk_connected;
-extern int aclk_kill_link;
-
-extern usec_t aclk_session_us;
-extern time_t aclk_session_sec;
-
-extern int aclk_disable_runtime;
-
-extern int aclk_stats_enabled;
-extern int aclk_alert_reloaded;
-
-extern int use_mqtt_5;
-extern int aclk_ctx_based;
-
-#ifdef ENABLE_ACLK
-void *aclk_starter(void *ptr);
-
-void aclk_host_state_update(RRDHOST *host, int connect);
-
-#define NETDATA_ACLK_HOOK \
- { .name = "ACLK_Main", \
- .config_section = NULL, \
- .config_name = NULL, \
- .enabled = 1, \
- .thread = NULL, \
- .init_routine = NULL, \
- .start_routine = aclk_starter },
-
-#endif
-
-void add_aclk_host_labels(void);
-char *aclk_state(void);
-char *aclk_state_json(void);
-
-#endif /* ACLK_API_H */
diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c
new file mode 100644
index 000000000..df9d18f63
--- /dev/null
+++ b/aclk/aclk_capas.c
@@ -0,0 +1,47 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_capas.h"
+
+#include "ml/ml.h"
+
+const struct capability *aclk_get_agent_capas()
+{
+ static struct capability agent_capabilities[] = {
+ { .name = "json", .version = 2, .enabled = 0 },
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = 0, .enabled = 0 },
+ { .name = "mc", .version = 0, .enabled = 0 },
+ { .name = "ctx", .version = 1, .enabled = 1 },
+ { .name = "funcs", .version = 1, .enabled = 1 },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ agent_capabilities[2].version = ml_capable() ? 1 : 0;
+ agent_capabilities[2].enabled = ml_enabled(localhost);
+
+ agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0;
+ agent_capabilities[3].enabled = enable_metric_correlations;
+
+ return agent_capabilities;
+}
+
+struct capability *aclk_get_node_instance_capas(RRDHOST *host)
+{
+ struct capability ni_caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) },
+ { .name = "mc",
+ .version = enable_metric_correlations ? metric_correlations_version : 0,
+ .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = 1 },
+ { .name = "funcs", .version = 0, .enabled = 0 },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) {
+ ni_caps[4].version = 1;
+ ni_caps[4].enabled = 1;
+ }
+
+ struct capability *ret = mallocz(sizeof(ni_caps));
+ memcpy(ret, ni_caps, sizeof(ni_caps));
+ return ret;
+}
diff --git a/aclk/aclk_capas.h b/aclk/aclk_capas.h
new file mode 100644
index 000000000..c39a197b8
--- /dev/null
+++ b/aclk/aclk_capas.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_CAPAS_H
+#define ACLK_CAPAS_H
+
+#include "daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+#include "schema-wrappers/capability.h"
+
+const struct capability *aclk_get_agent_capas();
+struct capability *aclk_get_node_instance_capas(RRDHOST *host);
+
+#endif /* ACLK_CAPAS_H */
diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c
deleted file mode 100644
index 51d8dad58..000000000
--- a/aclk/aclk_charts_api.c
+++ /dev/null
@@ -1,77 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#include "aclk_charts_api.h"
-
-#include "aclk_query_queue.h"
-
-#define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated"
-
-void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
-{
- aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
- query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
- query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
- QUEUE_IF_PAYLOAD_PRESENT(query);
-}
-
-void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
-{
- aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
- query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
- query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
- query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
- QUEUE_IF_PAYLOAD_PRESENT(query);
-}
-
-void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
-{
- aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
- query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
- query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id);
- query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
- QUEUE_IF_PAYLOAD_PRESENT(query);
-}
-
-void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size)
-{
- aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED);
- query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED;
- query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size);
- query->data.bin_payload.msg_name = "ChartConfigsUpdated";
- QUEUE_IF_PAYLOAD_PRESENT(query);
-}
-
-void aclk_chart_reset(chart_reset_t reset)
-{
- aclk_query_t query = aclk_query_new(CHART_RESET);
- query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET;
- query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset);
- query->data.bin_payload.msg_name = "ResetChartMessages";
- QUEUE_IF_PAYLOAD_PRESENT(query);
-}
-
-void aclk_retention_updated(struct retention_updated *data)
-{
- aclk_query_t query = aclk_query_new(RETENTION_UPDATED);
- query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED;
- query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data);
- query->data.bin_payload.msg_name = "RetentionUpdated";
- QUEUE_IF_PAYLOAD_PRESENT(query);
-}
-
-void aclk_update_node_info(struct update_node_info *info)
-{
- aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO);
- query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO;
- query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info);
- query->data.bin_payload.msg_name = "UpdateNodeInfo";
- QUEUE_IF_PAYLOAD_PRESENT(query);
-}
-
-void aclk_update_node_collectors(struct update_node_collectors *collectors)
-{
- aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS);
- query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS;
- query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors);
- query->data.bin_payload.msg_name = "UpdateNodeCollectors";
- QUEUE_IF_PAYLOAD_PRESENT(query);
-}
diff --git a/aclk/aclk_charts_api.h b/aclk/aclk_charts_api.h
deleted file mode 100644
index 71f07dd33..000000000
--- a/aclk/aclk_charts_api.h
+++ /dev/null
@@ -1,22 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#ifndef ACLK_CHARTS_H
-#define ACLK_CHARTS_H
-
-#include "../daemon/common.h"
-#include "schema-wrappers/schema_wrappers.h"
-
-void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions);
-void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions);
-void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id);
-
-void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size);
-
-void aclk_chart_reset(chart_reset_t reset);
-
-void aclk_retention_updated(struct retention_updated *data);
-
-void aclk_update_node_info(struct update_node_info *info);
-
-void aclk_update_node_collectors(struct update_node_collectors *collectors);
-
-#endif /* ACLK_CHARTS_H */
diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c
index f17d3cabd..f3344935e 100644
--- a/aclk/aclk_contexts_api.c
+++ b/aclk/aclk_contexts_api.c
@@ -21,3 +21,21 @@ void aclk_send_contexts_updated(contexts_updated_t data)
query->data.bin_payload.msg_name = "ContextsUpdated";
QUEUE_IF_PAYLOAD_PRESENT(query);
}
+
+void aclk_update_node_collectors(struct update_node_collectors *collectors)
+{
+ aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS);
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS;
+ query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors);
+ query->data.bin_payload.msg_name = "UpdateNodeCollectors";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_update_node_info(struct update_node_info *info)
+{
+ aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO);
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO;
+ query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info);
+ query->data.bin_payload.msg_name = "UpdateNodeInfo";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h
index 46b916d22..f0b5ec77e 100644
--- a/aclk/aclk_contexts_api.h
+++ b/aclk/aclk_contexts_api.h
@@ -7,6 +7,8 @@
void aclk_send_contexts_snapshot(contexts_snapshot_t data);
void aclk_send_contexts_updated(contexts_updated_t data);
+void aclk_update_node_collectors(struct update_node_collectors *collectors);
+void aclk_update_node_info(struct update_node_info *info);
#endif /* ACLK_CONTEXTS_API_H */
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index b7bf173c4..2bdbb70fb 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -13,7 +13,7 @@ static int aclk_https_request(https_req_t *request, https_req_response_t *respon
int rc;
// wrapper for ACLK only which loads ACLK specific proxy settings
// then only calls https_request
- struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT };
+ struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
if (proxy_conf.type == MQTT_WSS_PROXY_HTTP) {
@@ -380,7 +380,7 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **
base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64));
if (*challenge_bytes != CHALLENGE_LEN) {
error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN);
- freez(challenge);
+ freez(*challenge);
*challenge = NULL;
goto cleanup_json;
}
@@ -490,7 +490,7 @@ int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **m
int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target)
#endif
{
- unsigned char *challenge;
+ unsigned char *challenge = NULL;
int challenge_bytes;
char *agent_id = get_agent_claimid();
@@ -844,10 +844,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
return 1;
}
- if (rrdcontext_enabled)
- buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
- else
- buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
freez(agent_id);
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 981c01965..5301c281f 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -38,9 +38,7 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id)
int res;
uuid_t node_id_bin, host_id_bin;
- rrd_rdlock();
- RRDHOST *host = find_host_by_node_id((char *) node_id);
- rrd_unlock();
+ RRDHOST *host = find_host_by_node_id((char *)node_id);
if (host)
return host;
@@ -54,7 +52,7 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id)
return NULL;
}
uuid_unparse_lower(host_id_bin, host_id);
- return rrdhost_find_by_guid(host_id, 0);
+ return rrdhost_find_by_guid(host_id);
}
#define NODE_ID_QUERY "/node/"
@@ -82,7 +80,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
- w->acl = 0x1f;
+ w->acl = WEB_CLIENT_ACL_ACLK;
buffer_strcat(log_buffer, query->data.http_api_v2.query);
size_t size = 0;
@@ -101,7 +99,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
}
- RRDHOST *temp_host = NULL;
if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
char nodeid[UUID_STR_LEN];
@@ -116,14 +113,11 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
query_host = node_id_2_rrdhost(nodeid);
if (!query_host) {
- temp_host = sql_create_host_by_uuid(nodeid);
- if (!temp_host) {
- error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
- retval = 1;
- w->response.code = 404;
- aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
- goto cleanup;
- }
+ error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
+ retval = 1;
+ w->response.code = 404;
+ aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
+ goto cleanup;
}
}
@@ -144,8 +138,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
// execute the query
- t = aclk_web_api_v1_request(query_host ? query_host : temp_host, w, mysep ? mysep + 1 : "noop");
- free_temporary_host(temp_host);
+ t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
sent = size;
@@ -263,7 +256,7 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
return 0;
}
-const char *aclk_query_get_name(aclk_query_type_t qt)
+const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok)
{
switch (qt) {
case HTTP_API_V2: return "http_api_request_v2";
@@ -280,7 +273,8 @@ const char *aclk_query_get_name(aclk_query_type_t qt)
case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
case PROTO_BIN_MESSAGE: return "generic_binary_proto_message";
default:
- error_report("Unknown query type used %d", (int) qt);
+ if (!unknown_ok)
+ error_report("Unknown query type used %d", (int) qt);
return "unknown";
}
}
@@ -329,7 +323,7 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
static void worker_aclk_register(void) {
worker_register("ACLKQUERY");
for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) {
- worker_register_job_name(i, aclk_query_get_name(i));
+ worker_register_job_name(i, aclk_query_get_name(i, 0));
}
}
diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h
index f86754a2a..c006b0138 100644
--- a/aclk/aclk_query.h
+++ b/aclk/aclk_query.h
@@ -31,6 +31,6 @@ struct aclk_query_threads {
void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client);
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
-const char *aclk_query_get_name(aclk_query_type_t qt);
+const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok);
#endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 01b20d23f..9a450571e 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -111,22 +111,6 @@ void aclk_query_free(aclk_query_t query)
freez(query->data.http_api_v2.query);
break;
- case NODE_STATE_UPDATE:
- case REGISTER_NODE:
- case CHART_DIMS_UPDATE:
- case CHART_CONFIG_UPDATED:
- case CHART_RESET:
- case RETENTION_UPDATED:
- case UPDATE_NODE_INFO:
- case ALARM_LOG_HEALTH:
- case ALARM_PROVIDE_CFG:
- case ALARM_SNAPSHOT:
- case UPDATE_NODE_COLLECTORS:
- case PROTO_BIN_MESSAGE:
- if (!use_mqtt_5)
- freez(query->data.bin_payload.payload);
- break;
-
default:
break;
}
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index e6ed332cc..83bc5508b 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -5,6 +5,7 @@
#include "aclk_stats.h"
#include "aclk_query_queue.h"
#include "aclk.h"
+#include "aclk_capas.h"
#include "schema-wrappers/proto_2_json.h"
@@ -274,7 +275,7 @@ int create_node_instance_result(const char *msg, size_t msg_len)
.node_id = res.node_id
};
- RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
+ RRDHOST *host = rrdhost_find_by_guid(res.machine_guid);
if (host) {
// not all host must have RRDHOST struct created for them
// if they never connected during runtime of agent
@@ -289,20 +290,15 @@ int create_node_instance_result(const char *msg, size_t msg_len)
}
}
- struct capability caps[] = {
- { .name = "proto", .version = 1, .enabled = 1 },
- { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
- node_state_update.capabilities = caps;
+ node_state_update.capabilities = aclk_get_node_instance_capas(host);
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
+ freez((void *)node_state_update.capabilities);
+
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
@@ -322,44 +318,25 @@ int send_node_instances(const char *msg, size_t msg_len)
int stream_charts_and_dimensions(const char *msg, size_t msg_len)
{
- aclk_ctx_based = 0;
- stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
- if (!res.claim_id || !res.node_id) {
- error("Error parsing StreamChartsAndDimensions msg");
- freez(res.claim_id);
- freez(res.node_id);
- return 1;
- }
- chart_batch_id = res.batch_id;
- aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
- freez(res.claim_id);
- freez(res.node_id);
+ UNUSED(msg);
+ UNUSED(msg_len);
+ error_report("Received obsolete StreamChartsAndDimensions msg");
return 0;
}
int charts_and_dimensions_ack(const char *msg, size_t msg_len)
{
- chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
- if (!res.claim_id || !res.node_id) {
- error("Error parsing StreamChartsAndDimensions msg");
- freez(res.claim_id);
- freez(res.node_id);
- return 1;
- }
- aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
- freez(res.claim_id);
- freez(res.node_id);
+ UNUSED(msg);
+ UNUSED(msg_len);
+ error_report("Received obsolete StreamChartsAndDimensionsAck msg");
return 0;
}
int update_chart_configs(const char *msg, size_t msg_len)
{
- struct update_chart_config res = parse_update_chart_config(msg, msg_len);
- if (!res.claim_id || !res.node_id || !res.hashes)
- error("Error parsing UpdateChartConfigs msg");
- else
- aclk_get_chart_config(res.hashes);
- destroy_update_chart_config(&res);
+ UNUSED(msg);
+ UNUSED(msg_len);
+ error_report("Received obsolete UpdateChartConfigs msg");
return 0;
}
@@ -527,7 +504,7 @@ unsigned int aclk_init_rx_msg_handlers(void)
return i;
}
-void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic)
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic __maybe_unused)
{
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
@@ -546,15 +523,16 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
return;
}
-#ifdef NETDATA_INTERNAL_CHECKS
- if (!strncmp(message_type, "cmd", strlen("cmd"))) {
- log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name);
- } else {
- char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name);
- log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name);
- freez(json);
+
+ if (aclklog_enabled) {
+ if (!strncmp(message_type, "cmd", strlen("cmd"))) {
+ log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name);
+ } else {
+ char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name);
+ log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name);
+ freez(json);
+ }
}
-#endif
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 241e9b724..215313ff9 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -39,8 +39,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc
"connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE);
rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st_aclkstats);
+ }
rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online);
@@ -60,8 +59,7 @@ static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st_query_thread);
+ }
rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued);
rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched);
@@ -83,8 +81,8 @@ static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample)
rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
+ }
+
if(per_sample->latency_count)
rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count));
else
@@ -109,8 +107,7 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
+ }
rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err);
rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err);
@@ -129,10 +126,9 @@ static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample
"netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
- dims[i] = rrddim_add(st, aclk_query_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ dims[i] = rrddim_add(st, aclk_query_get_name(i, 1), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
+ }
for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++)
rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]);
@@ -171,8 +167,7 @@ static void aclk_stats_cloud_req_http_type(struct aclk_metrics_per_sample *per_s
for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++)
rd_rq_types[i] = rrddim_add(st, cloud_req_http_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
+ }
for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++)
rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_http_by_type[i]);
@@ -197,8 +192,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
error("snprintf encoding error");
aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
}
- } else
- rrdset_next(st);
+ }
for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) {
rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
@@ -222,8 +216,7 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
+ }
if(per_sample->cloud_q_process_count)
rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count));
@@ -248,8 +241,7 @@ static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) {
aclk_stats_cfg.rx_msg_dims[i] = rrddim_add(st, rx_handler_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
}
- } else
- rrdset_next(st);
+ }
for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++)
rrddim_set_by_pointer(st, aclk_stats_cfg.rx_msg_dims[i], rx_msgs_sample[i]);
@@ -275,8 +267,7 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
rd_sent = rrddim_add(st, "sent", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
- } else
- rrdset_next(st);
+ }
rrddim_set_by_pointer(st, rd_sent, sent);
rrddim_set_by_pointer(st, rd_recvd, recvd);
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 822a90fa2..532b964ad 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -5,6 +5,7 @@
#include "aclk_util.h"
#include "aclk_stats.h"
#include "aclk.h"
+#include "aclk_capas.h"
#include "schema-wrappers/proto_2_json.h"
@@ -15,6 +16,13 @@
// version for aclk legacy (old cloud arch)
#define ACLK_VERSION 2
+static void freez_aclk_publish5a(void *ptr) {
+ freez(ptr);
+}
+static void freez_aclk_publish5b(void *ptr) {
+ freez(ptr);
+}
+
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
#ifndef ACLK_LOG_CONVERSATION_DIR
@@ -28,43 +36,27 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
return 0;
}
- if (use_mqtt_5)
- mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
- else
- mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+ mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
- char *json = protomsg_to_json(msg, msg_len, msgname);
- log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
- freez(json);
#endif
+ if (aclklog_enabled) {
+ char *json = protomsg_to_json(msg, msg_len, msgname);
+ log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
+ freez(json);
+ }
+
return packet_id;
}
-/* UNUSED now but can be used soon MVP1?
-static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
+// json_object_put returns int unfortunately :D
+// we need void(*fnc)(void *);
+static void json_object_put_wrapper(void *jsonobj)
{
- if (unlikely(!topic || topic[0] != '/')) {
- error ("Full topic required!");
- return;
- }
-
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
-
- mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published();
-#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif
+ json_object_put(jsonobj);
}
-*/
#define TOPIC_MAX_LEN 512
#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
@@ -73,10 +65,11 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
uint16_t packet_id;
const char *str;
char *full_msg = NULL;
- int len, rc;
+ int len;
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
+ json_object_put(msg);
return HTTP_RESP_INTERNAL_SERVER_ERROR;
}
@@ -87,40 +80,20 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
memcpy(full_msg, str, len);
+ json_object_put(msg);
+ msg = NULL;
memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
memcpy(&full_msg[len], payload, payload_len);
len += payload_len;
}
-/* TODO
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif */
-
- if (use_mqtt_5)
- mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id);
- else {
- rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
- error("Timeout sending binpacked message");
- freez(full_msg);
- return HTTP_RESP_BACKEND_FETCH_FAILED;
- }
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
- error("Message is bigger than allowed maximum");
- freez(full_msg);
- return HTTP_RESP_FORBIDDEN;
- }
- }
+ mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
- freez(full_msg);
+
return 0;
}
@@ -203,7 +176,6 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char
if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
error("Failed to send cancelation message for http reply");
}
- json_object_put(msg);
}
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
@@ -222,7 +194,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
json_object_object_add(msg, "http-code", tmp);
int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
- json_object_put(msg);
switch (rc) {
case HTTP_RESP_FORBIDDEN:
@@ -241,22 +212,11 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
size_t len;
uint16_t pid;
- struct capability agent_capabilities[] = {
- { .name = "json", .version = 2, .enabled = 0 },
- { .name = "proto", .version = 1, .enabled = 1 },
-#ifdef ENABLE_ML
- { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
-#endif
- { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
- { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
-
update_agent_connection_t conn = {
.reachable = (reachable ? 1 : 0),
.lwt = 0,
.session_id = aclk_session_newarch,
- .capabilities = agent_capabilities
+ .capabilities = aclk_get_agent_capas()
};
rrdhost_aclk_state_lock(localhost);
@@ -279,8 +239,6 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
}
pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
- if (!use_mqtt_5)
- freez(msg);
if (localhost->aclk_state.prev_claimed_id) {
freez(localhost->aclk_state.prev_claimed_id);
localhost->aclk_state.prev_claimed_id = NULL;
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index ec021aec5..01eaedc8e 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_util.h"
+#include "aclk_proxy.h"
#include "daemon/common.h"
diff --git a/aclk/helpers/mqtt_wss_pal.h b/aclk/helpers/mqtt_wss_pal.h
new file mode 100644
index 000000000..5c89f8bb7
--- /dev/null
+++ b/aclk/helpers/mqtt_wss_pal.h
@@ -0,0 +1,19 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef MQTT_WSS_PAL_H
+#define MQTT_WSS_PAL_H
+
+#include "libnetdata/libnetdata.h"
+
+#undef OPENSSL_VERSION_095
+#undef OPENSSL_VERSION_097
+#undef OPENSSL_VERSION_110
+#undef OPENSSL_VERSION_111
+
+#define mw_malloc(...) mallocz(__VA_ARGS__)
+#define mw_calloc(...) callocz(__VA_ARGS__)
+#define mw_free(...) freez(__VA_ARGS__)
+#define mw_strdup(...) strdupz(__VA_ARGS__)
+#define mw_realloc(...) reallocz(__VA_ARGS__)
+
+#endif /* MQTT_WSS_PAL_H */
diff --git a/aclk/helpers/ringbuffer_pal.h b/aclk/helpers/ringbuffer_pal.h
new file mode 100644
index 000000000..2f7e1cb93
--- /dev/null
+++ b/aclk/helpers/ringbuffer_pal.h
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef RINGBUFFER_PAL_H
+#define RINGBUFFER_PAL_H
+
+#include "libnetdata/libnetdata.h"
+
+#define crbuf_malloc(...) mallocz(__VA_ARGS__)
+#define crbuf_free(...) freez(__VA_ARGS__)
+
+#endif /* RINGBUFFER_PAL_H */
diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc
index 769806f90..af45740a9 100644
--- a/aclk/schema-wrappers/capability.cc
+++ b/aclk/schema-wrappers/capability.cc
@@ -4,7 +4,7 @@
#include "capability.h"
-void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa) {
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) {
proto_capa->set_name(c_capa->name);
proto_capa->set_enabled(c_capa->enabled);
proto_capa->set_version(c_capa->version);
diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h
index 9517a8716..c6085a44b 100644
--- a/aclk/schema-wrappers/capability.h
+++ b/aclk/schema-wrappers/capability.h
@@ -18,7 +18,7 @@ struct capability {
#include "proto/aclk/v1/lib.pb.h"
-void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa);
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa);
#endif
#endif /* ACLK_SCHEMA_CAPABILITY_H */
diff --git a/aclk/schema-wrappers/chart_config.cc b/aclk/schema-wrappers/chart_config.cc
deleted file mode 100644
index 87e34e0df..000000000
--- a/aclk/schema-wrappers/chart_config.cc
+++ /dev/null
@@ -1,105 +0,0 @@
-#include "chart_config.h"
-
-#include "proto/chart/v1/config.pb.h"
-
-#include "libnetdata/libnetdata.h"
-
-#include "schema_wrapper_utils.h"
-
-void destroy_update_chart_config(struct update_chart_config *cfg)
-{
- freez(cfg->claim_id);
- freez(cfg->node_id);
- freez(cfg->hashes);
-}
-
-void destroy_chart_config_updated(struct chart_config_updated *cfg)
-{
- freez(cfg->type);
- freez(cfg->family);
- freez(cfg->context);
- freez(cfg->title);
- freez(cfg->plugin);
- freez(cfg->module);
- freez(cfg->units);
- freez(cfg->config_hash);
-}
-
-struct update_chart_config parse_update_chart_config(const char *data, size_t len)
-{
- chart::v1::UpdateChartConfigs cfgs;
- update_chart_config res;
- memset(&res, 0, sizeof(res));
-
- if (!cfgs.ParseFromArray(data, len))
- return res;
-
- res.claim_id = strdupz(cfgs.claim_id().c_str());
- res.node_id = strdupz(cfgs.node_id().c_str());
-
- // to not do bazillion tiny allocations for individual strings
- // we calculate how much memory we will need for all of them
- // and allocate at once
- int hash_count = cfgs.config_hashes_size();
- size_t total_strlen = 0;
- for (int i = 0; i < hash_count; i++)
- total_strlen += cfgs.config_hashes(i).length();
- total_strlen += hash_count; //null bytes
-
- res.hashes = (char**)callocz( 1,
- (hash_count+1) * sizeof(char*) + //char * array incl. terminating NULL at the end
- total_strlen //strings themselves incl. 1 null byte each
- );
-
- char* dest = ((char*)res.hashes) + (hash_count + 1 /* NULL ptr */) * sizeof(char *);
- // now copy them strings
- // null bytes handled by callocz
- for (int i = 0; i < hash_count; i++) {
- strcpy(dest, cfgs.config_hashes(i).c_str());
- res.hashes[i] = dest;
- dest += strlen(dest) + 1 /* end string null */;
- }
-
- return res;
-}
-
-char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size)
-{
- chart::v1::ChartConfigsUpdated configs;
- for (int i = 0; i < list_size; i++) {
- chart::v1::ChartConfigUpdated *config = configs.add_configs();
- config->set_type(config_list[i].type);
- if (config_list[i].family)
- config->set_family(config_list[i].family);
- config->set_context(config_list[i].context);
- config->set_title(config_list[i].title);
- config->set_priority(config_list[i].priority);
- config->set_plugin(config_list[i].plugin);
-
- if (config_list[i].module)
- config->set_module(config_list[i].module);
-
- switch (config_list[i].chart_type) {
- case RRDSET_TYPE_LINE:
- config->set_chart_type(chart::v1::LINE);
- break;
- case RRDSET_TYPE_AREA:
- config->set_chart_type(chart::v1::AREA);
- break;
- case RRDSET_TYPE_STACKED:
- config->set_chart_type(chart::v1::STACKED);
- break;
- default:
- return NULL;
- }
-
- config->set_units(config_list[i].units);
- config->set_config_hash(config_list[i].config_hash);
- }
-
- *len = PROTO_COMPAT_MSG_SIZE(configs);
- char *bin = (char*)mallocz(*len);
- configs.SerializeToArray(bin, *len);
-
- return bin;
-}
diff --git a/aclk/schema-wrappers/chart_config.h b/aclk/schema-wrappers/chart_config.h
deleted file mode 100644
index f08f76b61..000000000
--- a/aclk/schema-wrappers/chart_config.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H
-#define ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H
-
-#include <stdlib.h>
-
-#include "database/rrd.h"
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-struct update_chart_config {
- char *claim_id;
- char *node_id;
- char **hashes;
-};
-
-enum chart_config_chart_type {
- LINE,
- AREA,
- STACKED
-};
-
-struct chart_config_updated {
- char *type;
- char *family;
- char *context;
- char *title;
- uint64_t priority;
- char *plugin;
- char *module;
- RRDSET_TYPE chart_type;
- char *units;
- char *config_hash;
-};
-
-void destroy_update_chart_config(struct update_chart_config *cfg);
-void destroy_chart_config_updated(struct chart_config_updated *cfg);
-
-struct update_chart_config parse_update_chart_config(const char *data, size_t len);
-
-char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H */
diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc
deleted file mode 100644
index 54c940758..000000000
--- a/aclk/schema-wrappers/chart_stream.cc
+++ /dev/null
@@ -1,337 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "aclk/aclk_util.h"
-
-#include "proto/chart/v1/stream.pb.h"
-#include "chart_stream.h"
-
-#include "schema_wrapper_utils.h"
-
-#include <sys/time.h>
-#include <stdlib.h>
-
-stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len)
-{
- chart::v1::StreamChartsAndDimensions msg;
- stream_charts_and_dims_t res;
- memset(&res, 0, sizeof(res));
-
- if (!msg.ParseFromArray(data, len))
- return res;
-
- res.node_id = strdup(msg.node_id().c_str());
- res.claim_id = strdup(msg.claim_id().c_str());
- res.seq_id = msg.sequence_id();
- res.batch_id = msg.batch_id();
- set_timeval_from_google_timestamp(msg.seq_id_created_at(), &res.seq_id_created_at);
-
- return res;
-}
-
-chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len)
-{
- chart::v1::ChartsAndDimensionsAck msg;
- chart_and_dim_ack_t res = { .claim_id = NULL, .node_id = NULL, .last_seq_id = 0 };
-
- if (!msg.ParseFromArray(data, len))
- return res;
-
- res.node_id = strdup(msg.node_id().c_str());
- res.claim_id = strdup(msg.claim_id().c_str());
- res.last_seq_id = msg.last_sequence_id();
-
- return res;
-}
-
-char *generate_reset_chart_messages(size_t *len, chart_reset_t reset)
-{
- chart::v1::ResetChartMessages msg;
-
- msg.set_claim_id(reset.claim_id);
- msg.set_node_id(reset.node_id);
- switch (reset.reason) {
- case DB_EMPTY:
- msg.set_reason(chart::v1::ResetReason::DB_EMPTY);
- break;
- case SEQ_ID_NOT_EXISTS:
- msg.set_reason(chart::v1::ResetReason::SEQ_ID_NOT_EXISTS);
- break;
- case TIMESTAMP_MISMATCH:
- msg.set_reason(chart::v1::ResetReason::TIMESTAMP_MISMATCH);
- break;
- default:
- return NULL;
- }
-
- *len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)malloc(*len);
- if (bin)
- msg.SerializeToArray(bin, *len);
-
- return bin;
-}
-
-void chart_instance_updated_destroy(struct chart_instance_updated *instance)
-{
- freez((char*)instance->id);
- freez((char*)instance->claim_id);
-
- rrdlabels_destroy(instance->chart_labels);
-
- freez((char*)instance->config_hash);
-}
-
-static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, const struct chart_instance_updated *update)
-{
- google::protobuf::Map<std::string, std::string> *map;
- aclk_lib::v1::ACLKMessagePosition *pos;
-
- chart->set_id(update->id);
- chart->set_claim_id(update->claim_id);
- chart->set_node_id(update->node_id);
- chart->set_name(update->name);
-
- map = chart->mutable_chart_labels();
- rrdlabels_walkthrough_read(update->chart_labels, label_add_to_map_callback, map);
-
- switch (update->memory_mode) {
- case RRD_MEMORY_MODE_NONE:
- chart->set_memory_mode(chart::v1::NONE);
- break;
- case RRD_MEMORY_MODE_RAM:
- chart->set_memory_mode(chart::v1::RAM);
- break;
- case RRD_MEMORY_MODE_MAP:
- chart->set_memory_mode(chart::v1::MAP);
- break;
- case RRD_MEMORY_MODE_SAVE:
- chart->set_memory_mode(chart::v1::SAVE);
- break;
- case RRD_MEMORY_MODE_ALLOC:
- chart->set_memory_mode(chart::v1::ALLOC);
- break;
- case RRD_MEMORY_MODE_DBENGINE:
- chart->set_memory_mode(chart::v1::DB_ENGINE);
- break;
- default:
- return 1;
- break;
- }
-
- chart->set_update_every_interval(update->update_every);
- chart->set_config_hash(update->config_hash);
-
- pos = chart->mutable_position();
- pos->set_sequence_id(update->position.sequence_id);
- pos->set_previous_sequence_id(update->position.previous_sequence_id);
- set_google_timestamp_from_timeval(update->position.seq_id_creation_time, pos->mutable_seq_id_created_at());
-
- return 0;
-}
-
-static int set_chart_dim_updated(chart::v1::ChartDimensionUpdated *dim, const struct chart_dimension_updated *c_dim)
-{
- aclk_lib::v1::ACLKMessagePosition *pos;
-
- dim->set_id(c_dim->id);
- dim->set_chart_id(c_dim->chart_id);
- dim->set_node_id(c_dim->node_id);
- dim->set_claim_id(c_dim->claim_id);
- dim->set_name(c_dim->name);
-
- set_google_timestamp_from_timeval(c_dim->created_at, dim->mutable_created_at());
- set_google_timestamp_from_timeval(c_dim->last_timestamp, dim->mutable_last_timestamp());
-
- pos = dim->mutable_position();
- pos->set_sequence_id(c_dim->position.sequence_id);
- pos->set_previous_sequence_id(c_dim->position.previous_sequence_id);
- set_google_timestamp_from_timeval(c_dim->position.seq_id_creation_time, pos->mutable_seq_id_created_at());
-
- return 0;
-}
-
-char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
-{
- chart::v1::ChartsAndDimensionsUpdated msg;
- chart::v1::ChartInstanceUpdated db_chart;
- chart::v1::ChartDimensionUpdated db_dim;
- aclk_lib::v1::ACLKMessagePosition *pos;
-
- msg.set_batch_id(batch_id);
-
- for (int i = 0; payloads[i]; i++) {
- if (is_dim[i]) {
- if (!db_dim.ParseFromArray(payloads[i], payload_sizes[i])) {
- error("[ACLK] Could not parse chart::v1::chart_dimension_updated");
- return NULL;
- }
-
- pos = db_dim.mutable_position();
- pos->set_sequence_id(new_positions[i].sequence_id);
- pos->set_previous_sequence_id(new_positions[i].previous_sequence_id);
- set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at());
-
- chart::v1::ChartDimensionUpdated *dim = msg.add_dimensions();
- *dim = db_dim;
- } else {
- if (!db_chart.ParseFromArray(payloads[i], payload_sizes[i])) {
- error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated");
- return NULL;
- }
-
- pos = db_chart.mutable_position();
- pos->set_sequence_id(new_positions[i].sequence_id);
- pos->set_previous_sequence_id(new_positions[i].previous_sequence_id);
- set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at());
-
- chart::v1::ChartInstanceUpdated *chart = msg.add_charts();
- *chart = db_chart;
- }
- }
-
- *len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)mallocz(*len);
- msg.SerializeToArray(bin, *len);
-
- return bin;
-}
-
-char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
-{
- chart::v1::ChartsAndDimensionsUpdated msg;
-
- msg.set_batch_id(chart_batch_id);
-
- for (int i = 0; payloads[i]; i++) {
- chart::v1::ChartInstanceUpdated db_msg;
- chart::v1::ChartInstanceUpdated *chart;
- aclk_lib::v1::ACLKMessagePosition *pos;
-
- if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) {
- error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated");
- return NULL;
- }
-
- pos = db_msg.mutable_position();
- pos->set_sequence_id(new_positions[i].sequence_id);
- pos->set_previous_sequence_id(new_positions[i].previous_sequence_id);
- set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at());
-
- chart = msg.add_charts();
- *chart = db_msg;
- }
-
- *len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)mallocz(*len);
- msg.SerializeToArray(bin, *len);
-
- return bin;
-}
-
-char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
-{
- chart::v1::ChartsAndDimensionsUpdated msg;
-
- msg.set_batch_id(chart_batch_id);
-
- for (int i = 0; payloads[i]; i++) {
- chart::v1::ChartDimensionUpdated db_msg;
- chart::v1::ChartDimensionUpdated *dim;
- aclk_lib::v1::ACLKMessagePosition *pos;
-
- if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) {
- error("[ACLK] Could not parse chart::v1::chart_dimension_updated");
- return NULL;
- }
-
- pos = db_msg.mutable_position();
- pos->set_sequence_id(new_positions[i].sequence_id);
- pos->set_previous_sequence_id(new_positions[i].previous_sequence_id);
- set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at());
-
- dim = msg.add_dimensions();
- *dim = db_msg;
- }
-
- *len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)mallocz(*len);
- msg.SerializeToArray(bin, *len);
-
- return bin;
-}
-
-char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update)
-{
- chart::v1::ChartInstanceUpdated *chart = new chart::v1::ChartInstanceUpdated();
-
- if (set_chart_instance_updated(chart, update))
- return NULL;
-
- *len = PROTO_COMPAT_MSG_SIZE_PTR(chart);
- char *bin = (char*)mallocz(*len);
- chart->SerializeToArray(bin, *len);
-
- delete chart;
- return bin;
-}
-
-char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim)
-{
- chart::v1::ChartDimensionUpdated *proto_dim = new chart::v1::ChartDimensionUpdated();
-
- if (set_chart_dim_updated(proto_dim, dim))
- return NULL;
-
- *len = PROTO_COMPAT_MSG_SIZE_PTR(proto_dim);
- char *bin = (char*)mallocz(*len);
- proto_dim->SerializeToArray(bin, *len);
-
- delete proto_dim;
- return bin;
-}
-
-using namespace google::protobuf;
-
-char *generate_retention_updated(size_t *len, struct retention_updated *data)
-{
- chart::v1::RetentionUpdated msg;
-
- msg.set_claim_id(data->claim_id);
- msg.set_node_id(data->node_id);
-
- switch (data->memory_mode) {
- case RRD_MEMORY_MODE_NONE:
- msg.set_memory_mode(chart::v1::NONE);
- break;
- case RRD_MEMORY_MODE_RAM:
- msg.set_memory_mode(chart::v1::RAM);
- break;
- case RRD_MEMORY_MODE_MAP:
- msg.set_memory_mode(chart::v1::MAP);
- break;
- case RRD_MEMORY_MODE_SAVE:
- msg.set_memory_mode(chart::v1::SAVE);
- break;
- case RRD_MEMORY_MODE_ALLOC:
- msg.set_memory_mode(chart::v1::ALLOC);
- break;
- case RRD_MEMORY_MODE_DBENGINE:
- msg.set_memory_mode(chart::v1::DB_ENGINE);
- break;
- default:
- return NULL;
- }
-
- for (int i = 0; i < data->interval_duration_count; i++) {
- Map<uint32, uint32> *map = msg.mutable_interval_durations();
- map->insert({data->interval_durations[i].update_every, data->interval_durations[i].retention});
- }
-
- set_google_timestamp_from_timeval(data->rotation_timestamp, msg.mutable_rotation_timestamp());
-
- *len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)mallocz(*len);
- msg.SerializeToArray(bin, *len);
-
- return bin;
-}
diff --git a/aclk/schema-wrappers/chart_stream.h b/aclk/schema-wrappers/chart_stream.h
deleted file mode 100644
index 904866868..000000000
--- a/aclk/schema-wrappers/chart_stream.h
+++ /dev/null
@@ -1,121 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef ACLK_SCHEMA_WRAPPER_CHART_STREAM_H
-#define ACLK_SCHEMA_WRAPPER_CHART_STREAM_H
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#include "database/rrd.h"
-
-typedef struct {
- char* claim_id;
- char* node_id;
-
- uint64_t seq_id;
- uint64_t batch_id;
-
- struct timeval seq_id_created_at;
-} stream_charts_and_dims_t;
-
-stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len);
-
-typedef struct {
- char* claim_id;
- char* node_id;
-
- uint64_t last_seq_id;
-} chart_and_dim_ack_t;
-
-chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len);
-
-enum chart_reset_reason {
- DB_EMPTY,
- SEQ_ID_NOT_EXISTS,
- TIMESTAMP_MISMATCH
-};
-
-typedef struct {
- char *claim_id;
- char *node_id;
-
- enum chart_reset_reason reason;
-} chart_reset_t;
-
-char *generate_reset_chart_messages(size_t *len, const chart_reset_t reset);
-
-struct aclk_message_position {
- uint64_t sequence_id;
- struct timeval seq_id_creation_time;
- uint64_t previous_sequence_id;
-};
-
-struct chart_instance_updated {
- const char *id;
- const char *claim_id;
- const char *node_id;
- const char *name;
-
- DICTIONARY *chart_labels;
-
- RRD_MEMORY_MODE memory_mode;
-
- uint32_t update_every;
- const char * config_hash;
-
- struct aclk_message_position position;
-};
-
-void chart_instance_updated_destroy(struct chart_instance_updated *instance);
-
-struct chart_dimension_updated {
- const char *id;
- const char *chart_id;
- const char *node_id;
- const char *claim_id;
- const char *name;
- struct timeval created_at;
- struct timeval last_timestamp;
- struct aclk_message_position position;
-};
-
-typedef struct {
- struct chart_instance_updated *charts;
- uint16_t chart_count;
-
- struct chart_dimension_updated *dims;
- uint16_t dim_count;
-
- uint64_t batch_id;
-} charts_and_dims_updated_t;
-
-struct interval_duration {
- uint32_t update_every;
- uint32_t retention;
-};
-
-struct retention_updated {
- char *claim_id;
- char *node_id;
-
- RRD_MEMORY_MODE memory_mode;
-
- struct interval_duration *interval_durations;
- int interval_duration_count;
-
- struct timeval rotation_timestamp;
-};
-
-char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id);
-char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions);
-char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update);
-char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions);
-char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim);
-char *generate_retention_updated(size_t *len, struct retention_updated *data);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* ACLK_SCHEMA_WRAPPER_CHART_STREAM_H */
diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc
index 7520a4600..20b40ece2 100644
--- a/aclk/schema-wrappers/connection.cc
+++ b/aclk/schema-wrappers/connection.cc
@@ -29,7 +29,7 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio
timestamp->set_nanos(tv.tv_usec * 1000);
if (data->capabilities) {
- struct capability *capa = data->capabilities;
+ const struct capability *capa = data->capabilities;
while (capa->name) {
aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities();
capability_set(proto_capa, capa);
@@ -38,7 +38,7 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio
}
*len = PROTO_COMPAT_MSG_SIZE(connupd);
- char *msg = (char*)malloc(*len);
+ char *msg = (char*)mallocz(*len);
if (msg)
connupd.SerializeToArray(msg, *len);
@@ -52,7 +52,7 @@ struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) {
if (!req.ParseFromArray(data, len))
return NULL;
- res = (struct disconnect_cmd *)calloc(1, sizeof(struct disconnect_cmd));
+ res = (struct disconnect_cmd *)callocz(1, sizeof(struct disconnect_cmd));
if (!res)
return NULL;
@@ -61,9 +61,9 @@ struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) {
res->permaban = req.permaban();
res->error_code = req.error_code();
if (req.error_description().c_str()) {
- res->error_description = strdup(req.error_description().c_str());
+ res->error_description = strdupz(req.error_description().c_str());
if (!res->error_description) {
- free(res);
+ freez(res);
return NULL;
}
}
diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h
index fcbe6bd59..0356c7d78 100644
--- a/aclk/schema-wrappers/connection.h
+++ b/aclk/schema-wrappers/connection.h
@@ -17,7 +17,7 @@ typedef struct {
unsigned int lwt:1;
- struct capability *capabilities;
+ const struct capability *capabilities;
// TODO in future optional fields
// > 15 optional fields:
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
index a6ca8ef98..db1fa6449 100644
--- a/aclk/schema-wrappers/node_connection.cc
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -29,7 +29,7 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect
timestamp->set_nanos(tv.tv_usec * 1000);
if (data->capabilities) {
- struct capability *capa = data->capabilities;
+ const struct capability *capa = data->capabilities;
while (capa->name) {
aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
capability_set(proto_capa, capa);
@@ -38,7 +38,7 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect
}
*len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)malloc(*len);
+ char *bin = (char*)mallocz(*len);
if (bin)
msg.SerializeToArray(bin, *len);
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
index c27729d15..dac0d8fe0 100644
--- a/aclk/schema-wrappers/node_connection.h
+++ b/aclk/schema-wrappers/node_connection.h
@@ -19,7 +19,7 @@ typedef struct {
int64_t session_id;
int32_t hops;
- struct capability *capabilities;
+ const struct capability *capabilities;
} node_instance_connection_t;
char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
diff --git a/aclk/schema-wrappers/node_creation.cc b/aclk/schema-wrappers/node_creation.cc
index c696bb27b..5ad25b7e5 100644
--- a/aclk/schema-wrappers/node_creation.cc
+++ b/aclk/schema-wrappers/node_creation.cc
@@ -18,7 +18,7 @@ char *generate_node_instance_creation(size_t *len, const node_instance_creation_
msg.set_hops(data->hops);
*len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)malloc(*len);
+ char *bin = (char*)mallocz(*len);
if (bin)
msg.SerializeToArray(bin, *len);
@@ -33,7 +33,7 @@ node_instance_creation_result_t parse_create_node_instance_result(const char *da
if (!msg.ParseFromArray(data, len))
return res;
- res.node_id = strdup(msg.node_id().c_str());
- res.machine_guid = strdup(msg.machine_guid().c_str());
+ res.node_id = strdupz(msg.node_id().c_str());
+ res.machine_guid = strdupz(msg.machine_guid().c_str());
return res;
}
diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h
index 190ccb4d6..7a8c7f7c7 100644
--- a/aclk/schema-wrappers/node_creation.h
+++ b/aclk/schema-wrappers/node_creation.h
@@ -8,9 +8,9 @@ extern "C" {
#endif
typedef struct {
- char* claim_id;
- char* machine_guid;
- char* hostname;
+ const char *claim_id;
+ const char *machine_guid;
+ const char *hostname;
int32_t hops;
} node_instance_creation_t;
diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc
index 2a05ddaba..5e321f688 100644
--- a/aclk/schema-wrappers/node_info.cc
+++ b/aclk/schema-wrappers/node_info.cc
@@ -104,7 +104,7 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in
}
*len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)malloc(*len);
+ char *bin = (char*)mallocz(*len);
if (bin)
msg.SerializeToArray(bin, *len);
@@ -128,7 +128,7 @@ char *generate_update_node_collectors_message(size_t *len, struct update_node_co
dfe_done(colls);
*len = PROTO_COMPAT_MSG_SIZE(msg);
- char *bin = (char*)malloc(*len);
+ char *bin = (char*)mallocz(*len);
if (bin)
msg.SerializeToArray(bin, *len);
diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h
index e8ac2d7c6..de4ade78a 100644
--- a/aclk/schema-wrappers/node_info.h
+++ b/aclk/schema-wrappers/node_info.h
@@ -19,41 +19,27 @@ struct machine_learning_info {
};
struct aclk_node_info {
- char *name;
-
- char *os;
- char *os_name;
- char *os_version;
-
- char *kernel_name;
- char *kernel_version;
-
- char *architecture;
-
+ const char *name;
+
+ const char *os;
+ const char *os_name;
+ const char *os_version;
+ const char *kernel_name;
+ const char *kernel_version;
+ const char *architecture;
uint32_t cpus;
-
- char *cpu_frequency;
-
- char *memory;
-
- char *disk_space;
-
- char *version;
-
- char *release_channel;
-
- char *timezone;
-
- char *virtualization_type;
-
- char *container_type;
-
- char *custom_info;
-
- char *machine_guid;
+ const char *cpu_frequency;
+ const char *memory;
+ const char *disk_space;
+ const char *version;
+ const char *release_channel;
+ const char *timezone;
+ const char *virtualization_type;
+ const char *container_type;
+ const char *custom_info;
+ const char *machine_guid;
DICTIONARY *host_labels_ptr;
-
struct machine_learning_info ml_info;
};
@@ -72,8 +58,8 @@ struct update_node_info {
};
struct collector_info {
- char *module;
- char *plugin;
+ const char *module;
+ const char *plugin;
};
struct update_node_collectors {
diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc
index 0e473eb6c..8853b2e08 100644
--- a/aclk/schema-wrappers/proto_2_json.cc
+++ b/aclk/schema-wrappers/proto_2_json.cc
@@ -4,8 +4,6 @@
#include "proto/alarm/v1/config.pb.h"
#include "proto/alarm/v1/stream.pb.h"
#include "proto/aclk/v1/lib.pb.h"
-#include "proto/chart/v1/config.pb.h"
-#include "proto/chart/v1/stream.pb.h"
#include "proto/agent/v1/connection.pb.h"
#include "proto/agent/v1/disconnect.pb.h"
#include "proto/nodeinstance/connection/v1/connection.pb.h"
@@ -29,14 +27,6 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new nodeinstance::v1::UpdateNodeInstanceConnection;
if (!strcmp(msgname, "CreateNodeInstance"))
return new nodeinstance::create::v1::CreateNodeInstance;
- if (!strcmp(msgname, "ChartsAndDimensionsUpdated"))
- return new chart::v1::ChartsAndDimensionsUpdated;
- if (!strcmp(msgname, "ChartConfigsUpdated"))
- return new chart::v1::ChartConfigsUpdated;
- if (!strcmp(msgname, "ResetChartMessages"))
- return new chart::v1::ResetChartMessages;
- if (!strcmp(msgname, "RetentionUpdated"))
- return new chart::v1::RetentionUpdated;
if (!strcmp(msgname, "UpdateNodeInfo"))
return new nodeinstance::info::v1::UpdateNodeInfo;
if (!strcmp(msgname, "AlarmLogHealth"))
@@ -59,12 +49,6 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new nodeinstance::create::v1::CreateNodeInstanceResult;
if (!strcmp(msgname, "SendNodeInstances"))
return new agent::v1::SendNodeInstances;
- if (!strcmp(msgname, "StreamChartsAndDimensions"))
- return new chart::v1::StreamChartsAndDimensions;
- if (!strcmp(msgname, "ChartsAndDimensionsAck"))
- return new chart::v1::ChartsAndDimensionsAck;
- if (!strcmp(msgname, "UpdateChartConfigs"))
- return new chart::v1::UpdateChartConfigs;
if (!strcmp(msgname, "StartAlarmStreaming"))
return new alarms::v1::StartAlarmStreaming;
if (!strcmp(msgname, "SendAlarmLogHealth"))
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
index 26412cacc..a96f7ea7a 100644
--- a/aclk/schema-wrappers/schema_wrappers.h
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -8,8 +8,6 @@
#include "connection.h"
#include "node_connection.h"
#include "node_creation.h"
-#include "chart_config.h"
-#include "chart_stream.h"
#include "alarm_config.h"
#include "alarm_stream.h"
#include "node_info.h"