diff options
Diffstat (limited to '')
42 files changed, 827 insertions, 1178 deletions
diff --git a/aclk/README.md b/aclk/README.md index f595726e3..6f541c38e 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -11,15 +11,25 @@ The Agent-Cloud link (ACLK) is the mechanism responsible for securely connecting through Netdata Cloud. The ACLK establishes an outgoing secure WebSocket (WSS) connection to Netdata Cloud on port `443`. The ACLK is encrypted, safe, and _is only established if you connect your node_. -The Cloud App lives at app.netdata.cloud which currently resolves to 35.196.244.138. However, this IP or range of -IPs can change without notice. Watch this page for updates. +The Cloud App lives at app.netdata.cloud which currently resolves to the following list of IPs: + +- 54.198.178.11 +- 44.207.131.212 +- 44.196.50.41 + +:::caution + +This list of IPs can change without notice, we strongly advise you to whitelist the domain `app.netdata.cloud`, if +this is not an option in your case always verify the current domain resolution (e.g via the `host` command). + +::: For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [get started with Cloud](https://learn.netdata.cloud/docs/cloud/get-started) guide or the full [connect to Cloud documentation](/claim/README.md). ## Data privacy -[Data privacy](https://netdata.cloud/data-privacy/) is very important to us. We firmly believe that your data belongs to +[Data privacy](https://netdata.cloud/privacy/) is very important to us. We firmly believe that your data belongs to you. This is why **we don't store any metric data in Netdata Cloud**. All the data that you see in the web browser when using Netdata Cloud, is actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard. @@ -50,12 +60,12 @@ You can configure following keys in the `netdata.conf` section `[cloud]`: [cloud] statistics = yes query thread count = 2 - mqtt5 = no + mqtt5 = yes ``` - `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. - `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). -- `mqtt5` enables the new MQTT5 protocol implementation in the Agent. Currently a technical preview. +- `mqtt5` allows disabling the new MQTT5 implementation which is used now by default in case of issues. This option will be removed in future stable release. ## Disable the ACLK diff --git a/aclk/aclk.c b/aclk/aclk.c index 6426c5b5e..7b3641b1e 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -10,7 +10,6 @@ #include "aclk_query_queue.h" #include "aclk_util.h" #include "aclk_rx_msgs.h" -#include "aclk_collector_list.h" #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" @@ -46,17 +45,29 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { - .agent_state = ACLK_HOST_INITIALIZING, - .last_popcorn_interrupt = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +OSSL_DECODER_CTX *aclk_dctx = NULL; +EVP_PKEY *aclk_private_key = NULL; +#else static RSA *aclk_private_key = NULL; +#endif static int load_private_key() { - if (aclk_private_key != NULL) + if (aclk_private_key != NULL) { +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 + EVP_PKEY_free(aclk_private_key); + if (aclk_dctx) + OSSL_DECODER_CTX_free(aclk_dctx); + + aclk_dctx = NULL; +#else RSA_free(aclk_private_key); +#endif + } aclk_private_key = NULL; char filename[FILENAME_MAX + 1]; snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); @@ -75,7 +86,25 @@ static int load_private_key() goto biofailed; } +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 + aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL, + "RSA", + OSSL_KEYMGMT_SELECT_PRIVATE_KEY, + NULL, NULL); + + if (!aclk_dctx) { + error("Loading private key (from claiming) failed - no OpenSSL Decoders found"); + goto biofailed; + } + + // this is necesseary to avoid RSA key with wrong size + if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) { + error("Decoding private key (from claiming) failed - invalid format."); + goto biofailed; + } +#else aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); +#endif BIO_free(key_bio); if (aclk_private_key!=NULL) { @@ -112,12 +141,12 @@ static int wait_till_cloud_enabled() static int wait_till_agent_claimed(void) { //TODO prevent malloc and freez - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); while (likely(!agent_id)) { sleep_usec(USEC_PER_SEC * 1); if (netdata_exit) return 1; - agent_id = is_agent_claimed(); + agent_id = get_agent_claimid(); } freez(agent_id); return 0; @@ -188,54 +217,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) //TODO prevent big buffer on stack #define RX_MSGLEN_MAX 4096 -static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos) -{ - UNUSED(qos); - char cmsg[RX_MSGLEN_MAX]; - size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); - const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); - if (!cmd_topic) { - error("Error retrieving command topic"); - return; - } - - if (msglen > RX_MSGLEN_MAX - 1) - error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); - - memcpy(cmsg, - msg, - len); - cmsg[len] = 0; - -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 512 - char filename[FN_MAX_LEN]; - int logfd; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT()); - logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); - if(logfd < 0) - error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); - write(logfd, msg, msglen); - close(logfd); -#endif - - debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg); - - if (strcmp(cmd_topic, topic)) - error("Received message on unexpected topic %s", topic); - - if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring incoming message."); - return; - } - - aclk_handle_cloud_cmd_message(cmsg); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos) +static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { UNUSED(qos); + aclk_rcvd_cloud_msgs++; if (msglen > RX_MSGLEN_MAX) error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); @@ -269,17 +254,8 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t close(logfd); #endif - aclk_handle_new_cloud_msg(msgtype, msg, msglen); -} - -static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { - aclk_rcvd_cloud_msgs++; - if (aclk_use_new_cloud_arch) - msg_callback_new_protocol(topic, msg, msglen, qos); - else - msg_callback_old_protocol(topic, msg, msglen, qos); + aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic); } -#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ static void puback_callback(uint16_t packet_id) { @@ -356,40 +332,6 @@ static int handle_connection(mqtt_wss_client client) return 0; } -inline static int aclk_popcorn_check() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -inline static int aclk_popcorn_check_bump() -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); - ACLK_SHARED_STATE_UNLOCK; - return 1; - } - ACLK_SHARED_STATE_UNLOCK; - return 0; -} - -static inline void queue_connect_payloads(void) -{ - aclk_query_t query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; - query->data.metadata_info.initial_on_connect = 1; - aclk_queue_query(query); - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 1; - aclk_queue_query(query); -} - static inline void mqtt_connected_actions(mqtt_wss_client client) { char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); @@ -399,15 +341,11 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); - if (!topic) - error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); - else - mqtt_wss_subscribe(client, topic, 1); - } -#endif + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); aclk_stats_upd_online(1); aclk_connected = 1; @@ -415,55 +353,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_rcvd_cloud_msgs = 0; aclk_connection_counter++; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (!aclk_use_new_cloud_arch) { -#endif - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); - } - ACLK_SHARED_STATE_UNLOCK; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } else { - aclk_send_agent_connection_update(client, 1); - } -#endif -} - -/* Waits until agent is ready or needs to exit - * @param client instance of mqtt_wss_client - * @param query_threads pointer to aclk_query_threads - * structure where to store data about started query threads - * @return 0 - Popcorning Finished - Agent STABLE, - * !0 - netdata_exit - */ -static int wait_popcorning_finishes() -{ - time_t elapsed; - int need_wait; - if (aclk_use_new_cloud_arch) - return 0; - - while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; - if (elapsed >= ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = ACLK_HOST_STABLE; - ACLK_SHARED_STATE_UNLOCK; - error("ACLK localhost popcorn timer finished"); - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - need_wait = ACLK_STABLE_TIMEOUT - elapsed; - error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait); - sleep(need_wait); - } - return 1; + aclk_send_agent_connection_update(client, 1); } void aclk_graceful_disconnect(mqtt_wss_client client) @@ -471,12 +361,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client) info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); - else -#endif - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { @@ -594,8 +480,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt = NULL; - while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { @@ -629,8 +513,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .drop_on_publish_fail = 1 }; - aclk_use_new_cloud_arch = 0; - #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { aclk_env_t_destroy(aclk_env); @@ -649,20 +531,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (netdata_exit) return 1; - if (aclk_env->encoding == ACLK_ENC_PROTO) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL - error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); + if (aclk_env->encoding != ACLK_ENC_PROTO) { + error_report("This agent can only use the new cloud protocol but cloud requested old one."); continue; -#else - if (!aclk_env_has_capa("proto")) { - error ("Can't encoding=proto without at least \"proto\" capability."); - continue; - } - info("Switching ACLK to new protobuf protocol. Due to /env response."); - aclk_use_new_cloud_arch = 1; -#endif } + if (!aclk_env_has_capa("proto")) { + error ("Can't use encoding=proto without at least \"proto\" capability."); + continue; + } + info("New ACLK protobuf protocol negotiated successfully (/env response)."); + memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); @@ -679,10 +558,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - if (aclk_use_new_cloud_arch) - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); - else - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. Will not send LWT."); @@ -708,17 +584,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; aclk_session_us = aclk_session_newarch % USEC_PER_SEC; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - if (aclk_use_new_cloud_arch) { - mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); - } else { -#endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - } -#endif + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -732,10 +598,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - if (aclk_use_new_cloud_arch) - freez((char *)mqtt_conn_params.will_msg); - else - json_object_put(lwt); + freez((char *)mqtt_conn_params.will_msg); if (!ret) { last_conn_time_mqtt = now_realtime_sec(); @@ -778,10 +641,7 @@ void *aclk_main(void *ptr) return NULL; } - unsigned int proto_hdl_cnt; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - proto_hdl_cnt = aclk_init_rx_msg_handlers(); -#endif + unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers(); // This thread is unusual in that it cannot be cancelled by cancel_main_threads() // as it must notify the far end that it shutdown gracefully and avoid the LWT. @@ -792,7 +652,6 @@ void *aclk_main(void *ptr) static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif - aclk_popcorn_check_bump(); // start localhost popcorn timer query_threads.count = read_query_thread_count(); if (wait_till_cloud_enabled()) @@ -801,13 +660,9 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO); + use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { -#else - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) { -#endif error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -822,6 +677,7 @@ void *aclk_main(void *ptr) stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); stats_thread->query_thread_count = query_threads.count; + stats_thread->client = mqttwss_client; aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt); netdata_thread_create( stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, @@ -834,28 +690,9 @@ void *aclk_main(void *ptr) if (aclk_attempt_to_connect(mqttwss_client)) goto exit_full; -#if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL) - error_report("############################ WARNING ###############################"); - error_report("# Your agent is configured to connect to cloud but has #"); - error_report("# no protobuf protocol support (uses legacy JSON protocol) #"); - error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #"); - error_report("# Visit following link for more info and instructions how to solve #"); - error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #"); - error_report("######################################################################"); -#endif - - // warning this assumes the popcorning is relative short (3s) - // if that changes call mqtt_wss_service from within - // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes()) - goto exit_full; - if (unlikely(!query_threads.thread_list)) aclk_query_threads_start(&query_threads, mqttwss_client); - if (!aclk_use_new_cloud_arch) - queue_connect_payloads(); - if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); @@ -889,168 +726,12 @@ exit: return NULL; } -// TODO this is taken over as workaround from old ACLK -// fix this in both old and new ACLK -extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); - -void aclk_alarm_reload(void) -{ - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return; - } - ACLK_SHARED_STATE_UNLOCK; - - aclk_queue_query(aclk_query_new(METADATA_ALARMS)); -} - -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) -{ - BUFFER *local_buffer; - json_object *msg; - - if (host != localhost) - return 0; - - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; - return 0; - } - ACLK_SHARED_STATE_UNLOCK; - - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - - netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - health_alarm_entry2json_nolock(local_buffer, ae, host); - netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - - msg = json_tokener_parse(local_buffer->buffer); - - struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE); - query->data.alarm_update = msg; - aclk_queue_query(query); - - buffer_free(local_buffer); - return 0; -} - -int aclk_update_chart(RRDHOST *host, char *chart_name, int create) -{ - struct aclk_query *query; - - if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check()) - return 0; - - query = aclk_query_new(create ? CHART_NEW : CHART_DEL); - if(create) { - query->data.chart_add_del.host = host; - query->data.chart_add_del.chart_name = strdupz(chart_name); - } else { - query->data.metadata_info.host = host; - query->data.metadata_info.initial_on_connect = 0; - } - - aclk_queue_query(query); - return 0; -} - -/* - * Add a new collector to the list - * If it exists, update the chart count - */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(tmp_collector->count != 1)) { - COLLECTOR_UNLOCK; - return; - } - - COLLECTOR_UNLOCK; - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -/* - * Delete a collector from the list - * If the chart count reaches zero the collector will be removed - * from the list by calling del_collector. - * - * This function will release the memory used and schedule - * a cloud update - */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct aclk_query *query; - struct _collector *tmp_collector; - if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(!tmp_collector || tmp_collector->count)) { - COLLECTOR_UNLOCK; - return; - } - - debug( - D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", - tmp_collector->count); - - COLLECTOR_UNLOCK; - - _free_collector(tmp_collector); - - if (aclk_popcorn_check_bump()) - return; - - if (host != localhost) - return; - - query = aclk_query_new(METADATA_INFO); - query->data.metadata_info.host = localhost; //TODO - query->data.metadata_info.initial_on_connect = 0; - aclk_queue_query(query); - - query = aclk_query_new(METADATA_ALARMS); - query->data.metadata_alarms.initial_on_connect = 0; - aclk_queue_query(query); -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd) { uuid_t node_id; int ret; - if (!aclk_connected || !aclk_use_new_cloud_arch) + if (!aclk_connected) return; ret = get_node_id(&host->host_uuid, &node_id); @@ -1088,6 +769,16 @@ void aclk_host_state_update(RRDHOST *host, int cmd) }; node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(node_id, (char*)node_state_update.node_id); + + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -1120,6 +811,20 @@ void aclk_send_node_instances() }; node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id); + + char host_id[UUID_STR_LEN]; + uuid_unparse_lower(list->host_id, host_id); + + RRDHOST *host = rrdhost_find_by_guid(host_id, 0); + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -1157,14 +862,12 @@ void aclk_send_node_instances() } freez(list_head); } -#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) { struct proto_alert_status status; @@ -1220,7 +923,6 @@ static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) ); freez(stats); } -#endif char *ng_aclk_state(void) { @@ -1231,15 +933,11 @@ char *ng_aclk_state(void) buffer_strcat(wb, "ACLK Available: Yes\n" "ACLK Version: 2\n" -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - "Protocols Supported: Legacy, Protobuf\n" -#else - "Protocols Supported: Legacy\n" -#endif + "Protocols Supported: Protobuf\n" ); - buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); if (agent_id == NULL) buffer_strcat(wb, "No\n"); else { @@ -1273,7 +971,6 @@ char *ng_aclk_state(void) if (aclk_connected) { buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL RRDHOST *host; rrd_rdlock(); rrdhost_foreach_read(host) { @@ -1308,7 +1005,6 @@ char *ng_aclk_state(void) fill_chart_status_for_host(wb, host); } rrd_unlock(); -#endif } ret = strdupz(buffer_tostring(wb)); @@ -1316,7 +1012,6 @@ char *ng_aclk_state(void) return ret; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { struct proto_alert_status status; @@ -1381,7 +1076,6 @@ static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) freez(stats); } -#endif static json_object *timestamp_to_json(const time_t *t) { @@ -1405,18 +1099,11 @@ char *ng_aclk_state_json(void) json_object_object_add(msg, "aclk-version", tmp); grp = json_object_new_array(); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); tmp = json_object_new_string("Protobuf"); json_object_array_add(grp, tmp); -#else - tmp = json_object_new_string("Legacy"); - json_object_array_add(grp, tmp); -#endif json_object_object_add(msg, "protocols-supported", grp); - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); tmp = json_object_new_boolean(agent_id != NULL); json_object_object_add(msg, "agent-claimed", tmp); @@ -1434,7 +1121,7 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_connected); json_object_object_add(msg, "online", tmp); - tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy"); + tmp = json_object_new_string("Protobuf"); json_object_object_add(msg, "used-cloud-protocol", tmp); tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); @@ -1461,7 +1148,6 @@ char *ng_aclk_state_json(void) tmp = json_object_new_boolean(aclk_disable_runtime); json_object_object_add(msg, "banned-by-cloud", tmp); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL grp = json_object_new_array(); RRDHOST *host; @@ -1513,7 +1199,6 @@ char *ng_aclk_state_json(void) } rrd_unlock(); json_object_object_add(msg, "node-instances", grp); -#endif char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); diff --git a/aclk/aclk.h b/aclk/aclk.h index 41c4e05e4..5065ac2bf 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -21,9 +21,6 @@ extern netdata_mutex_t aclk_shared_state_mutex; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) extern struct aclk_shared_state { - ACLK_AGENT_STATE agent_state; - time_t last_popcorn_interrupt; - // To wait for `disconnect` message PUBACK // when shutting down // at the same time if > 0 we know link is @@ -32,21 +29,8 @@ extern struct aclk_shared_state { int mqtt_shutdown_msg_rcvd; } aclk_shared_state; -void aclk_alarm_reload(void); -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); - -/* Informs ACLK about created/deleted chart - * @param create 0 - if chart was deleted, other if chart created - */ -int aclk_update_chart(RRDHOST *host, char *chart_name, int create); - -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int cmd); void aclk_send_node_instances(void); -#endif void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c index a2e738ab1..141d267af 100644 --- a/aclk/aclk_api.c +++ b/aclk/aclk_api.c @@ -13,10 +13,10 @@ usec_t aclk_session_us = 0; time_t aclk_session_sec = 0; int aclk_disable_runtime = 0; -int aclk_disable_single_updates = 0; int aclk_stats_enabled; int use_mqtt_5 = 0; +int aclk_ctx_based = 0; #define ACLK_IMPL_KEY_NAME "aclk implementation" @@ -33,25 +33,17 @@ void *aclk_starter(void *ptr) { } return aclk_main(ptr); } - -void aclk_single_update_disable() -{ - aclk_disable_single_updates = 1; -} - -void aclk_single_update_enable() -{ - aclk_disable_single_updates = 0; -} #endif /* ENABLE_ACLK */ -struct label *add_aclk_host_labels(struct label *label) { +void add_aclk_host_labels(void) { + DICTIONARY *labels = localhost->host_labels; + #ifdef ENABLE_ACLK - label = add_label_to_list(label, "_aclk_ng_available", "true", LABEL_SOURCE_AUTO); + rrdlabels_add(labels, "_aclk_ng_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); #else - label = add_label_to_list(label, "_aclk_ng_available", "false", LABEL_SOURCE_AUTO); + rrdlabels_add(labels, "_aclk_ng_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); #endif - label = add_label_to_list(label, "_aclk_legacy_available", "false", LABEL_SOURCE_AUTO); + rrdlabels_add(labels, "_aclk_legacy_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); #ifdef ENABLE_ACLK ACLK_PROXY_TYPE aclk_proxy; char *proxy_str; @@ -69,17 +61,14 @@ struct label *add_aclk_host_labels(struct label *label) { break; } - int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO); - label = add_label_to_list(label, "_mqtt_version", mqtt5 ? "5" : "3", LABEL_SOURCE_AUTO); - label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO); - label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - label = add_label_to_list(label, "_aclk_ng_new_cloud_protocol", "true", LABEL_SOURCE_AUTO); -#else - label = add_label_to_list(label, "_aclk_ng_new_cloud_protocol", "false", LABEL_SOURCE_AUTO); -#endif + + int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); + + rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_impl", "Next Generation", RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); #endif - return label; } char *aclk_state(void) { diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h index 557b70d70..36a6d603f 100644 --- a/aclk/aclk_api.h +++ b/aclk/aclk_api.h @@ -15,31 +15,17 @@ extern usec_t aclk_session_us; extern time_t aclk_session_sec; extern int aclk_disable_runtime; -extern int aclk_disable_single_updates; extern int aclk_stats_enabled; extern int aclk_alert_reloaded; -extern int aclk_ng; extern int use_mqtt_5; +extern int aclk_ctx_based; #ifdef ENABLE_ACLK void *aclk_starter(void *ptr); -void aclk_single_update_disable(); -void aclk_single_update_enable(); - -void aclk_alarm_reload(void); - -int aclk_update_chart(RRDHOST *host, char *chart_name, int create); -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); - -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL void aclk_host_state_update(RRDHOST *host, int connect); -#endif #define NETDATA_ACLK_HOOK \ { .name = "ACLK_Main", \ @@ -52,7 +38,7 @@ void aclk_host_state_update(RRDHOST *host, int connect); #endif -struct label *add_aclk_host_labels(struct label *label); +void add_aclk_host_labels(void); char *aclk_state(void); char *aclk_state_json(void); diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c index 4e1c466e8..51d8dad58 100644 --- a/aclk/aclk_charts_api.c +++ b/aclk/aclk_charts_api.c @@ -66,3 +66,12 @@ void aclk_update_node_info(struct update_node_info *info) query->data.bin_payload.msg_name = "UpdateNodeInfo"; QUEUE_IF_PAYLOAD_PRESENT(query); } + +void aclk_update_node_collectors(struct update_node_collectors *collectors) +{ + aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS; + query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors); + query->data.bin_payload.msg_name = "UpdateNodeCollectors"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} diff --git a/aclk/aclk_charts_api.h b/aclk/aclk_charts_api.h index 305fe4f74..71f07dd33 100644 --- a/aclk/aclk_charts_api.h +++ b/aclk/aclk_charts_api.h @@ -17,4 +17,6 @@ void aclk_retention_updated(struct retention_updated *data); void aclk_update_node_info(struct update_node_info *info); +void aclk_update_node_collectors(struct update_node_collectors *collectors); + #endif /* ACLK_CHARTS_H */ diff --git a/aclk/aclk_collector_list.c b/aclk/aclk_collector_list.c deleted file mode 100644 index 2920c9a5c..000000000 --- a/aclk/aclk_collector_list.c +++ /dev/null @@ -1,193 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// This is copied from Legacy ACLK, Original Author: amoss - -// TODO unmess this - -#include "aclk_collector_list.h" - -netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; - -struct _collector *collector_list = NULL; - -/* - * Free a collector structure - */ -void _free_collector(struct _collector *collector) -{ - if (likely(collector->plugin_name)) - freez(collector->plugin_name); - - if (likely(collector->module_name)) - freez(collector->module_name); - - if (likely(collector->hostname)) - freez(collector->hostname); - - freez(collector); -} - -/* - * This will report the collector list - * - */ -#ifdef ACLK_DEBUG -static void _dump_collector_list() -{ - struct _collector *tmp_collector; - - COLLECTOR_LOCK; - - info("DUMPING ALL COLLECTORS"); - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - info("DUMPING ALL COLLECTORS -- nothing found"); - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - - while (tmp_collector) { - info( - "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, - tmp_collector->plugin_name ? tmp_collector->plugin_name : "", - tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); - - tmp_collector = tmp_collector->next; - } - info("DUMPING ALL COLLECTORS DONE"); - COLLECTOR_UNLOCK; -} -#endif - -/* - * This will cleanup the collector list - * - */ -void _reset_collector_list() -{ - struct _collector *tmp_collector, *next_collector; - - COLLECTOR_LOCK; - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - collector_list->count = 0; - collector_list->next = NULL; - - // We broke the link; we can unlock - COLLECTOR_UNLOCK; - - while (tmp_collector) { - next_collector = tmp_collector->next; - _free_collector(tmp_collector); - tmp_collector = next_collector; - } -} - -/* - * Find a collector (if it exists) - * Must lock before calling this - * If last_collector is not null, it will return the previous collector in the linked - * list (used in collector delete) - */ -static struct _collector *_find_collector( - const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) -{ - struct _collector *tmp_collector, *prev_collector; - uint32_t plugin_hash; - uint32_t module_hash; - uint32_t hostname_hash; - - if (unlikely(!collector_list)) { - collector_list = callocz(1, sizeof(struct _collector)); - return NULL; - } - - if (unlikely(!collector_list->next)) - return NULL; - - plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - module_hash = module_name ? simple_hash(module_name) : 1; - hostname_hash = simple_hash(hostname); - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - prev_collector = collector_list; - while (tmp_collector) { - if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && - hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && - (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && - (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { - if (unlikely(last_collector)) - *last_collector = prev_collector; - - return tmp_collector; - } - - prev_collector = tmp_collector; - tmp_collector = tmp_collector->next; - } - - return tmp_collector; -} - -/* - * Called to delete a collector - * It will reduce the count (chart_count) and will remove it - * from the linked list if the count reaches zero - * The structure will be returned to the caller to free - * the resources - * - */ -struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector, *prev_collector = NULL; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); - - if (likely(tmp_collector)) { - --tmp_collector->count; - if (unlikely(!tmp_collector->count)) - prev_collector->next = tmp_collector->next; - } - return tmp_collector; -} - -/* - * Add a new collector (plugin / module) to the list - * If it already exists just update the chart count - * - * Lock before calling - */ -struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); - - if (unlikely(!tmp_collector)) { - tmp_collector = callocz(1, sizeof(struct _collector)); - tmp_collector->hostname_hash = simple_hash(hostname); - tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; - - tmp_collector->hostname = strdupz(hostname); - tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; - tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; - - tmp_collector->next = collector_list->next; - collector_list->next = tmp_collector; - } - tmp_collector->count++; - debug( - D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", - module_name ? module_name : "*", tmp_collector->count); - return tmp_collector; -} diff --git a/aclk/aclk_collector_list.h b/aclk/aclk_collector_list.h deleted file mode 100644 index 09c06b14a..000000000 --- a/aclk/aclk_collector_list.h +++ /dev/null @@ -1,41 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// This is copied from Legacy ACLK, Original Author: amoss - -// TODO unmess this - -#ifndef ACLK_COLLECTOR_LIST_H -#define ACLK_COLLECTOR_LIST_H - -#include "libnetdata/libnetdata.h" - -extern netdata_mutex_t collector_mutex; - -#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) -#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) - -/* - * Maintain a list of collectors and chart count - * If all the charts of a collector are deleted - * then a new metadata dataset must be send to the cloud - * - */ -struct _collector { - time_t created; - uint32_t count; //chart count - uint32_t hostname_hash; - uint32_t plugin_hash; - uint32_t module_hash; - char *hostname; - char *plugin_name; - char *module_name; - struct _collector *next; -}; - -extern struct _collector *collector_list; - -struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name); -struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name); -void _reset_collector_list(); -void _free_collector(struct _collector *collector); - -#endif /* ACLK_COLLECTOR_LIST_H */ diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c new file mode 100644 index 000000000..f17d3cabd --- /dev/null +++ b/aclk/aclk_contexts_api.c @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_query_queue.h" + +#include "aclk_contexts_api.h" + +void aclk_send_contexts_snapshot(contexts_snapshot_t data) +{ + aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); + query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT; + query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size); + query->data.bin_payload.msg_name = "ContextsSnapshot"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_send_contexts_updated(contexts_updated_t data) +{ + aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); + query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED; + query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size); + query->data.bin_payload.msg_name = "ContextsUpdated"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h new file mode 100644 index 000000000..46b916d22 --- /dev/null +++ b/aclk/aclk_contexts_api.h @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_CONTEXTS_API_H +#define ACLK_CONTEXTS_API_H + +#include "schema-wrappers/schema_wrappers.h" + + +void aclk_send_contexts_snapshot(contexts_snapshot_t data); +void aclk_send_contexts_updated(contexts_updated_t data); + +#endif /* ACLK_CONTEXTS_API_H */ + diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index c99c65637..b7bf173c4 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -446,11 +446,37 @@ cleanup_buffers: return rc; } +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +static int private_decrypt(EVP_PKEY *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted) +#else static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted) +#endif { + int result; +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 + size_t outlen = EVP_PKEY_size(p_key); + EVP_PKEY_CTX *ctx = EVP_PKEY_CTX_new(p_key, NULL); + if (!ctx) + return 1; + + if (EVP_PKEY_decrypt_init(ctx) <= 0) + return 1; + + if (EVP_PKEY_CTX_set_rsa_padding(ctx, RSA_PKCS1_OAEP_PADDING) <= 0) + return 1; + + *decrypted = mallocz(outlen); + + if (EVP_PKEY_decrypt(ctx, *decrypted, &outlen, enc_data, data_len) == 1) + result = (int) outlen; + else + result = -1; +#else *decrypted = mallocz(RSA_size(p_key)); - int result = RSA_private_decrypt(data_len, enc_data, *decrypted, p_key, RSA_PKCS1_OAEP_PADDING); - if (result == -1) { + result = RSA_private_decrypt(data_len, enc_data, *decrypted, p_key, RSA_PKCS1_OAEP_PADDING); +#endif + if (result == -1) + { char err[512]; ERR_error_string_n(ERR_get_error(), err, sizeof(err)); error("Decryption of the challenge failed: %s", err); @@ -458,12 +484,16 @@ static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, u return result; } +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) +#else int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) +#endif { unsigned char *challenge; int challenge_bytes; - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); if (agent_id == NULL) { error("Agent was not claimed - cannot perform challenge/response"); return 1; @@ -806,7 +836,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { req.request_type = HTTP_REQ_GET; - char *agent_id = is_agent_claimed(); + char *agent_id = get_agent_claimid(); if (agent_id == NULL) { error("Agent was not claimed - cannot perform challenge/response"); @@ -814,11 +844,11 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { return 1; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json,proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); -#else - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); -#endif + if (rrdcontext_enabled) + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + else + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + freez(agent_id); req.host = (char*)aclk_hostname; diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h index 1ca9245c2..2d660e5a4 100644 --- a/aclk/aclk_otp.h +++ b/aclk/aclk_otp.h @@ -8,7 +8,11 @@ #include "https_client.h" #include "aclk_util.h" +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target); +#else int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target); +#endif int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port); #endif /* ACLK_OTP_H */ diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index de970fc3d..981c01965 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -13,27 +13,6 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) -typedef struct aclk_query_handler { - aclk_query_type_t type; - char *name; // for logging purposes - int(*fnc)(struct aclk_query_thread *query_thr, aclk_query_t query); -} aclk_query_handler; - -static int info_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_send_info_metadata(query_thr->client, - !query->data.metadata_info.initial_on_connect, - query->data.metadata_info.host); - return 0; -} - -static int alarms_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_send_alarm_metadata(query_thr->client, - !query->data.metadata_info.initial_on_connect); - return 0; -} - static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url) { usec_t t; @@ -277,84 +256,63 @@ cleanup: return retval; } -static int chart_query(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_chart_msg(query_thr->client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name); - return 0; -} - -static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_alarm_state_msg(query_thr->client, query->data.alarm_update); - // aclk_alarm_state_msg frees the json object including the header it generates - query->data.alarm_update = NULL; - return 0; -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { // this will be simplified when legacy support is removed aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name); return 0; } -#endif - -aclk_query_handler aclk_query_handlers[] = { - { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 }, - { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query }, - { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata }, - { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata }, - { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query }, - { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata }, -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg }, - { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg }, - { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg }, - { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg }, - { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg }, - { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg }, - { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg }, - { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg }, - { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg }, - { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg }, -#endif - { .type = UNKNOWN, .name = NULL, .fnc = NULL } -}; const char *aclk_query_get_name(aclk_query_type_t qt) { - aclk_query_handler *ptr = aclk_query_handlers; - while (ptr->type != UNKNOWN) { - if (ptr->type == qt) - return ptr->name; - ptr++; + switch (qt) { + case HTTP_API_V2: return "http_api_request_v2"; + case REGISTER_NODE: return "register_node"; + case NODE_STATE_UPDATE: return "node_state_update"; + case CHART_DIMS_UPDATE: return "chart_and_dim_update"; + case CHART_CONFIG_UPDATED: return "chart_config_updated"; + case CHART_RESET: return "reset_chart_messages"; + case RETENTION_UPDATED: return "update_retention_info"; + case UPDATE_NODE_INFO: return "update_node_info"; + case ALARM_LOG_HEALTH: return "alarm_log_health"; + case ALARM_PROVIDE_CFG: return "provide_alarm_config"; + case ALARM_SNAPSHOT: return "alarm_snapshot"; + case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; + case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; + default: + error_report("Unknown query type used %d", (int) qt); + return "unknown"; } - return "unknown"; } static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { - if (aclk_query_handlers[i].type == query->type) { - worker_is_busy(i); - - debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); - aclk_query_handlers[i].fnc(query_thr, query); - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_dispatched++; - aclk_queries_per_thread[query_thr->idx]++; - aclk_metrics_per_sample.queries_per_type[query->type]++; - ACLK_STATS_UNLOCK; - } - aclk_query_free(query); +{ + if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) { + error_report("Unknown query in query queue. %u", query->type); + aclk_query_free(query); + return; + } - worker_is_idle(); - return; - } + worker_is_busy(query->type); + if (query->type == HTTP_API_V2) { + debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); + http_api_v2(query_thr, query); + } else { + debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name); + send_bin_msg(query_thr, query); } - fatal("Unknown query in query queue. %u", query->type); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_dispatched++; + aclk_queries_per_thread[query_thr->idx]++; + aclk_metrics_per_sample.queries_per_type[query->type]++; + ACLK_STATS_UNLOCK; + } + + aclk_query_free(query); + + worker_is_idle(); } /* Processes messages from queue. Compete for work with other threads @@ -370,8 +328,8 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr) static void worker_aclk_register(void) { worker_register("ACLKQUERY"); - for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { - worker_register_job_name(i, aclk_query_handlers[i].name); + for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) { + worker_register_job_name(i, aclk_query_get_name(i)); } } diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 2422b01e1..01b20d23f 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -111,15 +111,6 @@ void aclk_query_free(aclk_query_t query) freez(query->data.http_api_v2.query); break; - case CHART_NEW: - freez(query->data.chart_add_del.chart_name); - break; - - case ALARM_STATE_UPDATE: - if (query->data.alarm_update) - json_object_put(query->data.alarm_update); - break; - case NODE_STATE_UPDATE: case REGISTER_NODE: case CHART_DIMS_UPDATE: @@ -130,6 +121,8 @@ void aclk_query_free(aclk_query_t query) case ALARM_LOG_HEALTH: case ALARM_PROVIDE_CFG: case ALARM_SNAPSHOT: + case UPDATE_NODE_COLLECTORS: + case PROTO_BIN_MESSAGE: if (!use_mqtt_5) freez(query->data.bin_payload.payload); break; diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 0b5ef8faa..ab94b6384 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -11,12 +11,7 @@ typedef enum { UNKNOWN = 0, - METADATA_INFO, - METADATA_ALARMS, HTTP_API_V2, - CHART_NEW, - CHART_DEL, - ALARM_STATE_UPDATE, REGISTER_NODE, NODE_STATE_UPDATE, CHART_DIMS_UPDATE, @@ -27,19 +22,11 @@ typedef enum { ALARM_LOG_HEALTH, ALARM_PROVIDE_CFG, ALARM_SNAPSHOT, + UPDATE_NODE_COLLECTORS, + PROTO_BIN_MESSAGE, ACLK_QUERY_TYPE_COUNT // always keep this as last } aclk_query_type_t; -struct aclk_query_metadata { - RRDHOST *host; - int initial_on_connect; -}; - -struct aclk_query_chart_add_del { - RRDHOST *host; - char* chart_name; -}; - struct aclk_query_http_api_v2 { char *payload; char *query; @@ -73,12 +60,8 @@ struct aclk_query { // TODO maybe remove? int version; union { - struct aclk_query_metadata metadata_info; - struct aclk_query_metadata metadata_alarms; struct aclk_query_http_api_v2 http_api_v2; - struct aclk_query_chart_add_del chart_add_del; struct aclk_bin_payload bin_payload; - json_object *alarm_update; } data; }; diff --git a/aclk/aclk_rrdhost_state.h b/aclk/aclk_rrdhost_state.h index 9138123df..5c8a2ddc9 100644 --- a/aclk/aclk_rrdhost_state.h +++ b/aclk/aclk_rrdhost_state.h @@ -3,43 +3,9 @@ #include "libnetdata/libnetdata.h" -#ifdef ACLK_LEGACY -typedef enum aclk_cmd { - ACLK_CMD_CLOUD, - ACLK_CMD_ONCONNECT, - ACLK_CMD_INFO, - ACLK_CMD_CHART, - ACLK_CMD_CHARTDEL, - ACLK_CMD_ALARM, - ACLK_CMD_CLOUD_QUERY_2, - ACLK_CMD_CHILD_CONNECT, - ACLK_CMD_CHILD_DISCONNECT -} ACLK_CMD; - -typedef enum aclk_metadata_state { - ACLK_METADATA_REQUIRED, - ACLK_METADATA_CMD_QUEUED, - ACLK_METADATA_SENT -} ACLK_METADATA_STATE; -#endif - -typedef enum aclk_agent_state { - ACLK_HOST_INITIALIZING, - ACLK_HOST_STABLE -} ACLK_AGENT_STATE; - typedef struct aclk_rrdhost_state { char *claimed_id; // Claimed ID if host has one otherwise NULL char *prev_claimed_id; // Claimed ID if changed (reclaimed) during runtime - -#ifdef ACLK_LEGACY - // per child popcorning - ACLK_AGENT_STATE state; - ACLK_METADATA_STATE metadata; - - time_t timestamp_created; - time_t t_last_popcorn_update; -#endif /* ACLK_LEGACY */ } aclk_rrdhost_state; #endif /* ACLK_RRDHOST_STATE_H */ diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 27f1bf2dc..e6ed332cc 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -6,6 +6,8 @@ #include "aclk_query_queue.h" #include "aclk.h" +#include "schema-wrappers/proto_2_json.h" + #define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" #define ACLK_CLOUD_REQ_V2_PREFIX "GET /" @@ -55,19 +57,19 @@ static int cloud_to_agent_parse(JSON_ENTRY *e) break; case JSON_NUMBER: if (!strcmp(e->name, "version")) { - data->version = e->data.number; + data->version = (int)e->data.number; break; } if (!strcmp(e->name, "timeout")) { - data->timeout = e->data.number; + data->timeout = (int)e->data.number; break; } if (!strcmp(e->name, "min-version")) { - data->min_version = e->data.number; + data->min_version = (int)e->data.number; break; } if (!strcmp(e->name, "max-version")) { - data->max_version = e->data.number; + data->max_version = (int)e->data.number; break; } @@ -116,20 +118,8 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur return 0; } -#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\ - if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\ - debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ - ACLK_SHARED_STATE_UNLOCK;\ - return 1;\ - }\ - ACLK_SHARED_STATE_UNLOCK; - static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) { - if (!aclk_use_new_cloud_arch) { - HTTP_CHECK_AGENT_INITIALIZED(); - } - aclk_query_t query; errno = 0; @@ -229,7 +219,6 @@ err_cleanup: return 1; } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL typedef uint32_t simple_hash_t; typedef int(*rx_msg_handler)(const char *msg, size_t msg_len); @@ -300,6 +289,15 @@ int create_node_instance_result(const char *msg, size_t msg_len) } } + struct capability caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_state_update.capabilities = caps; + rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); @@ -324,6 +322,7 @@ int send_node_instances(const char *msg, size_t msg_len) int stream_charts_and_dimensions(const char *msg, size_t msg_len) { + aclk_ctx_based = 0; stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); if (!res.claim_id || !res.node_id) { error("Error parsing StreamChartsAndDimensions msg"); @@ -437,6 +436,41 @@ int handle_disconnect_req(const char *msg, size_t msg_len) return 0; } +int contexts_checkpoint(const char *msg, size_t msg_len) +{ + aclk_ctx_based = 1; + + struct ctxs_checkpoint *cmd = parse_ctxs_checkpoint(msg, msg_len); + if (!cmd) + return 1; + + rrdcontext_hub_checkpoint_command(cmd); + + freez(cmd->claim_id); + freez(cmd->node_id); + freez(cmd); + return 0; +} + +int stop_streaming_contexts(const char *msg, size_t msg_len) +{ + if (!aclk_ctx_based) { + error_report("Received StopStreamingContexts message but context based communication was not enabled (Cloud violated the protocol). Ignoring message"); + return 1; + } + + struct stop_streaming_ctxs *cmd = parse_stop_streaming_ctxs(msg, msg_len); + if (!cmd) + return 1; + + rrdcontext_hub_stop_streaming_command(cmd); + + freez(cmd->claim_id); + freez(cmd->node_id); + freez(cmd); + return 0; +} + typedef struct { const char *name; simple_hash_t name_hash; @@ -455,6 +489,8 @@ new_cloud_rx_msg_t rx_msgs[] = { { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, + { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint }, + { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts }, { .name = NULL, .name_hash = 0, .fnc = NULL }, }; @@ -491,7 +527,7 @@ unsigned int aclk_init_rx_msg_handlers(void) return i; } -void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic) { if (aclk_stats_enabled) { ACLK_STATS_LOCK; @@ -509,6 +545,17 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t } return; } + +#ifdef NETDATA_INTERNAL_CHECKS + if (!strncmp(message_type, "cmd", strlen("cmd"))) { + log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name); + } else { + char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name); + log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name); + freez(json); + } +#endif + if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++; @@ -524,4 +571,3 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t return; } } -#endif diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h index 00f88c6a8..61921faec 100644 --- a/aclk/aclk_rx_msgs.h +++ b/aclk/aclk_rx_msgs.h @@ -10,10 +10,8 @@ int aclk_handle_cloud_cmd_message(char *payload); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL const char *rx_handler_get_name(size_t i); unsigned int aclk_init_rx_msg_handlers(void); -void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len); -#endif +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic); #endif /* ACLK_RX_MSGS_H */ diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index ca0532638..241e9b724 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -8,11 +8,9 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; struct { int query_thread_count; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL unsigned int proto_hdl_cnt; uint32_t *aclk_proto_rx_msgs_sample; RRDDIM **rx_msg_dims; -#endif } aclk_stats_cfg; // there is only 1 stats thread at a time // data ACLK stats need per query thread @@ -237,7 +235,6 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL const char *rx_handler_get_name(size_t i); static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) { @@ -259,35 +256,53 @@ static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) rrdset_done(st); } -#endif -void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) +static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL - UNUSED(proto_hdl_cnt); -#endif + static RRDSET *st = NULL; + static RRDDIM *rd_sent = NULL; + static RRDDIM *rd_recvd = NULL; + static uint64_t sent = 0; + static uint64_t recvd = 0; + + sent += stats->bytes_tx; + recvd += stats->bytes_rx; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_openssl_bytes", NULL, "aclk", NULL, "Received and Sent bytes.", "B/s", + "netdata", "stats", 200011, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_sent = rrddim_add(st, "sent", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_sent, sent); + rrddim_set_by_pointer(st, rd_recvd, recvd); + rrdset_done(st); +} + +void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) +{ aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt; aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*)); -#endif } void aclk_stats_thread_cleanup() { -#ifdef ENABLE_NEW_CLOUD_PROTOCOL freez(aclk_stats_cfg.rx_msg_dims); freez(aclk_proto_rx_msgs_sample); freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample); -#endif freez(aclk_qt_data); freez(aclk_queries_per_thread); freez(aclk_queries_per_thread_sample); @@ -318,10 +333,10 @@ void *aclk_stats_main_thread(void *ptr) // to not hold lock longer than necessary, especially not to hold it // during database rrd* operations memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL + memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); -#endif + memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); @@ -343,9 +358,10 @@ void *aclk_stats_main_thread(void *ptr) aclk_stats_query_time(&per_sample); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL + struct mqtt_wss_stats mqtt_wss_stats = mqtt_wss_get_stats(args->client); + aclk_stats_mqtt_wss(&mqtt_wss_stats); + aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample); -#endif } return 0; diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h index 4f2894798..bec9ac247 100644 --- a/aclk/aclk_stats.h +++ b/aclk/aclk_stats.h @@ -6,6 +6,7 @@ #include "daemon/common.h" #include "libnetdata/libnetdata.h" #include "aclk_query_queue.h" +#include "mqtt_wss_client.h" #define ACLK_STATS_THREAD_NAME "ACLK_Stats" @@ -22,6 +23,7 @@ int aclk_cloud_req_http_type_to_idx(const char *name); struct aclk_stats_thread { netdata_thread_t *thread; int query_thread_count; + mqtt_wss_client client; }; // preserve between samples @@ -60,9 +62,7 @@ extern struct aclk_metrics_per_sample { volatile uint32_t cloud_q_process_max; } aclk_metrics_per_sample; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL extern uint32_t *aclk_proto_rx_msgs_sample; -#endif extern uint32_t *aclk_queries_per_thread; diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 3530dccff..822a90fa2 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -6,6 +6,8 @@ #include "aclk_stats.h" #include "aclk.h" +#include "schema-wrappers/proto_2_json.h" + #ifndef __GNUC__ #pragma region aclk_tx_msgs helper functions #endif @@ -13,29 +15,6 @@ // version for aclk legacy (old cloud arch) #define ACLK_VERSION 2 -static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) -{ - uint16_t packet_id; - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - const char *topic = aclk_get_topic(subtopic); - - if (unlikely(!topic)) { - error("Couldn't get topic. Aborting message send"); - return; - } - - mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(packet_id); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif -} - uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { #ifndef ACLK_LOG_CONVERSATION_DIR @@ -56,42 +35,11 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); + char *json = protomsg_to_json(msg, msg_len, msgname); + log_aclk_message_bin(json, strlen(json), 1, topic, msgname); + freez(json); #endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname); - FILE *fptr; - if (fptr = fopen(filename,"w")) { - fwrite(msg, msg_len, 1, fptr); - fclose(fptr); - } -#endif - - return packet_id; -} - -static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) -{ - uint16_t packet_id; - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - const char *topic = aclk_get_topic(subtopic); - - if (unlikely(!topic)) { - error("Couldn't get topic. Aborting message send"); - return 0; - } - mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(packet_id); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif return packet_id; } @@ -231,17 +179,6 @@ static struct json_object *create_hdr(const char *type, const char *msg_id, time return obj; } -static char *create_uuid() -{ - uuid_t uuid; - char *uuid_str = mallocz(36 + 1); - - uuid_generate(uuid); - uuid_unparse(uuid, uuid_str); - - return uuid_str; -} - #ifndef __GNUC__ #pragma endregion #endif @@ -250,90 +187,6 @@ static char *create_uuid() #pragma region aclk_tx_msgs message generators #endif -/* - * This will send the /api/v1/info - */ -#define BUFFER_INITIAL_SIZE (1024 * 16) -void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host) -{ - BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE); - json_object *msg, *payload, *tmp; - - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - if (metadata_submitted) - msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION); - else - msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); - - payload = json_object_new_object(); - json_object_object_add(msg, "payload", payload); - - web_client_api_request_v1_info_fill_buffer(host, local_buffer); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "info", tmp); - - buffer_flush(local_buffer); - - charts2json(host, local_buffer, 1, 0); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "charts", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA); - - json_object_put(msg); - freez(msg_id); - buffer_free(local_buffer); -} - -// TODO should include header instead -void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); - -void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) -{ - BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE); - json_object *msg, *payload, *tmp; - - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - - if (metadata_submitted) - msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION); - else - msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION); - - payload = json_object_new_object(); - json_object_object_add(msg, "payload", payload); - - health_alarms2json(localhost, local_buffer, 1); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "configured-alarms", tmp); - - buffer_flush(local_buffer); - - health_active_log_alarms_2json(localhost, local_buffer); - tmp = json_tokener_parse(local_buffer->buffer); - json_object_object_add(payload, "alarms-active", tmp); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS); - - json_object_put(msg); - freez(msg_id); - buffer_free(local_buffer); -} - void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -384,80 +237,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg } } -void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) -{ - json_object *msg, *payload; - BUFFER *tmp_buffer; - RRDSET *st; - - st = rrdset_find(host, chart); - if (!st) - st = rrdset_find_byname(host, chart); - if (!st) { - info("FAILED to find chart %s", chart); - return; - } - - tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE); - rrdset2json(st, tmp_buffer, NULL, NULL, 1); - payload = json_tokener_parse(tmp_buffer->buffer); - if (!payload) { - error("Failed to parse JSON from rrdset2json"); - buffer_free(tmp_buffer); - return; - } - - msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION); - json_object_object_add(msg, "payload", payload); - - aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART); - - buffer_free(tmp_buffer); - json_object_put(msg); -} - -void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg) -{ - // we create header here on purpose (and not send message with it already as `msg` param) - // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy - // send message with timestamps already to Query Queue they would be incorrect at time - // when query queue would get to send them) - json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION); - json_object_object_add(obj, "payload", msg); - - aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS); - json_object_put(obj); -} - -/* - * Will generate disconnect message. - * @param message if NULL it will generate LWT message (unexpected). - * Otherwise string pointed to by this parameter will be used as - * reason. - */ -json_object *aclk_generate_disconnect(const char *message) -{ - json_object *tmp, *msg; - - msg = create_hdr("disconnect", NULL, 0, 0, 2); - - tmp = json_object_new_string(message ? message : "unexpected"); - json_object_object_add(msg, "payload", tmp); - - return msg; -} - -int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message) -{ - int pid; - json_object *msg = aclk_generate_disconnect(message); - pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA); - json_object_put(msg); - return pid; -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -// new protobuf msgs uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { size_t len; uint16_t pid; @@ -469,6 +248,7 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, #endif { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, { .name = NULL, .version = 0, .enabled = 0 } }; @@ -532,7 +312,6 @@ char *aclk_generate_lwt(size_t *size) { return msg; } -#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ #ifndef __GNUC__ #pragma endregion diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index 44281eb68..31e592410 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -11,23 +11,10 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); -void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host); -void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted); - void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len); void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); -void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart); - -void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg); - -json_object *aclk_generate_disconnect(const char *message); -int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message); - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL -// new protobuf msgs uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable); char *aclk_generate_lwt(size_t *size); -#endif #endif diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index 430925460..ec021aec5 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -4,7 +4,6 @@ #include "daemon/common.h" -int aclk_use_new_cloud_arch = 0; usec_t aclk_session_newarch = 0; aclk_env_t *aclk_env = NULL; @@ -124,22 +123,17 @@ struct topic_name { { .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" }, { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" }, { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" }, + { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" }, + { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" }, + { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" }, { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } }; -enum aclk_topics compulsory_topics_legacy[] = { - ACLK_TOPICID_CHART, - ACLK_TOPICID_ALARMS, - ACLK_TOPICID_METADATA, - ACLK_TOPICID_COMMAND, - ACLK_TOPICID_UNKNOWN -}; - -enum aclk_topics compulsory_topics_new_cloud_arch[] = { +enum aclk_topics compulsory_topics[] = { // TODO remove old topics once not needed anymore - ACLK_TOPICID_CHART, - ACLK_TOPICID_ALARMS, - ACLK_TOPICID_METADATA, + ACLK_TOPICID_CHART, //TODO from legacy + ACLK_TOPICID_ALARMS, //TODO from legacy + ACLK_TOPICID_METADATA, //TODO from legacy ACLK_TOPICID_COMMAND, ACLK_TOPICID_AGENT_CONN, ACLK_TOPICID_CMD_NG_V1, @@ -154,6 +148,9 @@ enum aclk_topics compulsory_topics_new_cloud_arch[] = { ACLK_TOPICID_ALARM_HEALTH, ACLK_TOPICID_ALARM_CONFIG, ACLK_TOPICID_ALARM_SNAPSHOT, + ACLK_TOPICID_NODE_COLLECTORS, + ACLK_TOPICID_CTXS_SNAPSHOT, + ACLK_TOPICID_CTXS_UPDATED, ACLK_TOPICID_UNKNOWN }; @@ -279,8 +276,6 @@ int aclk_generate_topic_cache(struct json_object *json) } } - enum aclk_topics *compulsory_topics = aclk_use_new_cloud_arch ? compulsory_topics_new_cloud_arch : compulsory_topics_legacy; - for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) { if (!aclk_get_topic(compulsory_topics[i])) { error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index fb0492ac8..ed715e046 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -20,7 +20,6 @@ // Helper stuff which should not have any further inside ACLK dependency // and are supposed not to be needed outside of ACLK -extern int aclk_use_new_cloud_arch; extern usec_t aclk_session_newarch; extern int chart_batch_id; @@ -88,7 +87,10 @@ enum aclk_topics { ACLK_TOPICID_ALARM_LOG = 14, ACLK_TOPICID_ALARM_HEALTH = 15, ACLK_TOPICID_ALARM_CONFIG = 16, - ACLK_TOPICID_ALARM_SNAPSHOT = 17 + ACLK_TOPICID_ALARM_SNAPSHOT = 17, + ACLK_TOPICID_NODE_COLLECTORS = 18, + ACLK_TOPICID_CTXS_SNAPSHOT = 19, + ACLK_TOPICID_CTXS_UPDATED = 20 }; const char *aclk_get_topic(enum aclk_topics topic); diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc index 338e512d8..f64393300 100644 --- a/aclk/schema-wrappers/alarm_stream.cc +++ b/aclk/schema-wrappers/alarm_stream.cc @@ -118,6 +118,7 @@ void destroy_alarm_log_entry(struct alarm_log_entry *entry) freez(entry->old_value_string); freez(entry->rendered_info); + freez(entry->chart_context); } static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *proto) @@ -166,6 +167,8 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr proto->set_updated(data->updated); proto->set_rendered_info(data->rendered_info); + + proto->set_chart_context(data->chart_context); } char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data) diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h index 2932bb192..63911da3f 100644 --- a/aclk/schema-wrappers/alarm_stream.h +++ b/aclk/schema-wrappers/alarm_stream.h @@ -97,6 +97,8 @@ struct alarm_log_entry { // rendered_info char *rendered_info; + + char *chart_context; }; struct send_alarm_snapshot { diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc index 7d820e533..54c940758 100644 --- a/aclk/schema-wrappers/chart_stream.cc +++ b/aclk/schema-wrappers/chart_stream.cc @@ -76,7 +76,7 @@ void chart_instance_updated_destroy(struct chart_instance_updated *instance) freez((char*)instance->id); freez((char*)instance->claim_id); - free_label_list(instance->label_head); + rrdlabels_destroy(instance->chart_labels); freez((char*)instance->config_hash); } @@ -85,7 +85,6 @@ static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, co { google::protobuf::Map<std::string, std::string> *map; aclk_lib::v1::ACLKMessagePosition *pos; - struct label *label; chart->set_id(update->id); chart->set_claim_id(update->claim_id); @@ -93,11 +92,7 @@ static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, co chart->set_name(update->name); map = chart->mutable_chart_labels(); - label = update->label_head; - while (label) { - map->insert({label->key, label->value}); - label = label->next; - } + rrdlabels_walkthrough_read(update->chart_labels, label_add_to_map_callback, map); switch (update->memory_mode) { case RRD_MEMORY_MODE_NONE: diff --git a/aclk/schema-wrappers/chart_stream.h b/aclk/schema-wrappers/chart_stream.h index 7a46ecd8e..904866868 100644 --- a/aclk/schema-wrappers/chart_stream.h +++ b/aclk/schema-wrappers/chart_stream.h @@ -57,7 +57,7 @@ struct chart_instance_updated { const char *node_id; const char *name; - struct label *label_head; + DICTIONARY *chart_labels; RRD_MEMORY_MODE memory_mode; diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc new file mode 100644 index 000000000..b04c9d20c --- /dev/null +++ b/aclk/schema-wrappers/context.cc @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/context/v1/context.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +#include "context.h" + +using namespace context::v1; + +// ContextsSnapshot +contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version) +{ + ContextsSnapshot *ctxs_snap = new ContextsSnapshot; + + if (ctxs_snap == NULL) + fatal("Cannot allocate ContextsSnapshot object. OOM"); + + ctxs_snap->set_claim_id(claim_id); + ctxs_snap->set_node_id(node_id); + ctxs_snap->set_version(version); + + return ctxs_snap; +} + +void contexts_snapshot_delete(contexts_snapshot_t snapshot) +{ + delete (ContextsSnapshot *)snapshot; +} + +void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version) +{ + ((ContextsSnapshot *)ctxs_snapshot)->set_version(version); +} + +static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx) +{ + ctx->set_id(c_ctx->id); + ctx->set_version(c_ctx->version); + ctx->set_first_entry(c_ctx->first_entry); + ctx->set_last_entry(c_ctx->last_entry); + ctx->set_deleted(c_ctx->deleted); + ctx->set_title(c_ctx->title); + ctx->set_priority(c_ctx->priority); + ctx->set_chart_type(c_ctx->chart_type); + ctx->set_units(c_ctx->units); + ctx->set_family(c_ctx->family); +} + +void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update) +{ + ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; + ContextUpdated *ctx = ctxs_snap->add_contexts(); + + fill_ctx_updated(ctx, ctx_update); +} + +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len) +{ + ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; + *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap); + char *bin = (char*)mallocz(*len); + if (!ctxs_snap->SerializeToArray(bin, *len)) { + freez(bin); + delete ctxs_snap; + return NULL; + } + + delete ctxs_snap; + return bin; +} + +// ContextsUpdated +contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at) +{ + ContextsUpdated *ctxs_updated = new ContextsUpdated; + + if (ctxs_updated == NULL) + fatal("Cannot allocate ContextsUpdated object. OOM"); + + ctxs_updated->set_claim_id(claim_id); + ctxs_updated->set_node_id(node_id); + ctxs_updated->set_version_hash(version_hash); + ctxs_updated->set_created_at(created_at); + + return ctxs_updated; +} + +void contexts_updated_delete(contexts_updated_t ctxs_updated) +{ + delete (ContextsUpdated *)ctxs_updated; +} + +void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash) +{ + ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash); +} + +void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update) +{ + ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; + ContextUpdated *ctx = ctxs_update->add_contextupdates(); + + if (ctx == NULL) + fatal("Cannot allocate ContextUpdated object. OOM"); + + fill_ctx_updated(ctx, ctx_update); +} + +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len) +{ + ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; + *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update); + char *bin = (char*)mallocz(*len); + if (!ctxs_update->SerializeToArray(bin, *len)) { + freez(bin); + delete ctxs_update; + return NULL; + } + + delete ctxs_update; + return bin; +} diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h new file mode 100644 index 000000000..cbb7701a8 --- /dev/null +++ b/aclk/schema-wrappers/context.h @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H +#define ACLK_SCHEMA_WRAPPER_CONTEXT_H + +#include <stdint.h> +#include <sys/types.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* contexts_updated_t; +typedef void* contexts_snapshot_t; + +struct context_updated { + // context id + const char *id; + + uint64_t version; + + uint64_t first_entry; + uint64_t last_entry; + + int deleted; + + const char *title; + uint64_t priority; + const char *chart_type; + const char *units; + const char *family; +}; + +// ContextS Snapshot related +contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version); +void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot); +void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version); +void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update); +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len); + +// ContextS Updated related +contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at); +void contexts_updated_delete(contexts_updated_t ctxs_updated); +void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash); +void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update); +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len); + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */ diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc new file mode 100644 index 000000000..3bb1956cb --- /dev/null +++ b/aclk/schema-wrappers/context_stream.cc @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/context/v1/stream.pb.h" + +#include "context_stream.h" + +#include "libnetdata/libnetdata.h" + +struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len) +{ + context::v1::StopStreamingContexts msg; + + struct stop_streaming_ctxs *res; + + if (!msg.ParseFromArray(data, len)) + return NULL; + + res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs)); + + res->claim_id = strdupz(msg.claim_id().c_str()); + res->node_id = strdupz(msg.node_id().c_str()); + + return res; +} + +struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len) +{ + context::v1::ContextsCheckpoint msg; + + struct ctxs_checkpoint *res; + + if (!msg.ParseFromArray(data, len)) + return NULL; + + res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint)); + + res->claim_id = strdupz(msg.claim_id().c_str()); + res->node_id = strdupz(msg.node_id().c_str()); + res->version_hash = msg.version_hash(); + + return res; +} diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h new file mode 100644 index 000000000..8c691d2cc --- /dev/null +++ b/aclk/schema-wrappers/context_stream.h @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H +#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct stop_streaming_ctxs { + char *claim_id; + char *node_id; + // we omit reason as there is only one defined at this point + // as soon as there is more than one defined in StopStreaminContextsReason + // we should add it + // 0 - RATE_LIMIT_EXCEEDED +}; + +struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len); + +struct ctxs_checkpoint { + char *claim_id; + char *node_id; + + uint64_t version_hash; +}; + +struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len); + + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */ diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc index 0a4c8ece1..a6ca8ef98 100644 --- a/aclk/schema-wrappers/node_connection.cc +++ b/aclk/schema-wrappers/node_connection.cc @@ -28,6 +28,15 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect timestamp->set_seconds(tv.tv_sec); timestamp->set_nanos(tv.tv_usec * 1000); + if (data->capabilities) { + struct capability *capa = data->capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = msg.add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + *len = PROTO_COMPAT_MSG_SIZE(msg); char *bin = (char*)malloc(*len); if (bin) diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h index 3fd207213..c27729d15 100644 --- a/aclk/schema-wrappers/node_connection.h +++ b/aclk/schema-wrappers/node_connection.h @@ -3,6 +3,8 @@ #ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H #define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H +#include "capability.h" + #ifdef __cplusplus extern "C" { #endif @@ -17,6 +19,7 @@ typedef struct { int64_t session_id; int32_t hops; + struct capability *capabilities; } node_instance_connection_t; char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data); diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc index f66985246..2a05ddaba 100644 --- a/aclk/schema-wrappers/node_info.cc +++ b/aclk/schema-wrappers/node_info.cc @@ -6,7 +6,6 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct aclk_node_info *data) { - struct label *label; google::protobuf::Map<std::string, std::string> *map; if (data->name) @@ -56,9 +55,6 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct acl if (data->custom_info) info->set_custom_info(data->custom_info); - for (size_t i = 0; i < data->service_count; i++) - info->add_services(data->services[i]); - if (data->machine_guid) info->set_machine_guid(data->machine_guid); @@ -67,12 +63,7 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct acl ml_info->set_ml_enabled(data->ml_info.ml_enabled); map = info->mutable_host_labels(); - label = data->host_labels_head; - while (label) { - map->insert({label->key, label->value}); - label = label->next; - } - + rrdlabels_walkthrough_read(data->host_labels_ptr, label_add_to_map_callback, map); return 0; } @@ -119,3 +110,27 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in return bin; } + +char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *upd_node_collectors) +{ + nodeinstance::info::v1::UpdateNodeCollectors msg; + + msg.set_node_id(upd_node_collectors->node_id); + msg.set_claim_id(upd_node_collectors->claim_id); + + void *colls; + dfe_start_read(upd_node_collectors->node_collectors, colls) { + struct collector_info *c =(struct collector_info *)colls; + nodeinstance::info::v1::CollectorInfo *col = msg.add_collectors(); + col->set_plugin(c->plugin); + col->set_module(c->module); + } + dfe_done(colls); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)malloc(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h index e67f3e1da..e8ac2d7c6 100644 --- a/aclk/schema-wrappers/node_info.h +++ b/aclk/schema-wrappers/node_info.h @@ -4,9 +4,10 @@ #define ACLK_SCHEMA_WRAPPER_NODE_INFO_H #include <stdlib.h> +#include <stdint.h> -#include "database/rrd.h" #include "capability.h" +#include "database/rrd.h" #ifdef __cplusplus extern "C" { @@ -49,12 +50,9 @@ struct aclk_node_info { char *custom_info; - char **services; - size_t service_count; - char *machine_guid; - struct label *host_labels_head; + DICTIONARY *host_labels_ptr; struct machine_learning_info ml_info; }; @@ -73,8 +71,21 @@ struct update_node_info { struct capability *node_instance_capabilities; }; +struct collector_info { + char *module; + char *plugin; +}; + +struct update_node_collectors { + char *claim_id; + char *node_id; + DICTIONARY *node_collectors; +}; + char *generate_update_node_info_message(size_t *len, struct update_node_info *info); +char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *collectors); + #ifdef __cplusplus } #endif diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc new file mode 100644 index 000000000..0e473eb6c --- /dev/null +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -0,0 +1,101 @@ +#include <google/protobuf/message.h> +#include <google/protobuf/util/json_util.h> + +#include "proto/alarm/v1/config.pb.h" +#include "proto/alarm/v1/stream.pb.h" +#include "proto/aclk/v1/lib.pb.h" +#include "proto/chart/v1/config.pb.h" +#include "proto/chart/v1/stream.pb.h" +#include "proto/agent/v1/connection.pb.h" +#include "proto/agent/v1/disconnect.pb.h" +#include "proto/nodeinstance/connection/v1/connection.pb.h" +#include "proto/nodeinstance/create/v1/creation.pb.h" +#include "proto/nodeinstance/info/v1/info.pb.h" +#include "proto/context/v1/stream.pb.h" +#include "proto/context/v1/context.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "proto_2_json.h" + +using namespace google::protobuf::util; + +static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) +{ +//tx side + if (!strcmp(msgname, "UpdateAgentConnection")) + return new agent::v1::UpdateAgentConnection; + if (!strcmp(msgname, "UpdateNodeInstanceConnection")) + return new nodeinstance::v1::UpdateNodeInstanceConnection; + if (!strcmp(msgname, "CreateNodeInstance")) + return new nodeinstance::create::v1::CreateNodeInstance; + if (!strcmp(msgname, "ChartsAndDimensionsUpdated")) + return new chart::v1::ChartsAndDimensionsUpdated; + if (!strcmp(msgname, "ChartConfigsUpdated")) + return new chart::v1::ChartConfigsUpdated; + if (!strcmp(msgname, "ResetChartMessages")) + return new chart::v1::ResetChartMessages; + if (!strcmp(msgname, "RetentionUpdated")) + return new chart::v1::RetentionUpdated; + if (!strcmp(msgname, "UpdateNodeInfo")) + return new nodeinstance::info::v1::UpdateNodeInfo; + if (!strcmp(msgname, "AlarmLogHealth")) + return new alarms::v1::AlarmLogHealth; + if (!strcmp(msgname, "ProvideAlarmConfiguration")) + return new alarms::v1::ProvideAlarmConfiguration; + if (!strcmp(msgname, "AlarmSnapshot")) + return new alarms::v1::AlarmSnapshot; + if (!strcmp(msgname, "AlarmLogEntry")) + return new alarms::v1::AlarmLogEntry; + if (!strcmp(msgname, "UpdateNodeCollectors")) + return new nodeinstance::info::v1::UpdateNodeCollectors; + if (!strcmp(msgname, "ContextsUpdated")) + return new context::v1::ContextsUpdated; + if (!strcmp(msgname, "ContextsSnapshot")) + return new context::v1::ContextsSnapshot; + +//rx side + if (!strcmp(msgname, "CreateNodeInstanceResult")) + return new nodeinstance::create::v1::CreateNodeInstanceResult; + if (!strcmp(msgname, "SendNodeInstances")) + return new agent::v1::SendNodeInstances; + if (!strcmp(msgname, "StreamChartsAndDimensions")) + return new chart::v1::StreamChartsAndDimensions; + if (!strcmp(msgname, "ChartsAndDimensionsAck")) + return new chart::v1::ChartsAndDimensionsAck; + if (!strcmp(msgname, "UpdateChartConfigs")) + return new chart::v1::UpdateChartConfigs; + if (!strcmp(msgname, "StartAlarmStreaming")) + return new alarms::v1::StartAlarmStreaming; + if (!strcmp(msgname, "SendAlarmLogHealth")) + return new alarms::v1::SendAlarmLogHealth; + if (!strcmp(msgname, "SendAlarmConfiguration")) + return new alarms::v1::SendAlarmConfiguration; + if (!strcmp(msgname, "SendAlarmSnapshot")) + return new alarms::v1::SendAlarmSnapshot; + if (!strcmp(msgname, "DisconnectReq")) + return new agent::v1::DisconnectReq; + if (!strcmp(msgname, "ContextsCheckpoint")) + return new context::v1::ContextsCheckpoint; + if (!strcmp(msgname, "StopStreamingContexts")) + return new context::v1::StopStreamingContexts; + + return NULL; +} + +char *protomsg_to_json(const void *protobin, size_t len, const char *msgname) +{ + google::protobuf::Message *msg = msg_name_to_protomsg(msgname); + if (msg == NULL) + return strdupz("Don't know this message type by name."); + + if (!msg->ParseFromArray(protobin, len)) + return strdupz("Can't parse this message. Malformed or wrong parser used."); + + JsonPrintOptions options; + + std::string output; + google::protobuf::util::MessageToJsonString(*msg, &output, options); + delete msg; + return strdupz(output.c_str()); +} diff --git a/aclk/schema-wrappers/proto_2_json.h b/aclk/schema-wrappers/proto_2_json.h new file mode 100644 index 000000000..3bd98478c --- /dev/null +++ b/aclk/schema-wrappers/proto_2_json.h @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef PROTO_2_JSON_H +#define PROTO_2_JSON_H + +#include <sys/types.h> + +#ifdef __cplusplus +extern "C" { +#endif + +char *protomsg_to_json(const void *protobin, size_t len, const char *msgname); + +#ifdef __cplusplus +} +#endif + +#endif /* PROTO_2_JSON_H */ diff --git a/aclk/schema-wrappers/schema_wrapper_utils.cc b/aclk/schema-wrappers/schema_wrapper_utils.cc index b100e20c3..6573e6299 100644 --- a/aclk/schema-wrappers/schema_wrapper_utils.cc +++ b/aclk/schema-wrappers/schema_wrapper_utils.cc @@ -13,3 +13,10 @@ void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, st tv->tv_sec = ts.seconds(); tv->tv_usec = ts.nanos()/1000; } + +int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + (void)ls; + auto map = (google::protobuf::Map<std::string, std::string> *)data; + map->insert({name, value}); + return 1; +} diff --git a/aclk/schema-wrappers/schema_wrapper_utils.h b/aclk/schema-wrappers/schema_wrapper_utils.h index 494855f82..2815d0f20 100644 --- a/aclk/schema-wrappers/schema_wrapper_utils.h +++ b/aclk/schema-wrappers/schema_wrapper_utils.h @@ -3,8 +3,11 @@ #ifndef SCHEMA_WRAPPER_UTILS_H #define SCHEMA_WRAPPER_UTILS_H +#include "database/rrd.h" + #include <sys/time.h> #include <google/protobuf/timestamp.pb.h> +#include <google/protobuf/map.h> #if GOOGLE_PROTOBUF_VERSION < 3001000 #define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize(); @@ -16,5 +19,6 @@ void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts); void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv); +int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data); #endif /* SCHEMA_WRAPPER_UTILS_H */ diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h index a3248a69b..26412cacc 100644 --- a/aclk/schema-wrappers/schema_wrappers.h +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -14,5 +14,7 @@ #include "alarm_stream.h" #include "node_info.h" #include "capability.h" +#include "context_stream.h" +#include "context.h" #endif /* SCHEMA_WRAPPERS_H */ |