summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--aclk/README.md20
-rw-r--r--aclk/aclk.c495
-rw-r--r--aclk/aclk.h16
-rw-r--r--aclk/aclk_api.c39
-rw-r--r--aclk/aclk_api.h18
-rw-r--r--aclk/aclk_charts_api.c9
-rw-r--r--aclk/aclk_charts_api.h2
-rw-r--r--aclk/aclk_collector_list.c193
-rw-r--r--aclk/aclk_collector_list.h41
-rw-r--r--aclk/aclk_contexts_api.c23
-rw-r--r--aclk/aclk_contexts_api.h12
-rw-r--r--aclk/aclk_otp.c48
-rw-r--r--aclk/aclk_otp.h4
-rw-r--r--aclk/aclk_query.c130
-rw-r--r--aclk/aclk_query_queue.c11
-rw-r--r--aclk/aclk_query_queue.h21
-rw-r--r--aclk/aclk_rrdhost_state.h34
-rw-r--r--aclk/aclk_rx_msgs.c84
-rw-r--r--aclk/aclk_rx_msgs.h4
-rw-r--r--aclk/aclk_stats.c48
-rw-r--r--aclk/aclk_stats.h4
-rw-r--r--aclk/aclk_tx_msgs.c233
-rw-r--r--aclk/aclk_tx_msgs.h13
-rw-r--r--aclk/aclk_util.c25
-rw-r--r--aclk/aclk_util.h6
-rw-r--r--aclk/schema-wrappers/alarm_stream.cc3
-rw-r--r--aclk/schema-wrappers/alarm_stream.h2
-rw-r--r--aclk/schema-wrappers/chart_stream.cc9
-rw-r--r--aclk/schema-wrappers/chart_stream.h2
-rw-r--r--aclk/schema-wrappers/context.cc125
-rw-r--r--aclk/schema-wrappers/context.h53
-rw-r--r--aclk/schema-wrappers/context_stream.cc42
-rw-r--r--aclk/schema-wrappers/context_stream.h36
-rw-r--r--aclk/schema-wrappers/node_connection.cc9
-rw-r--r--aclk/schema-wrappers/node_connection.h3
-rw-r--r--aclk/schema-wrappers/node_info.cc35
-rw-r--r--aclk/schema-wrappers/node_info.h21
-rw-r--r--aclk/schema-wrappers/proto_2_json.cc101
-rw-r--r--aclk/schema-wrappers/proto_2_json.h18
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.cc7
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.h4
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h2
42 files changed, 827 insertions, 1178 deletions
diff --git a/aclk/README.md b/aclk/README.md
index f595726e3..6f541c38e 100644
--- a/aclk/README.md
+++ b/aclk/README.md
@@ -11,15 +11,25 @@ The Agent-Cloud link (ACLK) is the mechanism responsible for securely connecting
through Netdata Cloud. The ACLK establishes an outgoing secure WebSocket (WSS) connection to Netdata Cloud on port
`443`. The ACLK is encrypted, safe, and _is only established if you connect your node_.
-The Cloud App lives at app.netdata.cloud which currently resolves to 35.196.244.138. However, this IP or range of
-IPs can change without notice. Watch this page for updates.
+The Cloud App lives at app.netdata.cloud which currently resolves to the following list of IPs:
+
+- 54.198.178.11
+- 44.207.131.212
+- 44.196.50.41
+
+:::caution
+
+This list of IPs can change without notice, we strongly advise you to whitelist the domain `app.netdata.cloud`, if
+this is not an option in your case always verify the current domain resolution (e.g via the `host` command).
+
+:::
For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [get
started with Cloud](https://learn.netdata.cloud/docs/cloud/get-started) guide or the full [connect to Cloud
documentation](/claim/README.md).
## Data privacy
-[Data privacy](https://netdata.cloud/data-privacy/) is very important to us. We firmly believe that your data belongs to
+[Data privacy](https://netdata.cloud/privacy/) is very important to us. We firmly believe that your data belongs to
you. This is why **we don't store any metric data in Netdata Cloud**.
All the data that you see in the web browser when using Netdata Cloud, is actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard.
@@ -50,12 +60,12 @@ You can configure following keys in the `netdata.conf` section `[cloud]`:
[cloud]
statistics = yes
query thread count = 2
- mqtt5 = no
+ mqtt5 = yes
```
- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
-- `mqtt5` enables the new MQTT5 protocol implementation in the Agent. Currently a technical preview.
+- `mqtt5` allows disabling the new MQTT5 implementation which is used now by default in case of issues. This option will be removed in future stable release.
## Disable the ACLK
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 6426c5b5e..7b3641b1e 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -10,7 +10,6 @@
#include "aclk_query_queue.h"
#include "aclk_util.h"
#include "aclk_rx_msgs.h"
-#include "aclk_collector_list.h"
#include "https_client.h"
#include "schema-wrappers/schema_wrappers.h"
@@ -46,17 +45,29 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
struct aclk_shared_state aclk_shared_state = {
- .agent_state = ACLK_HOST_INITIALIZING,
- .last_popcorn_interrupt = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+OSSL_DECODER_CTX *aclk_dctx = NULL;
+EVP_PKEY *aclk_private_key = NULL;
+#else
static RSA *aclk_private_key = NULL;
+#endif
static int load_private_key()
{
- if (aclk_private_key != NULL)
+ if (aclk_private_key != NULL) {
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+ EVP_PKEY_free(aclk_private_key);
+ if (aclk_dctx)
+ OSSL_DECODER_CTX_free(aclk_dctx);
+
+ aclk_dctx = NULL;
+#else
RSA_free(aclk_private_key);
+#endif
+ }
aclk_private_key = NULL;
char filename[FILENAME_MAX + 1];
snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
@@ -75,7 +86,25 @@ static int load_private_key()
goto biofailed;
}
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+ aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL,
+ "RSA",
+ OSSL_KEYMGMT_SELECT_PRIVATE_KEY,
+ NULL, NULL);
+
+ if (!aclk_dctx) {
+ error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
+ goto biofailed;
+ }
+
+ // this is necesseary to avoid RSA key with wrong size
+ if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) {
+ error("Decoding private key (from claiming) failed - invalid format.");
+ goto biofailed;
+ }
+#else
aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
+#endif
BIO_free(key_bio);
if (aclk_private_key!=NULL)
{
@@ -112,12 +141,12 @@ static int wait_till_cloud_enabled()
static int wait_till_agent_claimed(void)
{
//TODO prevent malloc and freez
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
if (netdata_exit)
return 1;
- agent_id = is_agent_claimed();
+ agent_id = get_agent_claimid();
}
freez(agent_id);
return 0;
@@ -188,54 +217,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
//TODO prevent big buffer on stack
#define RX_MSGLEN_MAX 4096
-static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos)
-{
- UNUSED(qos);
- char cmsg[RX_MSGLEN_MAX];
- size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
- const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
- if (!cmd_topic) {
- error("Error retrieving command topic");
- return;
- }
-
- if (msglen > RX_MSGLEN_MAX - 1)
- error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
-
- memcpy(cmsg,
- msg,
- len);
- cmsg[len] = 0;
-
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 512
- char filename[FN_MAX_LEN];
- int logfd;
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT());
- logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
- if(logfd < 0)
- error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
- write(logfd, msg, msglen);
- close(logfd);
-#endif
-
- debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg);
-
- if (strcmp(cmd_topic, topic))
- error("Received message on unexpected topic %s", topic);
-
- if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
- error("Link is shutting down. Ignoring incoming message.");
- return;
- }
-
- aclk_handle_cloud_cmd_message(cmsg);
-}
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos)
+static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
+ aclk_rcvd_cloud_msgs++;
if (msglen > RX_MSGLEN_MAX)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
@@ -269,17 +254,8 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t
close(logfd);
#endif
- aclk_handle_new_cloud_msg(msgtype, msg, msglen);
-}
-
-static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
- aclk_rcvd_cloud_msgs++;
- if (aclk_use_new_cloud_arch)
- msg_callback_new_protocol(topic, msg, msglen, qos);
- else
- msg_callback_old_protocol(topic, msg, msglen, qos);
+ aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic);
}
-#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
static void puback_callback(uint16_t packet_id)
{
@@ -356,40 +332,6 @@ static int handle_connection(mqtt_wss_client client)
return 0;
}
-inline static int aclk_popcorn_check()
-{
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
- return 1;
- }
- ACLK_SHARED_STATE_UNLOCK;
- return 0;
-}
-
-inline static int aclk_popcorn_check_bump()
-{
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
- aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
- ACLK_SHARED_STATE_UNLOCK;
- return 1;
- }
- ACLK_SHARED_STATE_UNLOCK;
- return 0;
-}
-
-static inline void queue_connect_payloads(void)
-{
- aclk_query_t query = aclk_query_new(METADATA_INFO);
- query->data.metadata_info.host = localhost;
- query->data.metadata_info.initial_on_connect = 1;
- aclk_queue_query(query);
- query = aclk_query_new(METADATA_ALARMS);
- query->data.metadata_alarms.initial_on_connect = 1;
- aclk_queue_query(query);
-}
-
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
@@ -399,15 +341,11 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (aclk_use_new_cloud_arch) {
- topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
- if (!topic)
- error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
- else
- mqtt_wss_subscribe(client, topic, 1);
- }
-#endif
+ topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+ if (!topic)
+ error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+ else
+ mqtt_wss_subscribe(client, topic, 1);
aclk_stats_upd_online(1);
aclk_connected = 1;
@@ -415,55 +353,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_rcvd_cloud_msgs = 0;
aclk_connection_counter++;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (!aclk_use_new_cloud_arch) {
-#endif
- ACLK_SHARED_STATE_LOCK;
- if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
- error("Sending `connect` payload immediately as popcorning was finished already.");
- queue_connect_payloads();
- }
- ACLK_SHARED_STATE_UNLOCK;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- } else {
- aclk_send_agent_connection_update(client, 1);
- }
-#endif
-}
-
-/* Waits until agent is ready or needs to exit
- * @param client instance of mqtt_wss_client
- * @param query_threads pointer to aclk_query_threads
- * structure where to store data about started query threads
- * @return 0 - Popcorning Finished - Agent STABLE,
- * !0 - netdata_exit
- */
-static int wait_popcorning_finishes()
-{
- time_t elapsed;
- int need_wait;
- if (aclk_use_new_cloud_arch)
- return 0;
-
- while (!netdata_exit) {
- ACLK_SHARED_STATE_LOCK;
- if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
- return 0;
- }
- elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
- if (elapsed >= ACLK_STABLE_TIMEOUT) {
- aclk_shared_state.agent_state = ACLK_HOST_STABLE;
- ACLK_SHARED_STATE_UNLOCK;
- error("ACLK localhost popcorn timer finished");
- return 0;
- }
- ACLK_SHARED_STATE_UNLOCK;
- need_wait = ACLK_STABLE_TIMEOUT - elapsed;
- error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait);
- sleep(need_wait);
- }
- return 1;
+ aclk_send_agent_connection_update(client, 1);
}
void aclk_graceful_disconnect(mqtt_wss_client client)
@@ -471,12 +361,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
info("Preparing to gracefully shutdown ACLK connection");
aclk_queue_lock();
aclk_queue_flush();
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (aclk_use_new_cloud_arch)
- aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
- else
-#endif
- aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+
+ aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
@@ -594,8 +480,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
- json_object *lwt = NULL;
-
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
@@ -629,8 +513,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.drop_on_publish_fail = 1
};
- aclk_use_new_cloud_arch = 0;
-
#ifndef ACLK_DISABLE_CHALLENGE
if (aclk_env) {
aclk_env_t_destroy(aclk_env);
@@ -649,20 +531,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (netdata_exit)
return 1;
- if (aclk_env->encoding == ACLK_ENC_PROTO) {
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
- error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
+ if (aclk_env->encoding != ACLK_ENC_PROTO) {
+ error_report("This agent can only use the new cloud protocol but cloud requested old one.");
continue;
-#else
- if (!aclk_env_has_capa("proto")) {
- error ("Can't encoding=proto without at least \"proto\" capability.");
- continue;
- }
- info("Switching ACLK to new protobuf protocol. Due to /env response.");
- aclk_use_new_cloud_arch = 1;
-#endif
}
+ if (!aclk_env_has_capa("proto")) {
+ error ("Can't use encoding=proto without at least \"proto\" capability.");
+ continue;
+ }
+ info("New ACLK protobuf protocol negotiated successfully (/env response).");
+
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
@@ -679,10 +558,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// aclk_get_topic moved here as during OTP we
// generate the topic cache
- if (aclk_use_new_cloud_arch)
- mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
- else
- mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+ mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
if (!mqtt_conn_params.will_topic) {
error("Couldn't get LWT topic. Will not send LWT.");
@@ -708,17 +584,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- if (aclk_use_new_cloud_arch) {
- mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
- } else {
-#endif
- lwt = aclk_generate_disconnect(NULL);
- mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
- mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- }
-#endif
+ mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
#ifdef ACLK_DISABLE_CHALLENGE
ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@@ -732,10 +598,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
- if (aclk_use_new_cloud_arch)
- freez((char *)mqtt_conn_params.will_msg);
- else
- json_object_put(lwt);
+ freez((char *)mqtt_conn_params.will_msg);
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
@@ -778,10 +641,7 @@ void *aclk_main(void *ptr)
return NULL;
}
- unsigned int proto_hdl_cnt;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- proto_hdl_cnt = aclk_init_rx_msg_handlers();
-#endif
+ unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
// This thread is unusual in that it cannot be cancelled by cancel_main_threads()
// as it must notify the far end that it shutdown gracefully and avoid the LWT.
@@ -792,7 +652,6 @@ void *aclk_main(void *ptr)
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
#endif
- aclk_popcorn_check_bump(); // start localhost popcorn timer
query_threads.count = read_query_thread_count();
if (wait_till_cloud_enabled())
@@ -801,13 +660,9 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;
- use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO);
+ use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
-#else
- if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) {
-#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@@ -822,6 +677,7 @@ void *aclk_main(void *ptr)
stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
stats_thread->query_thread_count = query_threads.count;
+ stats_thread->client = mqttwss_client;
aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
netdata_thread_create(
stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
@@ -834,28 +690,9 @@ void *aclk_main(void *ptr)
if (aclk_attempt_to_connect(mqttwss_client))
goto exit_full;
-#if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL)
- error_report("############################ WARNING ###############################");
- error_report("# Your agent is configured to connect to cloud but has #");
- error_report("# no protobuf protocol support (uses legacy JSON protocol) #");
- error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #");
- error_report("# Visit following link for more info and instructions how to solve #");
- error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #");
- error_report("######################################################################");
-#endif
-
- // warning this assumes the popcorning is relative short (3s)
- // if that changes call mqtt_wss_service from within
- // to keep OpenSSL, WSS and MQTT connection alive
- if (wait_popcorning_finishes())
- goto exit_full;
-
if (unlikely(!query_threads.thread_list))
aclk_query_threads_start(&query_threads, mqttwss_client);
- if (!aclk_use_new_cloud_arch)
- queue_connect_payloads();
-
if (handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
@@ -889,168 +726,12 @@ exit:
return NULL;
}
-// TODO this is taken over as workaround from old ACLK
-// fix this in both old and new ACLK
-extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
-
-void aclk_alarm_reload(void)
-{
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
- return;
- }
- ACLK_SHARED_STATE_UNLOCK;
-
- aclk_queue_query(aclk_query_new(METADATA_ALARMS));
-}
-
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
-{
- BUFFER *local_buffer;
- json_object *msg;
-
- if (host != localhost)
- return 0;
-
- ACLK_SHARED_STATE_LOCK;
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
- ACLK_SHARED_STATE_UNLOCK;
- return 0;
- }
- ACLK_SHARED_STATE_UNLOCK;
-
- local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
-
- netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
- health_alarm_entry2json_nolock(local_buffer, ae, host);
- netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
-
- msg = json_tokener_parse(local_buffer->buffer);
-
- struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE);
- query->data.alarm_update = msg;
- aclk_queue_query(query);
-
- buffer_free(local_buffer);
- return 0;
-}
-
-int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
-{
- struct aclk_query *query;
-
- if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check())
- return 0;
-
- query = aclk_query_new(create ? CHART_NEW : CHART_DEL);
- if(create) {
- query->data.chart_add_del.host = host;
- query->data.chart_add_del.chart_name = strdupz(chart_name);
- } else {
- query->data.metadata_info.host = host;
- query->data.metadata_info.initial_on_connect = 0;
- }
-
- aclk_queue_query(query);
- return 0;
-}
-
-/*
- * Add a new collector to the list
- * If it exists, update the chart count
- */
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
-{
- struct aclk_query *query;
- struct _collector *tmp_collector;
- if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
- return;
- }
-
- COLLECTOR_LOCK;
-
- tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
-
- if (unlikely(tmp_collector->count != 1)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- COLLECTOR_UNLOCK;
-
- if (aclk_popcorn_check_bump())
- return;
-
- if (host != localhost)
- return;
-
- query = aclk_query_new(METADATA_INFO);
- query->data.metadata_info.host = localhost; //TODO
- query->data.metadata_info.initial_on_connect = 0;
- aclk_queue_query(query);
-
- query = aclk_query_new(METADATA_ALARMS);
- query->data.metadata_alarms.initial_on_connect = 0;
- aclk_queue_query(query);
-}
-
-/*
- * Delete a collector from the list
- * If the chart count reaches zero the collector will be removed
- * from the list by calling del_collector.
- *
- * This function will release the memory used and schedule
- * a cloud update
- */
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
-{
- struct aclk_query *query;
- struct _collector *tmp_collector;
- if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
- return;
- }
-
- COLLECTOR_LOCK;
-
- tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
-
- if (unlikely(!tmp_collector || tmp_collector->count)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- debug(
- D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
- tmp_collector->count);
-
- COLLECTOR_UNLOCK;
-
- _free_collector(tmp_collector);
-
- if (aclk_popcorn_check_bump())
- return;
-
- if (host != localhost)
- return;
-
- query = aclk_query_new(METADATA_INFO);
- query->data.metadata_info.host = localhost; //TODO
- query->data.metadata_info.initial_on_connect = 0;
- aclk_queue_query(query);
-
- query = aclk_query_new(METADATA_ALARMS);
- query->data.metadata_alarms.initial_on_connect = 0;
- aclk_queue_query(query);
-}
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
int ret;
- if (!aclk_connected || !aclk_use_new_cloud_arch)
+ if (!aclk_connected)
return;
ret = get_node_id(&host->host_uuid, &node_id);
@@ -1088,6 +769,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
+
+ struct capability caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) },
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ node_state_update.capabilities = caps;
+
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
@@ -1120,6 +811,20 @@ void aclk_send_node_instances()
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
+
+ char host_id[UUID_STR_LEN];
+ uuid_unparse_lower(list->host_id, host_id);
+
+ RRDHOST *host = rrdhost_find_by_guid(host_id, 0);
+ struct capability caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ node_state_update.capabilities = caps;
+
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
@@ -1157,14 +862,12 @@ void aclk_send_node_instances()
}
freez(list_head);
}
-#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
{
struct proto_alert_status status;
@@ -1220,7 +923,6 @@ static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
);
freez(stats);
}
-#endif
char *ng_aclk_state(void)
{
@@ -1231,15 +933,11 @@ char *ng_aclk_state(void)
buffer_strcat(wb,
"ACLK Available: Yes\n"
"ACLK Version: 2\n"
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- "Protocols Supported: Legacy, Protobuf\n"
-#else
- "Protocols Supported: Legacy\n"
-#endif
+ "Protocols Supported: Protobuf\n"
);
- buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3);
+ buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3);
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
if (agent_id == NULL)
buffer_strcat(wb, "No\n");
else {
@@ -1273,7 +971,6 @@ char *ng_aclk_state(void)
if (aclk_connected) {
buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
RRDHOST *host;
rrd_rdlock();
rrdhost_foreach_read(host) {
@@ -1308,7 +1005,6 @@ char *ng_aclk_state(void)
fill_chart_status_for_host(wb, host);
}
rrd_unlock();
-#endif
}
ret = strdupz(buffer_tostring(wb));
@@ -1316,7 +1012,6 @@ char *ng_aclk_state(void)
return ret;
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
{
struct proto_alert_status status;
@@ -1381,7 +1076,6 @@ static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
freez(stats);
}
-#endif
static json_object *timestamp_to_json(const time_t *t)
{
@@ -1405,18 +1099,11 @@ char *ng_aclk_state_json(void)
json_object_object_add(msg, "aclk-version", tmp);
grp = json_object_new_array();
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- tmp = json_object_new_string("Legacy");
- json_object_array_add(grp, tmp);
tmp = json_object_new_string("Protobuf");
json_object_array_add(grp, tmp);
-#else
- tmp = json_object_new_string("Legacy");
- json_object_array_add(grp, tmp);
-#endif
json_object_object_add(msg, "protocols-supported", grp);
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
tmp = json_object_new_boolean(agent_id != NULL);
json_object_object_add(msg, "agent-claimed", tmp);
@@ -1434,7 +1121,7 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_boolean(aclk_connected);
json_object_object_add(msg, "online", tmp);
- tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
+ tmp = json_object_new_string("Protobuf");
json_object_object_add(msg, "used-cloud-protocol", tmp);
tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
@@ -1461,7 +1148,6 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_boolean(aclk_disable_runtime);
json_object_object_add(msg, "banned-by-cloud", tmp);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
grp = json_object_new_array();
RRDHOST *host;
@@ -1513,7 +1199,6 @@ char *ng_aclk_state_json(void)
}
rrd_unlock();
json_object_object_add(msg, "node-instances", grp);
-#endif
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 41c4e05e4..5065ac2bf 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -21,9 +21,6 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
extern struct aclk_shared_state {
- ACLK_AGENT_STATE agent_state;
- time_t last_popcorn_interrupt;
-
// To wait for `disconnect` message PUBACK
// when shutting down
// at the same time if > 0 we know link is
@@ -32,21 +29,8 @@ extern struct aclk_shared_state {
int mqtt_shutdown_msg_rcvd;
} aclk_shared_state;
-void aclk_alarm_reload(void);
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
-
-/* Informs ACLK about created/deleted chart
- * @param create 0 - if chart was deleted, other if chart created
- */
-int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
-
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd);
void aclk_send_node_instances(void);
-#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
index a2e738ab1..141d267af 100644
--- a/aclk/aclk_api.c
+++ b/aclk/aclk_api.c
@@ -13,10 +13,10 @@ usec_t aclk_session_us = 0;
time_t aclk_session_sec = 0;
int aclk_disable_runtime = 0;
-int aclk_disable_single_updates = 0;
int aclk_stats_enabled;
int use_mqtt_5 = 0;
+int aclk_ctx_based = 0;
#define ACLK_IMPL_KEY_NAME "aclk implementation"
@@ -33,25 +33,17 @@ void *aclk_starter(void *ptr) {
}
return aclk_main(ptr);
}
-
-void aclk_single_update_disable()
-{
- aclk_disable_single_updates = 1;
-}
-
-void aclk_single_update_enable()
-{
- aclk_disable_single_updates = 0;
-}
#endif /* ENABLE_ACLK */
-struct label *add_aclk_host_labels(struct label *label) {
+void add_aclk_host_labels(void) {
+ DICTIONARY *labels = localhost->host_labels;
+
#ifdef ENABLE_ACLK
- label = add_label_to_list(label, "_aclk_ng_available", "true", LABEL_SOURCE_AUTO);
+ rrdlabels_add(labels, "_aclk_ng_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
#else
- label = add_label_to_list(label, "_aclk_ng_available", "false", LABEL_SOURCE_AUTO);
+ rrdlabels_add(labels, "_aclk_ng_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
#endif
- label = add_label_to_list(label, "_aclk_legacy_available", "false", LABEL_SOURCE_AUTO);
+ rrdlabels_add(labels, "_aclk_legacy_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
#ifdef ENABLE_ACLK
ACLK_PROXY_TYPE aclk_proxy;
char *proxy_str;
@@ -69,17 +61,14 @@ struct label *add_aclk_host_labels(struct label *label) {
break;
}
- int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO);
- label = add_label_to_list(label, "_mqtt_version", mqtt5 ? "5" : "3", LABEL_SOURCE_AUTO);
- label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO);
- label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- label = add_label_to_list(label, "_aclk_ng_new_cloud_protocol", "true", LABEL_SOURCE_AUTO);
-#else
- label = add_label_to_list(label, "_aclk_ng_new_cloud_protocol", "false", LABEL_SOURCE_AUTO);
-#endif
+
+ int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
+
+ rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_impl", "Next Generation", RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
+ rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
#endif
- return label;
}
char *aclk_state(void) {
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
index 557b70d70..36a6d603f 100644
--- a/aclk/aclk_api.h
+++ b/aclk/aclk_api.h
@@ -15,31 +15,17 @@ extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
extern int aclk_disable_runtime;
-extern int aclk_disable_single_updates;
extern int aclk_stats_enabled;
extern int aclk_alert_reloaded;
-extern int aclk_ng;
extern int use_mqtt_5;
+extern int aclk_ctx_based;
#ifdef ENABLE_ACLK
void *aclk_starter(void *ptr);
-void aclk_single_update_disable();
-void aclk_single_update_enable();
-
-void aclk_alarm_reload(void);
-
-int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
-
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int connect);
-#endif
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
@@ -52,7 +38,7 @@ void aclk_host_state_update(RRDHOST *host, int connect);
#endif
-struct label *add_aclk_host_labels(struct label *label);
+void add_aclk_host_labels(void);
char *aclk_state(void);
char *aclk_state_json(void);
diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c
index 4e1c466e8..51d8dad58 100644
--- a/aclk/aclk_charts_api.c
+++ b/aclk/aclk_charts_api.c
@@ -66,3 +66,12 @@ void aclk_update_node_info(struct update_node_info *info)
query->data.bin_payload.msg_name = "UpdateNodeInfo";
QUEUE_IF_PAYLOAD_PRESENT(query);
}
+
+void aclk_update_node_collectors(struct update_node_collectors *collectors)
+{
+ aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS);
+ query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS;
+ query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors);
+ query->data.bin_payload.msg_name = "UpdateNodeCollectors";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
diff --git a/aclk/aclk_charts_api.h b/aclk/aclk_charts_api.h
index 305fe4f74..71f07dd33 100644
--- a/aclk/aclk_charts_api.h
+++ b/aclk/aclk_charts_api.h
@@ -17,4 +17,6 @@ void aclk_retention_updated(struct retention_updated *data);
void aclk_update_node_info(struct update_node_info *info);
+void aclk_update_node_collectors(struct update_node_collectors *collectors);
+
#endif /* ACLK_CHARTS_H */
diff --git a/aclk/aclk_collector_list.c b/aclk/aclk_collector_list.c
deleted file mode 100644
index 2920c9a5c..000000000
--- a/aclk/aclk_collector_list.c
+++ /dev/null
@@ -1,193 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-// This is copied from Legacy ACLK, Original Author: amoss
-
-// TODO unmess this
-
-#include "aclk_collector_list.h"
-
-netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
-
-struct _collector *collector_list = NULL;
-
-/*
- * Free a collector structure
- */
-void _free_collector(struct _collector *collector)
-{
- if (likely(collector->plugin_name))
- freez(collector->plugin_name);
-
- if (likely(collector->module_name))
- freez(collector->module_name);
-
- if (likely(collector->hostname))
- freez(collector->hostname);
-
- freez(collector);
-}
-
-/*
- * This will report the collector list
- *
- */
-#ifdef ACLK_DEBUG
-static void _dump_collector_list()
-{
- struct _collector *tmp_collector;
-
- COLLECTOR_LOCK;
-
- info("DUMPING ALL COLLECTORS");
-
- if (unlikely(!collector_list || !collector_list->next)) {
- COLLECTOR_UNLOCK;
- info("DUMPING ALL COLLECTORS -- nothing found");
- return;
- }
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
-
- while (tmp_collector) {
- info(
- "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
- tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
- tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
-
- tmp_collector = tmp_collector->next;
- }
- info("DUMPING ALL COLLECTORS DONE");
- COLLECTOR_UNLOCK;
-}
-#endif
-
-/*
- * This will cleanup the collector list
- *
- */
-void _reset_collector_list()
-{
- struct _collector *tmp_collector, *next_collector;
-
- COLLECTOR_LOCK;
-
- if (unlikely(!collector_list || !collector_list->next)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
- collector_list->count = 0;
- collector_list->next = NULL;
-
- // We broke the link; we can unlock
- COLLECTOR_UNLOCK;
-
- while (tmp_collector) {
- next_collector = tmp_collector->next;
- _free_collector(tmp_collector);
- tmp_collector = next_collector;
- }
-}
-
-/*
- * Find a collector (if it exists)
- * Must lock before calling this
- * If last_collector is not null, it will return the previous collector in the linked
- * list (used in collector delete)
- */
-static struct _collector *_find_collector(
- const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
-{
- struct _collector *tmp_collector, *prev_collector;
- uint32_t plugin_hash;
- uint32_t module_hash;
- uint32_t hostname_hash;
-
- if (unlikely(!collector_list)) {
- collector_list = callocz(1, sizeof(struct _collector));
- return NULL;
- }
-
- if (unlikely(!collector_list->next))
- return NULL;
-
- plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
- module_hash = module_name ? simple_hash(module_name) : 1;
- hostname_hash = simple_hash(hostname);
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
- prev_collector = collector_list;
- while (tmp_collector) {
- if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
- hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
- (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
- (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
- if (unlikely(last_collector))
- *last_collector = prev_collector;
-
- return tmp_collector;
- }
-
- prev_collector = tmp_collector;
- tmp_collector = tmp_collector->next;
- }
-
- return tmp_collector;
-}
-
-/*
- * Called to delete a collector
- * It will reduce the count (chart_count) and will remove it
- * from the linked list if the count reaches zero
- * The structure will be returned to the caller to free
- * the resources
- *
- */
-struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
-{
- struct _collector *tmp_collector, *prev_collector = NULL;
-
- tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
-
- if (likely(tmp_collector)) {
- --tmp_collector->count;
- if (unlikely(!tmp_collector->count))
- prev_collector->next = tmp_collector->next;
- }
- return tmp_collector;
-}
-
-/*
- * Add a new collector (plugin / module) to the list
- * If it already exists just update the chart count
- *
- * Lock before calling
- */
-struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
-{
- struct _collector *tmp_collector;
-
- tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
-
- if (unlikely(!tmp_collector)) {
- tmp_collector = callocz(1, sizeof(struct _collector));
- tmp_collector->hostname_hash = simple_hash(hostname);
- tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
- tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
-
- tmp_collector->hostname = strdupz(hostname);
- tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
- tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
-
- tmp_collector->next = collector_list->next;
- collector_list->next = tmp_collector;
- }
- tmp_collector->count++;
- debug(
- D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
- module_name ? module_name : "*", tmp_collector->count);
- return tmp_collector;
-}
diff --git a/aclk/aclk_collector_list.h b/aclk/aclk_collector_list.h
deleted file mode 100644
index 09c06b14a..000000000
--- a/aclk/aclk_collector_list.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-// This is copied from Legacy ACLK, Original Author: amoss
-
-// TODO unmess this
-
-#ifndef ACLK_COLLECTOR_LIST_H
-#define ACLK_COLLECTOR_LIST_H
-
-#include "libnetdata/libnetdata.h"
-
-extern netdata_mutex_t collector_mutex;
-
-#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
-#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
-
-/*
- * Maintain a list of collectors and chart count
- * If all the charts of a collector are deleted
- * then a new metadata dataset must be send to the cloud
- *
- */
-struct _collector {
- time_t created;
- uint32_t count; //chart count
- uint32_t hostname_hash;
- uint32_t plugin_hash;
- uint32_t module_hash;
- char *hostname;
- char *plugin_name;
- char *module_name;
- struct _collector *next;
-};
-
-extern struct _collector *collector_list;
-
-struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
-struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name);
-void _reset_collector_list();
-void _free_collector(struct _collector *collector);
-
-#endif /* ACLK_COLLECTOR_LIST_H */
diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c
new file mode 100644
index 000000000..f17d3cabd
--- /dev/null
+++ b/aclk/aclk_contexts_api.c
@@ -0,0 +1,23 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_query_queue.h"
+
+#include "aclk_contexts_api.h"
+
+void aclk_send_contexts_snapshot(contexts_snapshot_t data)
+{
+ aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT;
+ query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size);
+ query->data.bin_payload.msg_name = "ContextsSnapshot";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_send_contexts_updated(contexts_updated_t data)
+{
+ aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
+ query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED;
+ query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size);
+ query->data.bin_payload.msg_name = "ContextsUpdated";
+ QUEUE_IF_PAYLOAD_PRESENT(query);
+}
diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h
new file mode 100644
index 000000000..46b916d22
--- /dev/null
+++ b/aclk/aclk_contexts_api.h
@@ -0,0 +1,12 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_CONTEXTS_API_H
+#define ACLK_CONTEXTS_API_H
+
+#include "schema-wrappers/schema_wrappers.h"
+
+
+void aclk_send_contexts_snapshot(contexts_snapshot_t data);
+void aclk_send_contexts_updated(contexts_updated_t data);
+
+#endif /* ACLK_CONTEXTS_API_H */
+
diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c
index c99c65637..b7bf173c4 100644
--- a/aclk/aclk_otp.c
+++ b/aclk/aclk_otp.c
@@ -446,11 +446,37 @@ cleanup_buffers:
return rc;
}
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+static int private_decrypt(EVP_PKEY *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted)
+#else
static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted)
+#endif
{
+ int result;
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+ size_t outlen = EVP_PKEY_size(p_key);
+ EVP_PKEY_CTX *ctx = EVP_PKEY_CTX_new(p_key, NULL);
+ if (!ctx)
+ return 1;
+
+ if (EVP_PKEY_decrypt_init(ctx) <= 0)
+ return 1;
+
+ if (EVP_PKEY_CTX_set_rsa_padding(ctx, RSA_PKCS1_OAEP_PADDING) <= 0)
+ return 1;
+
+ *decrypted = mallocz(outlen);
+
+ if (EVP_PKEY_decrypt(ctx, *decrypted, &outlen, enc_data, data_len) == 1)
+ result = (int) outlen;
+ else
+ result = -1;
+#else
*decrypted = mallocz(RSA_size(p_key));
- int result = RSA_private_decrypt(data_len, enc_data, *decrypted, p_key, RSA_PKCS1_OAEP_PADDING);
- if (result == -1) {
+ result = RSA_private_decrypt(data_len, enc_data, *decrypted, p_key, RSA_PKCS1_OAEP_PADDING);
+#endif
+ if (result == -1)
+ {
char err[512];
ERR_error_string_n(ERR_get_error(), err, sizeof(err));
error("Decryption of the challenge failed: %s", err);
@@ -458,12 +484,16 @@ static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, u
return result;
}
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target)
+#else
int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target)
+#endif
{
unsigned char *challenge;
int challenge_bytes;
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
if (agent_id == NULL) {
error("Agent was not claimed - cannot perform challenge/response");
return 1;
@@ -806,7 +836,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
req.request_type = HTTP_REQ_GET;
- char *agent_id = is_agent_claimed();
+ char *agent_id = get_agent_claimid();
if (agent_id == NULL)
{
error("Agent was not claimed - cannot perform challenge/response");
@@ -814,11 +844,11 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
return 1;
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json,proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
-#else
- buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
-#endif
+ if (rrdcontext_enabled)
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+ else
+ buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
+
freez(agent_id);
req.host = (char*)aclk_hostname;
diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h
index 1ca9245c2..2d660e5a4 100644
--- a/aclk/aclk_otp.h
+++ b/aclk/aclk_otp.h
@@ -8,7 +8,11 @@
#include "https_client.h"
#include "aclk_util.h"
+#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
+int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target);
+#else
int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target);
+#endif
int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port);
#endif /* ACLK_OTP_H */
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index de970fc3d..981c01965 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -13,27 +13,6 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
-typedef struct aclk_query_handler {
- aclk_query_type_t type;
- char *name; // for logging purposes
- int(*fnc)(struct aclk_query_thread *query_thr, aclk_query_t query);
-} aclk_query_handler;
-
-static int info_metadata(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- aclk_send_info_metadata(query_thr->client,
- !query->data.metadata_info.initial_on_connect,
- query->data.metadata_info.host);
- return 0;
-}
-
-static int alarms_metadata(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- aclk_send_alarm_metadata(query_thr->client,
- !query->data.metadata_info.initial_on_connect);
- return 0;
-}
-
static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url)
{
usec_t t;
@@ -277,84 +256,63 @@ cleanup:
return retval;
}
-static int chart_query(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- aclk_chart_msg(query_thr->client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name);
- return 0;
-}
-
-static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- aclk_alarm_state_msg(query_thr->client, query->data.alarm_update);
- // aclk_alarm_state_msg frees the json object including the header it generates
- query->data.alarm_update = NULL;
- return 0;
-}
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
// this will be simplified when legacy support is removed
aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
return 0;
}
-#endif
-
-aclk_query_handler aclk_query_handlers[] = {
- { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 },
- { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query },
- { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata },
- { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata },
- { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query },
- { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata },
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg },
- { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg },
- { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg },
- { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg },
- { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg },
- { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg },
- { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg },
- { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg },
- { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg },
- { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg },
-#endif
- { .type = UNKNOWN, .name = NULL, .fnc = NULL }
-};
const char *aclk_query_get_name(aclk_query_type_t qt)
{
- aclk_query_handler *ptr = aclk_query_handlers;
- while (ptr->type != UNKNOWN) {
- if (ptr->type == qt)
- return ptr->name;
- ptr++;
+ switch (qt) {
+ case HTTP_API_V2: return "http_api_request_v2";
+ case REGISTER_NODE: return "register_node";
+ case NODE_STATE_UPDATE: return "node_state_update";
+ case CHART_DIMS_UPDATE: return "chart_and_dim_update";
+ case CHART_CONFIG_UPDATED: return "chart_config_updated";
+ case CHART_RESET: return "reset_chart_messages";
+ case RETENTION_UPDATED: return "update_retention_info";
+ case UPDATE_NODE_INFO: return "update_node_info";
+ case ALARM_LOG_HEALTH: return "alarm_log_health";
+ case ALARM_PROVIDE_CFG: return "provide_alarm_config";
+ case ALARM_SNAPSHOT: return "alarm_snapshot";
+ case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
+ case PROTO_BIN_MESSAGE: return "generic_binary_proto_message";
+ default:
+ error_report("Unknown query type used %d", (int) qt);
+ return "unknown";
}
- return "unknown";
}
static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
- if (aclk_query_handlers[i].type == query->type) {
- worker_is_busy(i);
-
- debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
- aclk_query_handlers[i].fnc(query_thr, query);
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.queries_dispatched++;
- aclk_queries_per_thread[query_thr->idx]++;
- aclk_metrics_per_sample.queries_per_type[query->type]++;
- ACLK_STATS_UNLOCK;
- }
- aclk_query_free(query);
+{
+ if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) {
+ error_report("Unknown query in query queue. %u", query->type);
+ aclk_query_free(query);
+ return;
+ }
- worker_is_idle();
- return;
- }
+ worker_is_busy(query->type);
+ if (query->type == HTTP_API_V2) {
+ debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
+ http_api_v2(query_thr, query);
+ } else {
+ debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name);
+ send_bin_msg(query_thr, query);
}
- fatal("Unknown query in query queue. %u", query->type);
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_dispatched++;
+ aclk_queries_per_thread[query_thr->idx]++;
+ aclk_metrics_per_sample.queries_per_type[query->type]++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ aclk_query_free(query);
+
+ worker_is_idle();
}
/* Processes messages from queue. Compete for work with other threads
@@ -370,8 +328,8 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
static void worker_aclk_register(void) {
worker_register("ACLKQUERY");
- for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
- worker_register_job_name(i, aclk_query_handlers[i].name);
+ for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) {
+ worker_register_job_name(i, aclk_query_get_name(i));
}
}
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index 2422b01e1..01b20d23f 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -111,15 +111,6 @@ void aclk_query_free(aclk_query_t query)
freez(query->data.http_api_v2.query);
break;
- case CHART_NEW:
- freez(query->data.chart_add_del.chart_name);
- break;
-
- case ALARM_STATE_UPDATE:
- if (query->data.alarm_update)
- json_object_put(query->data.alarm_update);
- break;
-
case NODE_STATE_UPDATE:
case REGISTER_NODE:
case CHART_DIMS_UPDATE:
@@ -130,6 +121,8 @@ void aclk_query_free(aclk_query_t query)
case ALARM_LOG_HEALTH:
case ALARM_PROVIDE_CFG:
case ALARM_SNAPSHOT:
+ case UPDATE_NODE_COLLECTORS:
+ case PROTO_BIN_MESSAGE:
if (!use_mqtt_5)
freez(query->data.bin_payload.payload);
break;
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index 0b5ef8faa..ab94b6384 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -11,12 +11,7 @@
typedef enum {
UNKNOWN = 0,
- METADATA_INFO,
- METADATA_ALARMS,
HTTP_API_V2,
- CHART_NEW,
- CHART_DEL,
- ALARM_STATE_UPDATE,
REGISTER_NODE,
NODE_STATE_UPDATE,
CHART_DIMS_UPDATE,
@@ -27,19 +22,11 @@ typedef enum {
ALARM_LOG_HEALTH,
ALARM_PROVIDE_CFG,
ALARM_SNAPSHOT,
+ UPDATE_NODE_COLLECTORS,
+ PROTO_BIN_MESSAGE,
ACLK_QUERY_TYPE_COUNT // always keep this as last
} aclk_query_type_t;
-struct aclk_query_metadata {
- RRDHOST *host;
- int initial_on_connect;
-};
-
-struct aclk_query_chart_add_del {
- RRDHOST *host;
- char* chart_name;
-};
-
struct aclk_query_http_api_v2 {
char *payload;
char *query;
@@ -73,12 +60,8 @@ struct aclk_query {
// TODO maybe remove?
int version;
union {
- struct aclk_query_metadata metadata_info;
- struct aclk_query_metadata metadata_alarms;
struct aclk_query_http_api_v2 http_api_v2;
- struct aclk_query_chart_add_del chart_add_del;
struct aclk_bin_payload bin_payload;
- json_object *alarm_update;
} data;
};
diff --git a/aclk/aclk_rrdhost_state.h b/aclk/aclk_rrdhost_state.h
index 9138123df..5c8a2ddc9 100644
--- a/aclk/aclk_rrdhost_state.h
+++ b/aclk/aclk_rrdhost_state.h
@@ -3,43 +3,9 @@
#include "libnetdata/libnetdata.h"
-#ifdef ACLK_LEGACY
-typedef enum aclk_cmd {
- ACLK_CMD_CLOUD,
- ACLK_CMD_ONCONNECT,
- ACLK_CMD_INFO,
- ACLK_CMD_CHART,
- ACLK_CMD_CHARTDEL,
- ACLK_CMD_ALARM,
- ACLK_CMD_CLOUD_QUERY_2,
- ACLK_CMD_CHILD_CONNECT,
- ACLK_CMD_CHILD_DISCONNECT
-} ACLK_CMD;
-
-typedef enum aclk_metadata_state {
- ACLK_METADATA_REQUIRED,
- ACLK_METADATA_CMD_QUEUED,
- ACLK_METADATA_SENT
-} ACLK_METADATA_STATE;
-#endif
-
-typedef enum aclk_agent_state {
- ACLK_HOST_INITIALIZING,
- ACLK_HOST_STABLE
-} ACLK_AGENT_STATE;
-
typedef struct aclk_rrdhost_state {
char *claimed_id; // Claimed ID if host has one otherwise NULL
char *prev_claimed_id; // Claimed ID if changed (reclaimed) during runtime
-
-#ifdef ACLK_LEGACY
- // per child popcorning
- ACLK_AGENT_STATE state;
- ACLK_METADATA_STATE metadata;
-
- time_t timestamp_created;
- time_t t_last_popcorn_update;
-#endif /* ACLK_LEGACY */
} aclk_rrdhost_state;
#endif /* ACLK_RRDHOST_STATE_H */
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 27f1bf2dc..e6ed332cc 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -6,6 +6,8 @@
#include "aclk_query_queue.h"
#include "aclk.h"
+#include "schema-wrappers/proto_2_json.h"
+
#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
#define ACLK_CLOUD_REQ_V2_PREFIX "GET /"
@@ -55,19 +57,19 @@ static int cloud_to_agent_parse(JSON_ENTRY *e)
break;
case JSON_NUMBER:
if (!strcmp(e->name, "version")) {
- data->version = e->data.number;
+ data->version = (int)e->data.number;
break;
}
if (!strcmp(e->name, "timeout")) {
- data->timeout = e->data.number;
+ data->timeout = (int)e->data.number;
break;
}
if (!strcmp(e->name, "min-version")) {
- data->min_version = e->data.number;
+ data->min_version = (int)e->data.number;
break;
}
if (!strcmp(e->name, "max-version")) {
- data->max_version = e->data.number;
+ data->max_version = (int)e->data.number;
break;
}
@@ -116,20 +118,8 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
return 0;
}
-#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
- if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\
- debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
- ACLK_SHARED_STATE_UNLOCK;\
- return 1;\
- }\
- ACLK_SHARED_STATE_UNLOCK;
-
static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
- if (!aclk_use_new_cloud_arch) {
- HTTP_CHECK_AGENT_INITIALIZED();
- }
-
aclk_query_t query;
errno = 0;
@@ -229,7 +219,6 @@ err_cleanup:
return 1;
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
typedef uint32_t simple_hash_t;
typedef int(*rx_msg_handler)(const char *msg, size_t msg_len);
@@ -300,6 +289,15 @@ int create_node_instance_result(const char *msg, size_t msg_len)
}
}
+ struct capability caps[] = {
+ { .name = "proto", .version = 1, .enabled = 1 },
+ { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
+ { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
+ { .name = NULL, .version = 0, .enabled = 0 }
+ };
+ node_state_update.capabilities = caps;
+
rrdhost_aclk_state_lock(localhost);
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
@@ -324,6 +322,7 @@ int send_node_instances(const char *msg, size_t msg_len)
int stream_charts_and_dimensions(const char *msg, size_t msg_len)
{
+ aclk_ctx_based = 0;
stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
if (!res.claim_id || !res.node_id) {
error("Error parsing StreamChartsAndDimensions msg");
@@ -437,6 +436,41 @@ int handle_disconnect_req(const char *msg, size_t msg_len)
return 0;
}
+int contexts_checkpoint(const char *msg, size_t msg_len)
+{
+ aclk_ctx_based = 1;
+
+ struct ctxs_checkpoint *cmd = parse_ctxs_checkpoint(msg, msg_len);
+ if (!cmd)
+ return 1;
+
+ rrdcontext_hub_checkpoint_command(cmd);
+
+ freez(cmd->claim_id);
+ freez(cmd->node_id);
+ freez(cmd);
+ return 0;
+}
+
+int stop_streaming_contexts(const char *msg, size_t msg_len)
+{
+ if (!aclk_ctx_based) {
+ error_report("Received StopStreamingContexts message but context based communication was not enabled (Cloud violated the protocol). Ignoring message");
+ return 1;
+ }
+
+ struct stop_streaming_ctxs *cmd = parse_stop_streaming_ctxs(msg, msg_len);
+ if (!cmd)
+ return 1;
+
+ rrdcontext_hub_stop_streaming_command(cmd);
+
+ freez(cmd->claim_id);
+ freez(cmd->node_id);
+ freez(cmd);
+ return 0;
+}
+
typedef struct {
const char *name;
simple_hash_t name_hash;
@@ -455,6 +489,8 @@ new_cloud_rx_msg_t rx_msgs[] = {
{ .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
{ .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
{ .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },
+ { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint },
+ { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts },
{ .name = NULL, .name_hash = 0, .fnc = NULL },
};
@@ -491,7 +527,7 @@ unsigned int aclk_init_rx_msg_handlers(void)
return i;
}
-void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic)
{
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
@@ -509,6 +545,17 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
}
return;
}
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if (!strncmp(message_type, "cmd", strlen("cmd"))) {
+ log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name);
+ } else {
+ char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name);
+ log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name);
+ freez(json);
+ }
+#endif
+
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++;
@@ -524,4 +571,3 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
return;
}
}
-#endif
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index 00f88c6a8..61921faec 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -10,10 +10,8 @@
int aclk_handle_cloud_cmd_message(char *payload);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
const char *rx_handler_get_name(size_t i);
unsigned int aclk_init_rx_msg_handlers(void);
-void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len);
-#endif
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic);
#endif /* ACLK_RX_MSGS_H */
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index ca0532638..241e9b724 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -8,11 +8,9 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
struct {
int query_thread_count;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
unsigned int proto_hdl_cnt;
uint32_t *aclk_proto_rx_msgs_sample;
RRDDIM **rx_msg_dims;
-#endif
} aclk_stats_cfg; // there is only 1 stats thread at a time
// data ACLK stats need per query thread
@@ -237,7 +235,6 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
const char *rx_handler_get_name(size_t i);
static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
{
@@ -259,35 +256,53 @@ static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
rrdset_done(st);
}
-#endif
-void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
+static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
{
-#ifndef ENABLE_NEW_CLOUD_PROTOCOL
- UNUSED(proto_hdl_cnt);
-#endif
+ static RRDSET *st = NULL;
+ static RRDDIM *rd_sent = NULL;
+ static RRDDIM *rd_recvd = NULL;
+ static uint64_t sent = 0;
+ static uint64_t recvd = 0;
+
+ sent += stats->bytes_tx;
+ recvd += stats->bytes_rx;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata", "aclk_openssl_bytes", NULL, "aclk", NULL, "Received and Sent bytes.", "B/s",
+ "netdata", "stats", 200011, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+ rd_sent = rrddim_add(st, "sent", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ } else
+ rrdset_next(st);
+
+ rrddim_set_by_pointer(st, rd_sent, sent);
+ rrddim_set_by_pointer(st, rd_recvd, recvd);
+ rrdset_done(st);
+}
+
+void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
+{
aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt;
aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*));
-#endif
}
void aclk_stats_thread_cleanup()
{
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
freez(aclk_stats_cfg.rx_msg_dims);
freez(aclk_proto_rx_msgs_sample);
freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
-#endif
freez(aclk_qt_data);
freez(aclk_queries_per_thread);
freez(aclk_queries_per_thread_sample);
@@ -318,10 +333,10 @@ void *aclk_stats_main_thread(void *ptr)
// to not hold lock longer than necessary, especially not to hold it
// during database rrd* operations
memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+
memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
-#endif
+
memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
@@ -343,9 +358,10 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_query_time(&per_sample);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ struct mqtt_wss_stats mqtt_wss_stats = mqtt_wss_get_stats(args->client);
+ aclk_stats_mqtt_wss(&mqtt_wss_stats);
+
aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
-#endif
}
return 0;
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index 4f2894798..bec9ac247 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -6,6 +6,7 @@
#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
#include "aclk_query_queue.h"
+#include "mqtt_wss_client.h"
#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
@@ -22,6 +23,7 @@ int aclk_cloud_req_http_type_to_idx(const char *name);
struct aclk_stats_thread {
netdata_thread_t *thread;
int query_thread_count;
+ mqtt_wss_client client;
};
// preserve between samples
@@ -60,9 +62,7 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t cloud_q_process_max;
} aclk_metrics_per_sample;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
extern uint32_t *aclk_proto_rx_msgs_sample;
-#endif
extern uint32_t *aclk_queries_per_thread;
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 3530dccff..822a90fa2 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -6,6 +6,8 @@
#include "aclk_stats.h"
#include "aclk.h"
+#include "schema-wrappers/proto_2_json.h"
+
#ifndef __GNUC__
#pragma region aclk_tx_msgs helper functions
#endif
@@ -13,29 +15,6 @@
// version for aclk legacy (old cloud arch)
#define ACLK_VERSION 2
-static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
-{
- uint16_t packet_id;
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
- const char *topic = aclk_get_topic(subtopic);
-
- if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting message send");
- return;
- }
-
- mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published(packet_id);
-#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif
-}
-
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
#ifndef ACLK_LOG_CONVERSATION_DIR
@@ -56,42 +35,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
+ char *json = protomsg_to_json(msg, msg_len, msgname);
+ log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
+ freez(json);
#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname);
- FILE *fptr;
- if (fptr = fopen(filename,"w")) {
- fwrite(msg, msg_len, 1, fptr);
- fclose(fptr);
- }
-#endif
-
- return packet_id;
-}
-
-static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
-{
- uint16_t packet_id;
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
- const char *topic = aclk_get_topic(subtopic);
-
- if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting message send");
- return 0;
- }
- mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published(packet_id);
-#endif
-#ifdef ACLK_LOG_CONVERSATION_DIR
-#define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
-#endif
return packet_id;
}
@@ -231,17 +179,6 @@ static struct json_object *create_hdr(const char *type, const char *msg_id, time
return obj;
}
-static char *create_uuid()
-{
- uuid_t uuid;
- char *uuid_str = mallocz(36 + 1);
-
- uuid_generate(uuid);
- uuid_unparse(uuid, uuid_str);
-
- return uuid_str;
-}
-
#ifndef __GNUC__
#pragma endregion
#endif
@@ -250,90 +187,6 @@ static char *create_uuid()
#pragma region aclk_tx_msgs message generators
#endif
-/*
- * This will send the /api/v1/info
- */
-#define BUFFER_INITIAL_SIZE (1024 * 16)
-void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host)
-{
- BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- json_object *msg, *payload, *tmp;
-
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- // on_connect messages are sent on a health reload, if the on_connect message is real then we
- // use the session time as the fake timestamp to indicate that it starts the session. If it is
- // a fake on_connect message then use the real timestamp to indicate it is within the existing
- // session.
- if (metadata_submitted)
- msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
- else
- msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
-
- payload = json_object_new_object();
- json_object_object_add(msg, "payload", payload);
-
- web_client_api_request_v1_info_fill_buffer(host, local_buffer);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "info", tmp);
-
- buffer_flush(local_buffer);
-
- charts2json(host, local_buffer, 1, 0);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "charts", tmp);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
-
- json_object_put(msg);
- freez(msg_id);
- buffer_free(local_buffer);
-}
-
-// TODO should include header instead
-void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
-
-void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
-{
- BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- json_object *msg, *payload, *tmp;
-
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
-
- // on_connect messages are sent on a health reload, if the on_connect message is real then we
- // use the session time as the fake timestamp to indicate that it starts the session. If it is
- // a fake on_connect message then use the real timestamp to indicate it is within the existing
- // session.
-
- if (metadata_submitted)
- msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
- else
- msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
-
- payload = json_object_new_object();
- json_object_object_add(msg, "payload", payload);
-
- health_alarms2json(localhost, local_buffer, 1);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "configured-alarms", tmp);
-
- buffer_flush(local_buffer);
-
- health_active_log_alarms_2json(localhost, local_buffer);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "alarms-active", tmp);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS);
-
- json_object_put(msg);
- freez(msg_id);
- buffer_free(local_buffer);
-}
-
void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@@ -384,80 +237,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
}
}
-void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
-{
- json_object *msg, *payload;
- BUFFER *tmp_buffer;
- RRDSET *st;
-
- st = rrdset_find(host, chart);
- if (!st)
- st = rrdset_find_byname(host, chart);
- if (!st) {
- info("FAILED to find chart %s", chart);
- return;
- }
-
- tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- rrdset2json(st, tmp_buffer, NULL, NULL, 1);
- payload = json_tokener_parse(tmp_buffer->buffer);
- if (!payload) {
- error("Failed to parse JSON from rrdset2json");
- buffer_free(tmp_buffer);
- return;
- }
-
- msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
- json_object_object_add(msg, "payload", payload);
-
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
-
- buffer_free(tmp_buffer);
- json_object_put(msg);
-}
-
-void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
-{
- // we create header here on purpose (and not send message with it already as `msg` param)
- // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
- // send message with timestamps already to Query Queue they would be incorrect at time
- // when query queue would get to send them)
- json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
- json_object_object_add(obj, "payload", msg);
-
- aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
- json_object_put(obj);
-}
-
-/*
- * Will generate disconnect message.
- * @param message if NULL it will generate LWT message (unexpected).
- * Otherwise string pointed to by this parameter will be used as
- * reason.
- */
-json_object *aclk_generate_disconnect(const char *message)
-{
- json_object *tmp, *msg;
-
- msg = create_hdr("disconnect", NULL, 0, 0, 2);
-
- tmp = json_object_new_string(message ? message : "unexpected");
- json_object_object_add(msg, "payload", tmp);
-
- return msg;
-}
-
-int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
-{
- int pid;
- json_object *msg = aclk_generate_disconnect(message);
- pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA);
- json_object_put(msg);
- return pid;
-}
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
size_t len;
uint16_t pid;
@@ -469,6 +248,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
{ .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
#endif
{ .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
+ { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled },
{ .name = NULL, .version = 0, .enabled = 0 }
};
@@ -532,7 +312,6 @@ char *aclk_generate_lwt(size_t *size) {
return msg;
}
-#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
#ifndef __GNUC__
#pragma endregion
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index 44281eb68..31e592410 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -11,23 +11,10 @@
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
-void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
-void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
-
void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len);
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
-void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart);
-
-void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg);
-
-json_object *aclk_generate_disconnect(const char *message);
-int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message);
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
-// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
char *aclk_generate_lwt(size_t *size);
-#endif
#endif
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index 430925460..ec021aec5 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -4,7 +4,6 @@
#include "daemon/common.h"
-int aclk_use_new_cloud_arch = 0;
usec_t aclk_session_newarch = 0;
aclk_env_t *aclk_env = NULL;
@@ -124,22 +123,17 @@ struct topic_name {
{ .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" },
{ .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" },
{ .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" },
+ { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" },
+ { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" },
+ { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" },
{ .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
};
-enum aclk_topics compulsory_topics_legacy[] = {
- ACLK_TOPICID_CHART,
- ACLK_TOPICID_ALARMS,
- ACLK_TOPICID_METADATA,
- ACLK_TOPICID_COMMAND,
- ACLK_TOPICID_UNKNOWN
-};
-
-enum aclk_topics compulsory_topics_new_cloud_arch[] = {
+enum aclk_topics compulsory_topics[] = {
// TODO remove old topics once not needed anymore
- ACLK_TOPICID_CHART,
- ACLK_TOPICID_ALARMS,
- ACLK_TOPICID_METADATA,
+ ACLK_TOPICID_CHART, //TODO from legacy
+ ACLK_TOPICID_ALARMS, //TODO from legacy
+ ACLK_TOPICID_METADATA, //TODO from legacy
ACLK_TOPICID_COMMAND,
ACLK_TOPICID_AGENT_CONN,
ACLK_TOPICID_CMD_NG_V1,
@@ -154,6 +148,9 @@ enum aclk_topics compulsory_topics_new_cloud_arch[] = {
ACLK_TOPICID_ALARM_HEALTH,
ACLK_TOPICID_ALARM_CONFIG,
ACLK_TOPICID_ALARM_SNAPSHOT,
+ ACLK_TOPICID_NODE_COLLECTORS,
+ ACLK_TOPICID_CTXS_SNAPSHOT,
+ ACLK_TOPICID_CTXS_UPDATED,
ACLK_TOPICID_UNKNOWN
};
@@ -279,8 +276,6 @@ int aclk_generate_topic_cache(struct json_object *json)
}
}
- enum aclk_topics *compulsory_topics = aclk_use_new_cloud_arch ? compulsory_topics_new_cloud_arch : compulsory_topics_legacy;
-
for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) {
if (!aclk_get_topic(compulsory_topics[i])) {
error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i]));
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index fb0492ac8..ed715e046 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -20,7 +20,6 @@
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK
-extern int aclk_use_new_cloud_arch;
extern usec_t aclk_session_newarch;
extern int chart_batch_id;
@@ -88,7 +87,10 @@ enum aclk_topics {
ACLK_TOPICID_ALARM_LOG = 14,
ACLK_TOPICID_ALARM_HEALTH = 15,
ACLK_TOPICID_ALARM_CONFIG = 16,
- ACLK_TOPICID_ALARM_SNAPSHOT = 17
+ ACLK_TOPICID_ALARM_SNAPSHOT = 17,
+ ACLK_TOPICID_NODE_COLLECTORS = 18,
+ ACLK_TOPICID_CTXS_SNAPSHOT = 19,
+ ACLK_TOPICID_CTXS_UPDATED = 20
};
const char *aclk_get_topic(enum aclk_topics topic);
diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc
index 338e512d8..f64393300 100644
--- a/aclk/schema-wrappers/alarm_stream.cc
+++ b/aclk/schema-wrappers/alarm_stream.cc
@@ -118,6 +118,7 @@ void destroy_alarm_log_entry(struct alarm_log_entry *entry)
freez(entry->old_value_string);
freez(entry->rendered_info);
+ freez(entry->chart_context);
}
static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *proto)
@@ -166,6 +167,8 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr
proto->set_updated(data->updated);
proto->set_rendered_info(data->rendered_info);
+
+ proto->set_chart_context(data->chart_context);
}
char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data)
diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h
index 2932bb192..63911da3f 100644
--- a/aclk/schema-wrappers/alarm_stream.h
+++ b/aclk/schema-wrappers/alarm_stream.h
@@ -97,6 +97,8 @@ struct alarm_log_entry {
// rendered_info
char *rendered_info;
+
+ char *chart_context;
};
struct send_alarm_snapshot {
diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc
index 7d820e533..54c940758 100644
--- a/aclk/schema-wrappers/chart_stream.cc
+++ b/aclk/schema-wrappers/chart_stream.cc
@@ -76,7 +76,7 @@ void chart_instance_updated_destroy(struct chart_instance_updated *instance)
freez((char*)instance->id);
freez((char*)instance->claim_id);
- free_label_list(instance->label_head);
+ rrdlabels_destroy(instance->chart_labels);
freez((char*)instance->config_hash);
}
@@ -85,7 +85,6 @@ static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, co
{
google::protobuf::Map<std::string, std::string> *map;
aclk_lib::v1::ACLKMessagePosition *pos;
- struct label *label;
chart->set_id(update->id);
chart->set_claim_id(update->claim_id);
@@ -93,11 +92,7 @@ static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, co
chart->set_name(update->name);
map = chart->mutable_chart_labels();
- label = update->label_head;
- while (label) {
- map->insert({label->key, label->value});
- label = label->next;
- }
+ rrdlabels_walkthrough_read(update->chart_labels, label_add_to_map_callback, map);
switch (update->memory_mode) {
case RRD_MEMORY_MODE_NONE:
diff --git a/aclk/schema-wrappers/chart_stream.h b/aclk/schema-wrappers/chart_stream.h
index 7a46ecd8e..904866868 100644
--- a/aclk/schema-wrappers/chart_stream.h
+++ b/aclk/schema-wrappers/chart_stream.h
@@ -57,7 +57,7 @@ struct chart_instance_updated {
const char *node_id;
const char *name;
- struct label *label_head;
+ DICTIONARY *chart_labels;
RRD_MEMORY_MODE memory_mode;
diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc
new file mode 100644
index 000000000..b04c9d20c
--- /dev/null
+++ b/aclk/schema-wrappers/context.cc
@@ -0,0 +1,125 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+#include "context.h"
+
+using namespace context::v1;
+
+// ContextsSnapshot
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version)
+{
+ ContextsSnapshot *ctxs_snap = new ContextsSnapshot;
+
+ if (ctxs_snap == NULL)
+ fatal("Cannot allocate ContextsSnapshot object. OOM");
+
+ ctxs_snap->set_claim_id(claim_id);
+ ctxs_snap->set_node_id(node_id);
+ ctxs_snap->set_version(version);
+
+ return ctxs_snap;
+}
+
+void contexts_snapshot_delete(contexts_snapshot_t snapshot)
+{
+ delete (ContextsSnapshot *)snapshot;
+}
+
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version)
+{
+ ((ContextsSnapshot *)ctxs_snapshot)->set_version(version);
+}
+
+static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx)
+{
+ ctx->set_id(c_ctx->id);
+ ctx->set_version(c_ctx->version);
+ ctx->set_first_entry(c_ctx->first_entry);
+ ctx->set_last_entry(c_ctx->last_entry);
+ ctx->set_deleted(c_ctx->deleted);
+ ctx->set_title(c_ctx->title);
+ ctx->set_priority(c_ctx->priority);
+ ctx->set_chart_type(c_ctx->chart_type);
+ ctx->set_units(c_ctx->units);
+ ctx->set_family(c_ctx->family);
+}
+
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ ContextUpdated *ctx = ctxs_snap->add_contexts();
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_snap->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_snap;
+ return NULL;
+ }
+
+ delete ctxs_snap;
+ return bin;
+}
+
+// ContextsUpdated
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at)
+{
+ ContextsUpdated *ctxs_updated = new ContextsUpdated;
+
+ if (ctxs_updated == NULL)
+ fatal("Cannot allocate ContextsUpdated object. OOM");
+
+ ctxs_updated->set_claim_id(claim_id);
+ ctxs_updated->set_node_id(node_id);
+ ctxs_updated->set_version_hash(version_hash);
+ ctxs_updated->set_created_at(created_at);
+
+ return ctxs_updated;
+}
+
+void contexts_updated_delete(contexts_updated_t ctxs_updated)
+{
+ delete (ContextsUpdated *)ctxs_updated;
+}
+
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash)
+{
+ ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash);
+}
+
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ ContextUpdated *ctx = ctxs_update->add_contextupdates();
+
+ if (ctx == NULL)
+ fatal("Cannot allocate ContextUpdated object. OOM");
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_update->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_update;
+ return NULL;
+ }
+
+ delete ctxs_update;
+ return bin;
+}
diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h
new file mode 100644
index 000000000..cbb7701a8
--- /dev/null
+++ b/aclk/schema-wrappers/context.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_H
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void* contexts_updated_t;
+typedef void* contexts_snapshot_t;
+
+struct context_updated {
+ // context id
+ const char *id;
+
+ uint64_t version;
+
+ uint64_t first_entry;
+ uint64_t last_entry;
+
+ int deleted;
+
+ const char *title;
+ uint64_t priority;
+ const char *chart_type;
+ const char *units;
+ const char *family;
+};
+
+// ContextS Snapshot related
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version);
+void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot);
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version);
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update);
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len);
+
+// ContextS Updated related
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at);
+void contexts_updated_delete(contexts_updated_t ctxs_updated);
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash);
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update);
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */
diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc
new file mode 100644
index 000000000..3bb1956cb
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.cc
@@ -0,0 +1,42 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/stream.pb.h"
+
+#include "context_stream.h"
+
+#include "libnetdata/libnetdata.h"
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len)
+{
+ context::v1::StopStreamingContexts msg;
+
+ struct stop_streaming_ctxs *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+
+ return res;
+}
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len)
+{
+ context::v1::ContextsCheckpoint msg;
+
+ struct ctxs_checkpoint *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+ res->version_hash = msg.version_hash();
+
+ return res;
+}
diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h
new file mode 100644
index 000000000..8c691d2cc
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.h
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct stop_streaming_ctxs {
+ char *claim_id;
+ char *node_id;
+ // we omit reason as there is only one defined at this point
+ // as soon as there is more than one defined in StopStreaminContextsReason
+ // we should add it
+ // 0 - RATE_LIMIT_EXCEEDED
+};
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len);
+
+struct ctxs_checkpoint {
+ char *claim_id;
+ char *node_id;
+
+ uint64_t version_hash;
+};
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len);
+
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
index 0a4c8ece1..a6ca8ef98 100644
--- a/aclk/schema-wrappers/node_connection.cc
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -28,6 +28,15 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect
timestamp->set_seconds(tv.tv_sec);
timestamp->set_nanos(tv.tv_usec * 1000);
+ if (data->capabilities) {
+ struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)malloc(*len);
if (bin)
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
index 3fd207213..c27729d15 100644
--- a/aclk/schema-wrappers/node_connection.h
+++ b/aclk/schema-wrappers/node_connection.h
@@ -3,6 +3,8 @@
#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+#include "capability.h"
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -17,6 +19,7 @@ typedef struct {
int64_t session_id;
int32_t hops;
+ struct capability *capabilities;
} node_instance_connection_t;
char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc
index f66985246..2a05ddaba 100644
--- a/aclk/schema-wrappers/node_info.cc
+++ b/aclk/schema-wrappers/node_info.cc
@@ -6,7 +6,6 @@
static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct aclk_node_info *data)
{
- struct label *label;
google::protobuf::Map<std::string, std::string> *map;
if (data->name)
@@ -56,9 +55,6 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct acl
if (data->custom_info)
info->set_custom_info(data->custom_info);
- for (size_t i = 0; i < data->service_count; i++)
- info->add_services(data->services[i]);
-
if (data->machine_guid)
info->set_machine_guid(data->machine_guid);
@@ -67,12 +63,7 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct acl
ml_info->set_ml_enabled(data->ml_info.ml_enabled);
map = info->mutable_host_labels();
- label = data->host_labels_head;
- while (label) {
- map->insert({label->key, label->value});
- label = label->next;
- }
-
+ rrdlabels_walkthrough_read(data->host_labels_ptr, label_add_to_map_callback, map);
return 0;
}
@@ -119,3 +110,27 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in
return bin;
}
+
+char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *upd_node_collectors)
+{
+ nodeinstance::info::v1::UpdateNodeCollectors msg;
+
+ msg.set_node_id(upd_node_collectors->node_id);
+ msg.set_claim_id(upd_node_collectors->claim_id);
+
+ void *colls;
+ dfe_start_read(upd_node_collectors->node_collectors, colls) {
+ struct collector_info *c =(struct collector_info *)colls;
+ nodeinstance::info::v1::CollectorInfo *col = msg.add_collectors();
+ col->set_plugin(c->plugin);
+ col->set_module(c->module);
+ }
+ dfe_done(colls);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)malloc(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h
index e67f3e1da..e8ac2d7c6 100644
--- a/aclk/schema-wrappers/node_info.h
+++ b/aclk/schema-wrappers/node_info.h
@@ -4,9 +4,10 @@
#define ACLK_SCHEMA_WRAPPER_NODE_INFO_H
#include <stdlib.h>
+#include <stdint.h>
-#include "database/rrd.h"
#include "capability.h"
+#include "database/rrd.h"
#ifdef __cplusplus
extern "C" {
@@ -49,12 +50,9 @@ struct aclk_node_info {
char *custom_info;
- char **services;
- size_t service_count;
-
char *machine_guid;
- struct label *host_labels_head;
+ DICTIONARY *host_labels_ptr;
struct machine_learning_info ml_info;
};
@@ -73,8 +71,21 @@ struct update_node_info {
struct capability *node_instance_capabilities;
};
+struct collector_info {
+ char *module;
+ char *plugin;
+};
+
+struct update_node_collectors {
+ char *claim_id;
+ char *node_id;
+ DICTIONARY *node_collectors;
+};
+
char *generate_update_node_info_message(size_t *len, struct update_node_info *info);
+char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *collectors);
+
#ifdef __cplusplus
}
#endif
diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc
new file mode 100644
index 000000000..0e473eb6c
--- /dev/null
+++ b/aclk/schema-wrappers/proto_2_json.cc
@@ -0,0 +1,101 @@
+#include <google/protobuf/message.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "proto/alarm/v1/config.pb.h"
+#include "proto/alarm/v1/stream.pb.h"
+#include "proto/aclk/v1/lib.pb.h"
+#include "proto/chart/v1/config.pb.h"
+#include "proto/chart/v1/stream.pb.h"
+#include "proto/agent/v1/connection.pb.h"
+#include "proto/agent/v1/disconnect.pb.h"
+#include "proto/nodeinstance/connection/v1/connection.pb.h"
+#include "proto/nodeinstance/create/v1/creation.pb.h"
+#include "proto/nodeinstance/info/v1/info.pb.h"
+#include "proto/context/v1/stream.pb.h"
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "proto_2_json.h"
+
+using namespace google::protobuf::util;
+
+static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
+{
+//tx side
+ if (!strcmp(msgname, "UpdateAgentConnection"))
+ return new agent::v1::UpdateAgentConnection;
+ if (!strcmp(msgname, "UpdateNodeInstanceConnection"))
+ return new nodeinstance::v1::UpdateNodeInstanceConnection;
+ if (!strcmp(msgname, "CreateNodeInstance"))
+ return new nodeinstance::create::v1::CreateNodeInstance;
+ if (!strcmp(msgname, "ChartsAndDimensionsUpdated"))
+ return new chart::v1::ChartsAndDimensionsUpdated;
+ if (!strcmp(msgname, "ChartConfigsUpdated"))
+ return new chart::v1::ChartConfigsUpdated;
+ if (!strcmp(msgname, "ResetChartMessages"))
+ return new chart::v1::ResetChartMessages;
+ if (!strcmp(msgname, "RetentionUpdated"))
+ return new chart::v1::RetentionUpdated;
+ if (!strcmp(msgname, "UpdateNodeInfo"))
+ return new nodeinstance::info::v1::UpdateNodeInfo;
+ if (!strcmp(msgname, "AlarmLogHealth"))
+ return new alarms::v1::AlarmLogHealth;
+ if (!strcmp(msgname, "ProvideAlarmConfiguration"))
+ return new alarms::v1::ProvideAlarmConfiguration;
+ if (!strcmp(msgname, "AlarmSnapshot"))
+ return new alarms::v1::AlarmSnapshot;
+ if (!strcmp(msgname, "AlarmLogEntry"))
+ return new alarms::v1::AlarmLogEntry;
+ if (!strcmp(msgname, "UpdateNodeCollectors"))
+ return new nodeinstance::info::v1::UpdateNodeCollectors;
+ if (!strcmp(msgname, "ContextsUpdated"))
+ return new context::v1::ContextsUpdated;
+ if (!strcmp(msgname, "ContextsSnapshot"))
+ return new context::v1::ContextsSnapshot;
+
+//rx side
+ if (!strcmp(msgname, "CreateNodeInstanceResult"))
+ return new nodeinstance::create::v1::CreateNodeInstanceResult;
+ if (!strcmp(msgname, "SendNodeInstances"))
+ return new agent::v1::SendNodeInstances;
+ if (!strcmp(msgname, "StreamChartsAndDimensions"))
+ return new chart::v1::StreamChartsAndDimensions;
+ if (!strcmp(msgname, "ChartsAndDimensionsAck"))
+ return new chart::v1::ChartsAndDimensionsAck;
+ if (!strcmp(msgname, "UpdateChartConfigs"))
+ return new chart::v1::UpdateChartConfigs;
+ if (!strcmp(msgname, "StartAlarmStreaming"))
+ return new alarms::v1::StartAlarmStreaming;
+ if (!strcmp(msgname, "SendAlarmLogHealth"))
+ return new alarms::v1::SendAlarmLogHealth;
+ if (!strcmp(msgname, "SendAlarmConfiguration"))
+ return new alarms::v1::SendAlarmConfiguration;
+ if (!strcmp(msgname, "SendAlarmSnapshot"))
+ return new alarms::v1::SendAlarmSnapshot;
+ if (!strcmp(msgname, "DisconnectReq"))
+ return new agent::v1::DisconnectReq;
+ if (!strcmp(msgname, "ContextsCheckpoint"))
+ return new context::v1::ContextsCheckpoint;
+ if (!strcmp(msgname, "StopStreamingContexts"))
+ return new context::v1::StopStreamingContexts;
+
+ return NULL;
+}
+
+char *protomsg_to_json(const void *protobin, size_t len, const char *msgname)
+{
+ google::protobuf::Message *msg = msg_name_to_protomsg(msgname);
+ if (msg == NULL)
+ return strdupz("Don't know this message type by name.");
+
+ if (!msg->ParseFromArray(protobin, len))
+ return strdupz("Can't parse this message. Malformed or wrong parser used.");
+
+ JsonPrintOptions options;
+
+ std::string output;
+ google::protobuf::util::MessageToJsonString(*msg, &output, options);
+ delete msg;
+ return strdupz(output.c_str());
+}
diff --git a/aclk/schema-wrappers/proto_2_json.h b/aclk/schema-wrappers/proto_2_json.h
new file mode 100644
index 000000000..3bd98478c
--- /dev/null
+++ b/aclk/schema-wrappers/proto_2_json.h
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef PROTO_2_JSON_H
+#define PROTO_2_JSON_H
+
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+char *protomsg_to_json(const void *protobin, size_t len, const char *msgname);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PROTO_2_JSON_H */
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.cc b/aclk/schema-wrappers/schema_wrapper_utils.cc
index b100e20c3..6573e6299 100644
--- a/aclk/schema-wrappers/schema_wrapper_utils.cc
+++ b/aclk/schema-wrappers/schema_wrapper_utils.cc
@@ -13,3 +13,10 @@ void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, st
tv->tv_sec = ts.seconds();
tv->tv_usec = ts.nanos()/1000;
}
+
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ (void)ls;
+ auto map = (google::protobuf::Map<std::string, std::string> *)data;
+ map->insert({name, value});
+ return 1;
+}
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.h b/aclk/schema-wrappers/schema_wrapper_utils.h
index 494855f82..2815d0f20 100644
--- a/aclk/schema-wrappers/schema_wrapper_utils.h
+++ b/aclk/schema-wrappers/schema_wrapper_utils.h
@@ -3,8 +3,11 @@
#ifndef SCHEMA_WRAPPER_UTILS_H
#define SCHEMA_WRAPPER_UTILS_H
+#include "database/rrd.h"
+
#include <sys/time.h>
#include <google/protobuf/timestamp.pb.h>
+#include <google/protobuf/map.h>
#if GOOGLE_PROTOBUF_VERSION < 3001000
#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize();
@@ -16,5 +19,6 @@
void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts);
void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv);
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data);
#endif /* SCHEMA_WRAPPER_UTILS_H */
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
index a3248a69b..26412cacc 100644
--- a/aclk/schema-wrappers/schema_wrappers.h
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -14,5 +14,7 @@
#include "alarm_stream.h"
#include "node_info.h"
#include "capability.h"
+#include "context_stream.h"
+#include "context.h"
#endif /* SCHEMA_WRAPPERS_H */