From 03bf87dcb06f7021bfb2df2fa8691593c6148aff Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 30 Nov 2022 19:47:00 +0100 Subject: Adding upstream version 1.37.0. Signed-off-by: Daniel Baumann --- aclk/README.md | 8 +- aclk/aclk.c | 209 +++++++++----------- aclk/aclk.h | 21 +- aclk/aclk_alarm_api.c | 3 - aclk/aclk_api.c | 88 --------- aclk/aclk_api.h | 45 ----- aclk/aclk_capas.c | 47 +++++ aclk/aclk_capas.h | 14 ++ aclk/aclk_charts_api.c | 77 -------- aclk/aclk_charts_api.h | 22 --- aclk/aclk_contexts_api.c | 18 ++ aclk/aclk_contexts_api.h | 2 + aclk/aclk_otp.c | 11 +- aclk/aclk_query.c | 32 ++- aclk/aclk_query.h | 2 +- aclk/aclk_query_queue.c | 16 -- aclk/aclk_rx_msgs.c | 70 +++---- aclk/aclk_stats.c | 33 ++-- aclk/aclk_tx_msgs.c | 94 +++------ aclk/aclk_util.c | 1 + aclk/helpers/mqtt_wss_pal.h | 19 ++ aclk/helpers/ringbuffer_pal.h | 11 ++ aclk/schema-wrappers/capability.cc | 2 +- aclk/schema-wrappers/capability.h | 2 +- aclk/schema-wrappers/chart_config.cc | 105 ---------- aclk/schema-wrappers/chart_config.h | 50 ----- aclk/schema-wrappers/chart_stream.cc | 337 -------------------------------- aclk/schema-wrappers/chart_stream.h | 121 ------------ aclk/schema-wrappers/connection.cc | 10 +- aclk/schema-wrappers/connection.h | 2 +- aclk/schema-wrappers/node_connection.cc | 4 +- aclk/schema-wrappers/node_connection.h | 2 +- aclk/schema-wrappers/node_creation.cc | 6 +- aclk/schema-wrappers/node_creation.h | 6 +- aclk/schema-wrappers/node_info.cc | 4 +- aclk/schema-wrappers/node_info.h | 54 ++--- aclk/schema-wrappers/proto_2_json.cc | 16 -- aclk/schema-wrappers/schema_wrappers.h | 2 - 38 files changed, 341 insertions(+), 1225 deletions(-) delete mode 100644 aclk/aclk_api.c delete mode 100644 aclk/aclk_api.h create mode 100644 aclk/aclk_capas.c create mode 100644 aclk/aclk_capas.h delete mode 100644 aclk/aclk_charts_api.c delete mode 100644 aclk/aclk_charts_api.h create mode 100644 aclk/helpers/mqtt_wss_pal.h create mode 100644 aclk/helpers/ringbuffer_pal.h delete mode 100644 aclk/schema-wrappers/chart_config.cc delete mode 100644 aclk/schema-wrappers/chart_config.h delete mode 100644 aclk/schema-wrappers/chart_stream.cc delete mode 100644 aclk/schema-wrappers/chart_stream.h (limited to 'aclk') diff --git a/aclk/README.md b/aclk/README.md index 6f541c38e..af0f5fdde 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -19,7 +19,7 @@ The Cloud App lives at app.netdata.cloud which currently resolves to the followi :::caution -This list of IPs can change without notice, we strongly advise you to whitelist the domain `app.netdata.cloud`, if +This list of IPs can change without notice, we strongly advise you to whitelist following domains `api.netdata.cloud`, `mqtt.netdata.cloud`, if this is not an option in your case always verify the current domain resolution (e.g via the `host` command). ::: @@ -49,7 +49,7 @@ configuration uses two settings: ```conf [global] enabled = yes - cloud base url = https://app.netdata.cloud + cloud base url = https://api.netdata.cloud ``` If your Agent needs to use a proxy to access the internet, you must [set up a proxy for @@ -60,12 +60,10 @@ You can configure following keys in the `netdata.conf` section `[cloud]`: [cloud] statistics = yes query thread count = 2 - mqtt5 = yes ``` - `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. - `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). -- `mqtt5` allows disabling the new MQTT5 implementation which is used now by default in case of issues. This option will be removed in future stable release. ## Disable the ACLK @@ -112,7 +110,7 @@ must contain only `EOF`. ```bash [global] enabled = no - cloud base url = https://app.netdata.cloud + cloud base url = https://api.netdata.cloud EOF ``` diff --git a/aclk/aclk.c b/aclk/aclk.c index 7b3641b1e..3b035b849 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -2,6 +2,7 @@ #include "aclk.h" +#ifdef ENABLE_ACLK #include "aclk_stats.h" #include "mqtt_wss_client.h" #include "aclk_otp.h" @@ -12,6 +13,7 @@ #include "aclk_rx_msgs.h" #include "https_client.h" #include "schema-wrappers/schema_wrappers.h" +#include "aclk_capas.h" #include "aclk_proxy.h" @@ -23,21 +25,31 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable +#endif /* ENABLE_ACLK */ + int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. int aclk_rcvd_cloud_msgs = 0; int aclk_connection_counter = 0; int disconnect_req = 0; +int aclk_connected = 0; +int aclk_ctx_based = 0; +int aclk_disable_runtime = 0; +int aclk_stats_enabled; +int aclk_kill_link = 0; + +usec_t aclk_session_us = 0; +time_t aclk_session_sec = 0; + time_t last_conn_time_mqtt = 0; time_t last_conn_time_appl = 0; time_t last_disconnect_time = 0; time_t next_connection_attempt = 0; float last_backoff_value = 0; -int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload - time_t aclk_block_until = 0; +#ifdef ENABLE_ACLK mqtt_wss_client mqttwss_client; netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; @@ -447,9 +459,9 @@ static int aclk_block_till_recon_allowed() { */ static int aclk_get_transport_idx(aclk_env_t *env) { for (size_t i = 0; i < env->transport_count; i++) { - // currently we support only MQTT 3 + // currently we support only MQTT 5 // therefore select first transport that matches - if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) { + if (env->transports[i]->type == ACLK_TRP_MQTT_5) { return i; } } @@ -483,7 +495,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); if (cloud_base_url == NULL) { - error("Do not move the cloud base url out of post_conf_load!!"); + error_report("Do not move the cloud base url out of post_conf_load!!"); return -1; } @@ -493,13 +505,13 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) info("Attempting connection now"); memset(&base_url, 0, sizeof(url_t)); if (url_parse(cloud_base_url, &base_url)) { - error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); + error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); sleep(CLOUD_BASE_URL_READ_RETRY); url_t_destroy(&base_url); continue; } - struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT }; + struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT }; aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); struct mqtt_connect_params mqtt_conn_params = { @@ -523,7 +535,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_env(aclk_env, base_url.host, base_url.port); url_t_destroy(&base_url); if (ret) { - error("Failed to Get ACLK environment"); + error_report("Failed to Get ACLK environment"); // delay handled by aclk_block_till_recon_allowed continue; } @@ -537,14 +549,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) } if (!aclk_env_has_capa("proto")) { - error ("Can't use encoding=proto without at least \"proto\" capability."); + error_report("Can't use encoding=proto without at least \"proto\" capability."); continue; } info("New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { - error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); + error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); url_t_destroy(&auth_url); continue; } @@ -552,7 +564,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); url_t_destroy(&auth_url); if (ret) { - error("Error passing Challenge/Response to get OTP"); + error_report("Error passing Challenge/Response to get OTP"); continue; } @@ -561,20 +573,20 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); if (!mqtt_conn_params.will_topic) { - error("Couldn't get LWT topic. Will not send LWT."); + error_report("Couldn't get LWT topic. Will not send LWT."); continue; } // Do the MQTT connection ret = aclk_get_transport_idx(aclk_env); if (ret < 0) { - error("Cloud /env endpoint didn't return any transport usable by this Agent."); + error_report("Cloud /env endpoint didn't return any transport usable by this Agent."); continue; } memset(&mqtt_url, 0, sizeof(url_t)); if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ - error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); + error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); url_t_destroy(&mqtt_url); continue; } @@ -660,9 +672,7 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); - - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) { + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) { error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -672,7 +682,7 @@ void *aclk_main(void *ptr) // that send JSON payloads of 10 MB as single messages mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024); - aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled); if (aclk_stats_enabled) { stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); @@ -748,7 +758,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) node_instance_creation_t node_instance_creation = { .claim_id = localhost->aclk_state.claimed_id, .hops = host->system_info->hops, - .hostname = host->hostname, + .hostname = rrdhost_hostname(host), .machine_guid = host->machine_guid }; create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); @@ -770,14 +780,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) node_state_update.node_id = mallocz(UUID_STR_LEN); uuid_unparse_lower(node_id, (char*)node_state_update.node_id); - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + node_state_update.capabilities = aclk_get_agent_capas(); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; @@ -815,15 +818,8 @@ void aclk_send_node_instances() char host_id[UUID_STR_LEN]; uuid_unparse_lower(list->host_id, host_id); - RRDHOST *host = rrdhost_find_by_guid(host_id, 0); - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + RRDHOST *host = rrdhost_find_by_guid(host_id); + node_state_update.capabilities = aclk_get_node_instance_capas(host); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; @@ -832,6 +828,8 @@ void aclk_send_node_instances() info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, list->live, list->hops); + + freez((void*)node_state_update.capabilities); freez((void*)node_state_update.node_id); query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; @@ -853,7 +851,7 @@ void aclk_send_node_instances() rrdhost_aclk_state_unlock(localhost); info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, list->hops); - freez(node_instance_creation.machine_guid); + freez((void *)node_instance_creation.machine_guid); aclk_queue_query(create_query); } freez(list->hostname); @@ -891,41 +889,13 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) status.last_submitted_sequence_id ); } +#endif /* ENABLE_ACLK */ -static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host) -{ - struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); - if (!stats) { - buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host"); - return; - } - buffer_sprintf(wb, - "\n\t\tUpdates: %d" - "\n\t\tBatch ID: %"PRIu64 - "\n\t\tMin Seq ID: %"PRIu64 - "\n\t\tMax Seq ID: %"PRIu64 - "\n\t\tPending Min Seq ID: %"PRIu64 - "\n\t\tPending Max Seq ID: %"PRIu64 - "\n\t\tSent Min Seq ID: %"PRIu64 - "\n\t\tSent Max Seq ID: %"PRIu64 - "\n\t\tAcked Min Seq ID: %"PRIu64 - "\n\t\tAcked Max Seq ID: %"PRIu64, - stats->updates, - stats->batch_id, - stats->min_seqid, - stats->max_seqid, - stats->min_seqid_pend, - stats->max_seqid_pend, - stats->min_seqid_sent, - stats->max_seqid_sent, - stats->min_seqid_ack, - stats->max_seqid_ack - ); - freez(stats); -} - -char *ng_aclk_state(void) +char *aclk_state(void) { +#ifndef ENABLE_ACLK + return strdupz("ACLK Available: No"); +#else BUFFER *wb = buffer_create(1024); struct tm *tmptr, tmbuf; char *ret; @@ -935,7 +905,7 @@ char *ng_aclk_state(void) "ACLK Version: 2\n" "Protocols Supported: Protobuf\n" ); - buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5); char *agent_id = get_agent_claimid(); if (agent_id == NULL) @@ -974,7 +944,7 @@ char *ng_aclk_state(void) RRDHOST *host; rrd_rdlock(); rrdhost_foreach_read(host) { - buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname); + buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host)); buffer_strcat(wb, "\tClaimed ID: "); rrdhost_aclk_state_lock(host); @@ -1000,9 +970,6 @@ char *ng_aclk_state(void) buffer_strcat(wb, "\n\tAlert Streaming Status:"); fill_alert_status_for_host(wb, host); - - buffer_strcat(wb, "\n\tChart Streaming Status:"); - fill_chart_status_for_host(wb, host); } rrd_unlock(); } @@ -1010,8 +977,10 @@ char *ng_aclk_state(void) ret = strdupz(buffer_tostring(wb)); buffer_free(wb); return ret; +#endif /* ENABLE_ACLK */ } +#ifdef ENABLE_ACLK static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) { struct proto_alert_status status; @@ -1038,45 +1007,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) json_object_object_add(obj, "last-submitted-seq-id", tmp); } -static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host) -{ - struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host); - if (!stats) - return; - - json_object *tmp = json_object_new_int(stats->updates); - json_object_object_add(obj, "updates", tmp); - - tmp = json_object_new_int(stats->batch_id); - json_object_object_add(obj, "batch-id", tmp); - - tmp = json_object_new_int(stats->min_seqid); - json_object_object_add(obj, "min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid); - json_object_object_add(obj, "max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_pend); - json_object_object_add(obj, "pending-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_pend); - json_object_object_add(obj, "pending-max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_sent); - json_object_object_add(obj, "sent-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_sent); - json_object_object_add(obj, "sent-max-seq-id", tmp); - - tmp = json_object_new_int(stats->min_seqid_ack); - json_object_object_add(obj, "acked-min-seq-id", tmp); - - tmp = json_object_new_int(stats->max_seqid_ack); - json_object_object_add(obj, "acked-max-seq-id", tmp); - - freez(stats); -} - static json_object *timestamp_to_json(const time_t *t) { struct tm *tmptr, tmbuf; @@ -1087,9 +1017,13 @@ static json_object *timestamp_to_json(const time_t *t) } return NULL; } +#endif /* ENABLE_ACLK */ -char *ng_aclk_state_json(void) +char *aclk_state_json(void) { +#ifndef ENABLE_ACLK + return strdupz("{\"aclk-available\":false}"); +#else json_object *tmp, *grp, *msg = json_object_new_object(); tmp = json_object_new_boolean(1); @@ -1124,7 +1058,7 @@ char *ng_aclk_state_json(void) tmp = json_object_new_string("Protobuf"); json_object_object_add(msg, "used-cloud-protocol", tmp); - tmp = json_object_new_int(use_mqtt_5 ? 5 : 3); + tmp = json_object_new_int(5); json_object_object_add(msg, "mqtt-version", tmp); tmp = json_object_new_int(aclk_rcvd_cloud_msgs); @@ -1155,7 +1089,7 @@ char *ng_aclk_state_json(void) rrdhost_foreach_read(host) { json_object *nodeinstance = json_object_new_object(); - tmp = json_object_new_string(host->hostname); + tmp = json_object_new_string(rrdhost_hostname(host)); json_object_object_add(nodeinstance, "hostname", tmp); tmp = json_object_new_string(host->machine_guid); @@ -1191,10 +1125,6 @@ char *ng_aclk_state_json(void) fill_alert_status_for_host_json(tmp, host); json_object_object_add(nodeinstance, "alert-sync-status", tmp); - tmp = json_object_new_object(); - fill_chart_status_for_host_json(tmp, host); - json_object_object_add(nodeinstance, "chart-sync-status", tmp); - json_object_array_add(grp, nodeinstance); } rrd_unlock(); @@ -1203,4 +1133,41 @@ char *ng_aclk_state_json(void) char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); json_object_put(msg); return str; +#endif /* ENABLE_ACLK */ +} + +void add_aclk_host_labels(void) { + DICTIONARY *labels = localhost->rrdlabels; + +#ifdef ENABLE_ACLK + rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); + ACLK_PROXY_TYPE aclk_proxy; + char *proxy_str; + aclk_get_proxy(&aclk_proxy); + + switch(aclk_proxy) { + case PROXY_TYPE_SOCKS5: + proxy_str = "SOCKS5"; + break; + case PROXY_TYPE_HTTP: + proxy_str = "HTTP"; + break; + default: + proxy_str = "none"; + break; + } + + rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); +#else + rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); +#endif +} + +void aclk_queue_node_info(RRDHOST *host) { + struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker; + if (likely(wc)) { + wc->node_info_send = 1; + } } diff --git a/aclk/aclk.h b/aclk/aclk.h index 5065ac2bf..6aed548b7 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -3,17 +3,30 @@ #define ACLK_H #include "daemon/common.h" + +#ifdef ENABLE_ACLK #include "aclk_util.h" #include "aclk_rrdhost_state.h" // How many MQTT PUBACKs we need to get to consider connection // stable for the purposes of TBEB (truncated binary exponential backoff) #define ACLK_PUBACKS_CONN_STABLE 3 +#endif /* ENABLE_ACLK */ + +extern int aclk_connected; +extern int aclk_ctx_based; +extern int aclk_disable_runtime; +extern int aclk_stats_enabled; +extern int aclk_kill_link; + +extern usec_t aclk_session_us; +extern time_t aclk_session_sec; extern time_t aclk_block_until; extern int disconnect_req; +#ifdef ENABLE_ACLK void *aclk_main(void *ptr); extern netdata_mutex_t aclk_shared_state_mutex; @@ -34,7 +47,11 @@ void aclk_send_node_instances(void); void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); -char *ng_aclk_state(void); -char *ng_aclk_state_json(void); +#endif /* ENABLE_ACLK */ + +char *aclk_state(void); +char *aclk_state_json(void); +void add_aclk_host_labels(void); +void aclk_queue_node_info(RRDHOST *host); #endif /* ACLK_H */ diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c index a181eb291..7df51a7b5 100644 --- a/aclk/aclk_alarm_api.c +++ b/aclk/aclk_alarm_api.c @@ -23,9 +23,6 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry) char *payload = generate_alarm_log_entry(&payload_size, log_entry); aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry"); - - if (!use_mqtt_5) - freez(payload); } void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg) diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c deleted file mode 100644 index 141d267af..000000000 --- a/aclk/aclk_api.c +++ /dev/null @@ -1,88 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#include "libnetdata/libnetdata.h" -#include "database/rrd.h" - -#ifdef ENABLE_ACLK -#include "aclk.h" -#endif - -int aclk_connected = 0; -int aclk_kill_link = 0; - -usec_t aclk_session_us = 0; -time_t aclk_session_sec = 0; - -int aclk_disable_runtime = 0; - -int aclk_stats_enabled; -int use_mqtt_5 = 0; -int aclk_ctx_based = 0; - -#define ACLK_IMPL_KEY_NAME "aclk implementation" - -#ifdef ENABLE_ACLK -void *aclk_starter(void *ptr) { - char *aclk_impl_req = config_get(CONFIG_SECTION_CLOUD, ACLK_IMPL_KEY_NAME, "ng"); - - if (!strcasecmp(aclk_impl_req, "ng")) { - return aclk_main(ptr); - } else if (!strcasecmp(aclk_impl_req, "legacy")) { - error("Legacy ACLK is not supported anymore key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\" ignored. Using ACLK-NG."); - } else { - error("Unknown value \"%s\" of key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\". Using ACLK-NG. This config key will be deprecated.", aclk_impl_req); - } - return aclk_main(ptr); -} -#endif /* ENABLE_ACLK */ - -void add_aclk_host_labels(void) { - DICTIONARY *labels = localhost->host_labels; - -#ifdef ENABLE_ACLK - rrdlabels_add(labels, "_aclk_ng_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#else - rrdlabels_add(labels, "_aclk_ng_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#endif - rrdlabels_add(labels, "_aclk_legacy_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#ifdef ENABLE_ACLK - ACLK_PROXY_TYPE aclk_proxy; - char *proxy_str; - aclk_get_proxy(&aclk_proxy); - - switch(aclk_proxy) { - case PROXY_TYPE_SOCKS5: - proxy_str = "SOCKS5"; - break; - case PROXY_TYPE_HTTP: - proxy_str = "HTTP"; - break; - default: - proxy_str = "none"; - break; - } - - - int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES); - - rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO); - rrdlabels_add(labels, "_aclk_impl", "Next Generation", RRDLABEL_SRC_AUTO); - rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); - rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); -#endif -} - -char *aclk_state(void) { -#ifndef ENABLE_ACLK - return strdupz("ACLK Available: No"); -#else - return ng_aclk_state(); -#endif -} - -char *aclk_state_json(void) { -#ifndef ENABLE_ACLK - return strdupz("{\"aclk-available\":false}"); -#else - return ng_aclk_state_json(); -#endif -} diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h deleted file mode 100644 index 36a6d603f..000000000 --- a/aclk/aclk_api.h +++ /dev/null @@ -1,45 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#ifndef ACLK_API_H -#define ACLK_API_H - -#include "libnetdata/libnetdata.h" - -#include "aclk_proxy.h" - -// TODO get rid global vars as soon as -// ACLK Legacy is removed -extern int aclk_connected; -extern int aclk_kill_link; - -extern usec_t aclk_session_us; -extern time_t aclk_session_sec; - -extern int aclk_disable_runtime; - -extern int aclk_stats_enabled; -extern int aclk_alert_reloaded; - -extern int use_mqtt_5; -extern int aclk_ctx_based; - -#ifdef ENABLE_ACLK -void *aclk_starter(void *ptr); - -void aclk_host_state_update(RRDHOST *host, int connect); - -#define NETDATA_ACLK_HOOK \ - { .name = "ACLK_Main", \ - .config_section = NULL, \ - .config_name = NULL, \ - .enabled = 1, \ - .thread = NULL, \ - .init_routine = NULL, \ - .start_routine = aclk_starter }, - -#endif - -void add_aclk_host_labels(void); -char *aclk_state(void); -char *aclk_state_json(void); - -#endif /* ACLK_API_H */ diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c new file mode 100644 index 000000000..df9d18f63 --- /dev/null +++ b/aclk/aclk_capas.c @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_capas.h" + +#include "ml/ml.h" + +const struct capability *aclk_get_agent_capas() +{ + static struct capability agent_capabilities[] = { + { .name = "json", .version = 2, .enabled = 0 }, + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = 0, .enabled = 0 }, + { .name = "mc", .version = 0, .enabled = 0 }, + { .name = "ctx", .version = 1, .enabled = 1 }, + { .name = "funcs", .version = 1, .enabled = 1 }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + agent_capabilities[2].version = ml_capable() ? 1 : 0; + agent_capabilities[2].enabled = ml_enabled(localhost); + + agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0; + agent_capabilities[3].enabled = enable_metric_correlations; + + return agent_capabilities; +} + +struct capability *aclk_get_node_instance_capas(RRDHOST *host) +{ + struct capability ni_caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) }, + { .name = "mc", + .version = enable_metric_correlations ? metric_correlations_version : 0, + .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = 1 }, + { .name = "funcs", .version = 0, .enabled = 0 }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) { + ni_caps[4].version = 1; + ni_caps[4].enabled = 1; + } + + struct capability *ret = mallocz(sizeof(ni_caps)); + memcpy(ret, ni_caps, sizeof(ni_caps)); + return ret; +} diff --git a/aclk/aclk_capas.h b/aclk/aclk_capas.h new file mode 100644 index 000000000..c39a197b8 --- /dev/null +++ b/aclk/aclk_capas.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_CAPAS_H +#define ACLK_CAPAS_H + +#include "daemon/common.h" +#include "libnetdata/libnetdata.h" + +#include "schema-wrappers/capability.h" + +const struct capability *aclk_get_agent_capas(); +struct capability *aclk_get_node_instance_capas(RRDHOST *host); + +#endif /* ACLK_CAPAS_H */ diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c deleted file mode 100644 index 51d8dad58..000000000 --- a/aclk/aclk_charts_api.c +++ /dev/null @@ -1,77 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#include "aclk_charts_api.h" - -#include "aclk_query_queue.h" - -#define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated" - -void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS; - query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) -{ - aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS; - query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id); - query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size) -{ - aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED; - query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size); - query->data.bin_payload.msg_name = "ChartConfigsUpdated"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_chart_reset(chart_reset_t reset) -{ - aclk_query_t query = aclk_query_new(CHART_RESET); - query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET; - query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset); - query->data.bin_payload.msg_name = "ResetChartMessages"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_retention_updated(struct retention_updated *data) -{ - aclk_query_t query = aclk_query_new(RETENTION_UPDATED); - query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED; - query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data); - query->data.bin_payload.msg_name = "RetentionUpdated"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_update_node_info(struct update_node_info *info) -{ - aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO); - query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO; - query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info); - query->data.bin_payload.msg_name = "UpdateNodeInfo"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} - -void aclk_update_node_collectors(struct update_node_collectors *collectors) -{ - aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS); - query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS; - query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors); - query->data.bin_payload.msg_name = "UpdateNodeCollectors"; - QUEUE_IF_PAYLOAD_PRESENT(query); -} diff --git a/aclk/aclk_charts_api.h b/aclk/aclk_charts_api.h deleted file mode 100644 index 71f07dd33..000000000 --- a/aclk/aclk_charts_api.h +++ /dev/null @@ -1,22 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#ifndef ACLK_CHARTS_H -#define ACLK_CHARTS_H - -#include "../daemon/common.h" -#include "schema-wrappers/schema_wrappers.h" - -void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); -void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); -void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id); - -void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size); - -void aclk_chart_reset(chart_reset_t reset); - -void aclk_retention_updated(struct retention_updated *data); - -void aclk_update_node_info(struct update_node_info *info); - -void aclk_update_node_collectors(struct update_node_collectors *collectors); - -#endif /* ACLK_CHARTS_H */ diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c index f17d3cabd..f3344935e 100644 --- a/aclk/aclk_contexts_api.c +++ b/aclk/aclk_contexts_api.c @@ -21,3 +21,21 @@ void aclk_send_contexts_updated(contexts_updated_t data) query->data.bin_payload.msg_name = "ContextsUpdated"; QUEUE_IF_PAYLOAD_PRESENT(query); } + +void aclk_update_node_collectors(struct update_node_collectors *collectors) +{ + aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS; + query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors); + query->data.bin_payload.msg_name = "UpdateNodeCollectors"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_update_node_info(struct update_node_info *info) +{ + aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO; + query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info); + query->data.bin_payload.msg_name = "UpdateNodeInfo"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h index 46b916d22..f0b5ec77e 100644 --- a/aclk/aclk_contexts_api.h +++ b/aclk/aclk_contexts_api.h @@ -7,6 +7,8 @@ void aclk_send_contexts_snapshot(contexts_snapshot_t data); void aclk_send_contexts_updated(contexts_updated_t data); +void aclk_update_node_collectors(struct update_node_collectors *collectors); +void aclk_update_node_info(struct update_node_info *info); #endif /* ACLK_CONTEXTS_API_H */ diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index b7bf173c4..2bdbb70fb 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -13,7 +13,7 @@ static int aclk_https_request(https_req_t *request, https_req_response_t *respon int rc; // wrapper for ACLK only which loads ACLK specific proxy settings // then only calls https_request - struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT }; + struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT }; aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); if (proxy_conf.type == MQTT_WSS_PROXY_HTTP) { @@ -380,7 +380,7 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char ** base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64)); if (*challenge_bytes != CHALLENGE_LEN) { error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN); - freez(challenge); + freez(*challenge); *challenge = NULL; goto cleanup_json; } @@ -490,7 +490,7 @@ int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **m int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) #endif { - unsigned char *challenge; + unsigned char *challenge = NULL; int challenge_bytes; char *agent_id = get_agent_claimid(); @@ -844,10 +844,7 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { return 1; } - if (rrdcontext_enabled) - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); - else - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); freez(agent_id); diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 981c01965..5301c281f 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -38,9 +38,7 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id) int res; uuid_t node_id_bin, host_id_bin; - rrd_rdlock(); - RRDHOST *host = find_host_by_node_id((char *) node_id); - rrd_unlock(); + RRDHOST *host = find_host_by_node_id((char *)node_id); if (host) return host; @@ -54,7 +52,7 @@ static RRDHOST *node_id_2_rrdhost(const char *node_id) return NULL; } uuid_unparse_lower(host_id_bin, host_id); - return rrdhost_find_by_guid(host_id, 0); + return rrdhost_find_by_guid(host_id); } #define NODE_ID_QUERY "/node/" @@ -82,7 +80,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() w->cookie1[0] = 0; // Simulate web_client_create_on_fd() w->cookie2[0] = 0; // Simulate web_client_create_on_fd() - w->acl = 0x1f; + w->acl = WEB_CLIENT_ACL_ACLK; buffer_strcat(log_buffer, query->data.http_api_v2.query); size_t size = 0; @@ -101,7 +99,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } } - RRDHOST *temp_host = NULL; if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); char nodeid[UUID_STR_LEN]; @@ -116,14 +113,11 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) query_host = node_id_2_rrdhost(nodeid); if (!query_host) { - temp_host = sql_create_host_by_uuid(nodeid); - if (!temp_host) { - error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); - retval = 1; - w->response.code = 404; - aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); - goto cleanup; - } + error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); + retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); + goto cleanup; } } @@ -144,8 +138,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } // execute the query - t = aclk_web_api_v1_request(query_host ? query_host : temp_host, w, mysep ? mysep + 1 : "noop"); - free_temporary_host(temp_host); + t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; sent = size; @@ -263,7 +256,7 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) return 0; } -const char *aclk_query_get_name(aclk_query_type_t qt) +const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok) { switch (qt) { case HTTP_API_V2: return "http_api_request_v2"; @@ -280,7 +273,8 @@ const char *aclk_query_get_name(aclk_query_type_t qt) case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; default: - error_report("Unknown query type used %d", (int) qt); + if (!unknown_ok) + error_report("Unknown query type used %d", (int) qt); return "unknown"; } } @@ -329,7 +323,7 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr) static void worker_aclk_register(void) { worker_register("ACLKQUERY"); for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) { - worker_register_job_name(i, aclk_query_get_name(i)); + worker_register_job_name(i, aclk_query_get_name(i, 0)); } } diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h index f86754a2a..c006b0138 100644 --- a/aclk/aclk_query.h +++ b/aclk/aclk_query.h @@ -31,6 +31,6 @@ struct aclk_query_threads { void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client); void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); -const char *aclk_query_get_name(aclk_query_type_t qt); +const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok); #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 01b20d23f..9a450571e 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -111,22 +111,6 @@ void aclk_query_free(aclk_query_t query) freez(query->data.http_api_v2.query); break; - case NODE_STATE_UPDATE: - case REGISTER_NODE: - case CHART_DIMS_UPDATE: - case CHART_CONFIG_UPDATED: - case CHART_RESET: - case RETENTION_UPDATED: - case UPDATE_NODE_INFO: - case ALARM_LOG_HEALTH: - case ALARM_PROVIDE_CFG: - case ALARM_SNAPSHOT: - case UPDATE_NODE_COLLECTORS: - case PROTO_BIN_MESSAGE: - if (!use_mqtt_5) - freez(query->data.bin_payload.payload); - break; - default: break; } diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index e6ed332cc..83bc5508b 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -5,6 +5,7 @@ #include "aclk_stats.h" #include "aclk_query_queue.h" #include "aclk.h" +#include "aclk_capas.h" #include "schema-wrappers/proto_2_json.h" @@ -274,7 +275,7 @@ int create_node_instance_result(const char *msg, size_t msg_len) .node_id = res.node_id }; - RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0); + RRDHOST *host = rrdhost_find_by_guid(res.machine_guid); if (host) { // not all host must have RRDHOST struct created for them // if they never connected during runtime of agent @@ -289,20 +290,15 @@ int create_node_instance_result(const char *msg, size_t msg_len) } } - struct capability caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_state_update.capabilities = caps; + node_state_update.capabilities = aclk_get_node_instance_capas(host); rrdhost_aclk_state_lock(localhost); node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); + freez((void *)node_state_update.capabilities); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; @@ -322,44 +318,25 @@ int send_node_instances(const char *msg, size_t msg_len) int stream_charts_and_dimensions(const char *msg, size_t msg_len) { - aclk_ctx_based = 0; - stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len); - if (!res.claim_id || !res.node_id) { - error("Error parsing StreamChartsAndDimensions msg"); - freez(res.claim_id); - freez(res.node_id); - return 1; - } - chart_batch_id = res.batch_id; - aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id); - freez(res.claim_id); - freez(res.node_id); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete StreamChartsAndDimensions msg"); return 0; } int charts_and_dimensions_ack(const char *msg, size_t msg_len) { - chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len); - if (!res.claim_id || !res.node_id) { - error("Error parsing StreamChartsAndDimensions msg"); - freez(res.claim_id); - freez(res.node_id); - return 1; - } - aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id); - freez(res.claim_id); - freez(res.node_id); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete StreamChartsAndDimensionsAck msg"); return 0; } int update_chart_configs(const char *msg, size_t msg_len) { - struct update_chart_config res = parse_update_chart_config(msg, msg_len); - if (!res.claim_id || !res.node_id || !res.hashes) - error("Error parsing UpdateChartConfigs msg"); - else - aclk_get_chart_config(res.hashes); - destroy_update_chart_config(&res); + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete UpdateChartConfigs msg"); return 0; } @@ -527,7 +504,7 @@ unsigned int aclk_init_rx_msg_handlers(void) return i; } -void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic) +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic __maybe_unused) { if (aclk_stats_enabled) { ACLK_STATS_LOCK; @@ -546,15 +523,16 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t return; } -#ifdef NETDATA_INTERNAL_CHECKS - if (!strncmp(message_type, "cmd", strlen("cmd"))) { - log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name); - } else { - char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name); - log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name); - freez(json); + + if (aclklog_enabled) { + if (!strncmp(message_type, "cmd", strlen("cmd"))) { + log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name); + } else { + char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name); + log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name); + freez(json); + } } -#endif if (aclk_stats_enabled) { ACLK_STATS_LOCK; diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index 241e9b724..215313ff9 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -39,8 +39,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE); rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st_aclkstats); + } rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online); @@ -60,8 +59,7 @@ static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st_query_thread); + } rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued); rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched); @@ -83,8 +81,8 @@ static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample) rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); + } + if(per_sample->latency_count) rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count)); else @@ -109,8 +107,7 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); + } rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err); rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err); @@ -129,10 +126,9 @@ static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED); for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) - dims[i] = rrddim_add(st, aclk_query_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + dims[i] = rrddim_add(st, aclk_query_get_name(i, 1), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); + } for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]); @@ -171,8 +167,7 @@ static void aclk_stats_cloud_req_http_type(struct aclk_metrics_per_sample *per_s for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++) rd_rq_types[i] = rrddim_add(st, cloud_req_http_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); + } for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++) rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_http_by_type[i]); @@ -197,8 +192,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) error("snprintf encoding error"); aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); } - } else - rrdset_next(st); + } for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) { rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); @@ -222,8 +216,7 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); + } if(per_sample->cloud_q_process_count) rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count)); @@ -248,8 +241,7 @@ static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) { aclk_stats_cfg.rx_msg_dims[i] = rrddim_add(st, rx_handler_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); } - } else - rrdset_next(st); + } for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) rrddim_set_by_pointer(st, aclk_stats_cfg.rx_msg_dims[i], rx_msgs_sample[i]); @@ -275,8 +267,7 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats) rd_sent = rrddim_add(st, "sent", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); - } else - rrdset_next(st); + } rrddim_set_by_pointer(st, rd_sent, sent); rrddim_set_by_pointer(st, rd_recvd, recvd); diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 822a90fa2..532b964ad 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -5,6 +5,7 @@ #include "aclk_util.h" #include "aclk_stats.h" #include "aclk.h" +#include "aclk_capas.h" #include "schema-wrappers/proto_2_json.h" @@ -15,6 +16,13 @@ // version for aclk legacy (old cloud arch) #define ACLK_VERSION 2 +static void freez_aclk_publish5a(void *ptr) { + freez(ptr); +} +static void freez_aclk_publish5b(void *ptr) { + freez(ptr); +} + uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) { #ifndef ACLK_LOG_CONVERSATION_DIR @@ -28,43 +36,27 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s return 0; } - if (use_mqtt_5) - mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); - else - mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); - char *json = protomsg_to_json(msg, msg_len, msgname); - log_aclk_message_bin(json, strlen(json), 1, topic, msgname); - freez(json); #endif + if (aclklog_enabled) { + char *json = protomsg_to_json(msg, msg_len, msgname); + log_aclk_message_bin(json, strlen(json), 1, topic, msgname); + freez(json); + } + return packet_id; } -/* UNUSED now but can be used soon MVP1? -static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic) +// json_object_put returns int unfortunately :D +// we need void(*fnc)(void *); +static void json_object_put_wrapper(void *jsonobj) { - if (unlikely(!topic || topic[0] != '/')) { - error ("Full topic required!"); - return; - } - - const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); - - mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_msg_published(); -#endif -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif + json_object_put(jsonobj); } -*/ #define TOPIC_MAX_LEN 512 #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" @@ -73,10 +65,11 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec uint16_t packet_id; const char *str; char *full_msg = NULL; - int len, rc; + int len; if (unlikely(!topic || topic[0] != '/')) { error ("Full topic required!"); + json_object_put(msg); return HTTP_RESP_INTERNAL_SERVER_ERROR; } @@ -87,40 +80,20 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); memcpy(full_msg, str, len); + json_object_put(msg); + msg = NULL; memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); len += strlen(V2_BIN_PAYLOAD_SEPARATOR); memcpy(&full_msg[len], payload, payload_len); len += payload_len; } -/* TODO -#ifdef ACLK_LOG_CONVERSATION_DIR -#define FN_MAX_LEN 1024 - char filename[FN_MAX_LEN]; - snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); - json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); -#endif */ - - if (use_mqtt_5) - mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id); - else { - rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000); - if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) { - error("Timeout sending binpacked message"); - freez(full_msg); - return HTTP_RESP_BACKEND_FETCH_FAILED; - } - if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) { - error("Message is bigger than allowed maximum"); - freez(full_msg); - return HTTP_RESP_FORBIDDEN; - } - } + mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id); #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); #endif - freez(full_msg); + return 0; } @@ -203,7 +176,6 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) { error("Failed to send cancelation message for http reply"); } - json_object_put(msg); } void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) @@ -222,7 +194,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg json_object_object_add(msg, "http-code", tmp); int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); - json_object_put(msg); switch (rc) { case HTTP_RESP_FORBIDDEN: @@ -241,22 +212,11 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable size_t len; uint16_t pid; - struct capability agent_capabilities[] = { - { .name = "json", .version = 2, .enabled = 0 }, - { .name = "proto", .version = 1, .enabled = 1 }, -#ifdef ENABLE_ML - { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) }, -#endif - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled }, - { .name = NULL, .version = 0, .enabled = 0 } - }; - update_agent_connection_t conn = { .reachable = (reachable ? 1 : 0), .lwt = 0, .session_id = aclk_session_newarch, - .capabilities = agent_capabilities + .capabilities = aclk_get_agent_capas() }; rrdhost_aclk_state_lock(localhost); @@ -279,8 +239,6 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable } pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); - if (!use_mqtt_5) - freez(msg); if (localhost->aclk_state.prev_claimed_id) { freez(localhost->aclk_state.prev_claimed_id); localhost->aclk_state.prev_claimed_id = NULL; diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index ec021aec5..01eaedc8e 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "aclk_util.h" +#include "aclk_proxy.h" #include "daemon/common.h" diff --git a/aclk/helpers/mqtt_wss_pal.h b/aclk/helpers/mqtt_wss_pal.h new file mode 100644 index 000000000..5c89f8bb7 --- /dev/null +++ b/aclk/helpers/mqtt_wss_pal.h @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef MQTT_WSS_PAL_H +#define MQTT_WSS_PAL_H + +#include "libnetdata/libnetdata.h" + +#undef OPENSSL_VERSION_095 +#undef OPENSSL_VERSION_097 +#undef OPENSSL_VERSION_110 +#undef OPENSSL_VERSION_111 + +#define mw_malloc(...) mallocz(__VA_ARGS__) +#define mw_calloc(...) callocz(__VA_ARGS__) +#define mw_free(...) freez(__VA_ARGS__) +#define mw_strdup(...) strdupz(__VA_ARGS__) +#define mw_realloc(...) reallocz(__VA_ARGS__) + +#endif /* MQTT_WSS_PAL_H */ diff --git a/aclk/helpers/ringbuffer_pal.h b/aclk/helpers/ringbuffer_pal.h new file mode 100644 index 000000000..2f7e1cb93 --- /dev/null +++ b/aclk/helpers/ringbuffer_pal.h @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef RINGBUFFER_PAL_H +#define RINGBUFFER_PAL_H + +#include "libnetdata/libnetdata.h" + +#define crbuf_malloc(...) mallocz(__VA_ARGS__) +#define crbuf_free(...) freez(__VA_ARGS__) + +#endif /* RINGBUFFER_PAL_H */ diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc index 769806f90..af45740a9 100644 --- a/aclk/schema-wrappers/capability.cc +++ b/aclk/schema-wrappers/capability.cc @@ -4,7 +4,7 @@ #include "capability.h" -void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa) { +void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) { proto_capa->set_name(c_capa->name); proto_capa->set_enabled(c_capa->enabled); proto_capa->set_version(c_capa->version); diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h index 9517a8716..c6085a44b 100644 --- a/aclk/schema-wrappers/capability.h +++ b/aclk/schema-wrappers/capability.h @@ -18,7 +18,7 @@ struct capability { #include "proto/aclk/v1/lib.pb.h" -void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa); +void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa); #endif #endif /* ACLK_SCHEMA_CAPABILITY_H */ diff --git a/aclk/schema-wrappers/chart_config.cc b/aclk/schema-wrappers/chart_config.cc deleted file mode 100644 index 87e34e0df..000000000 --- a/aclk/schema-wrappers/chart_config.cc +++ /dev/null @@ -1,105 +0,0 @@ -#include "chart_config.h" - -#include "proto/chart/v1/config.pb.h" - -#include "libnetdata/libnetdata.h" - -#include "schema_wrapper_utils.h" - -void destroy_update_chart_config(struct update_chart_config *cfg) -{ - freez(cfg->claim_id); - freez(cfg->node_id); - freez(cfg->hashes); -} - -void destroy_chart_config_updated(struct chart_config_updated *cfg) -{ - freez(cfg->type); - freez(cfg->family); - freez(cfg->context); - freez(cfg->title); - freez(cfg->plugin); - freez(cfg->module); - freez(cfg->units); - freez(cfg->config_hash); -} - -struct update_chart_config parse_update_chart_config(const char *data, size_t len) -{ - chart::v1::UpdateChartConfigs cfgs; - update_chart_config res; - memset(&res, 0, sizeof(res)); - - if (!cfgs.ParseFromArray(data, len)) - return res; - - res.claim_id = strdupz(cfgs.claim_id().c_str()); - res.node_id = strdupz(cfgs.node_id().c_str()); - - // to not do bazillion tiny allocations for individual strings - // we calculate how much memory we will need for all of them - // and allocate at once - int hash_count = cfgs.config_hashes_size(); - size_t total_strlen = 0; - for (int i = 0; i < hash_count; i++) - total_strlen += cfgs.config_hashes(i).length(); - total_strlen += hash_count; //null bytes - - res.hashes = (char**)callocz( 1, - (hash_count+1) * sizeof(char*) + //char * array incl. terminating NULL at the end - total_strlen //strings themselves incl. 1 null byte each - ); - - char* dest = ((char*)res.hashes) + (hash_count + 1 /* NULL ptr */) * sizeof(char *); - // now copy them strings - // null bytes handled by callocz - for (int i = 0; i < hash_count; i++) { - strcpy(dest, cfgs.config_hashes(i).c_str()); - res.hashes[i] = dest; - dest += strlen(dest) + 1 /* end string null */; - } - - return res; -} - -char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size) -{ - chart::v1::ChartConfigsUpdated configs; - for (int i = 0; i < list_size; i++) { - chart::v1::ChartConfigUpdated *config = configs.add_configs(); - config->set_type(config_list[i].type); - if (config_list[i].family) - config->set_family(config_list[i].family); - config->set_context(config_list[i].context); - config->set_title(config_list[i].title); - config->set_priority(config_list[i].priority); - config->set_plugin(config_list[i].plugin); - - if (config_list[i].module) - config->set_module(config_list[i].module); - - switch (config_list[i].chart_type) { - case RRDSET_TYPE_LINE: - config->set_chart_type(chart::v1::LINE); - break; - case RRDSET_TYPE_AREA: - config->set_chart_type(chart::v1::AREA); - break; - case RRDSET_TYPE_STACKED: - config->set_chart_type(chart::v1::STACKED); - break; - default: - return NULL; - } - - config->set_units(config_list[i].units); - config->set_config_hash(config_list[i].config_hash); - } - - *len = PROTO_COMPAT_MSG_SIZE(configs); - char *bin = (char*)mallocz(*len); - configs.SerializeToArray(bin, *len); - - return bin; -} diff --git a/aclk/schema-wrappers/chart_config.h b/aclk/schema-wrappers/chart_config.h deleted file mode 100644 index f08f76b61..000000000 --- a/aclk/schema-wrappers/chart_config.h +++ /dev/null @@ -1,50 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H -#define ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H - -#include - -#include "database/rrd.h" - -#ifdef __cplusplus -extern "C" { -#endif - -struct update_chart_config { - char *claim_id; - char *node_id; - char **hashes; -}; - -enum chart_config_chart_type { - LINE, - AREA, - STACKED -}; - -struct chart_config_updated { - char *type; - char *family; - char *context; - char *title; - uint64_t priority; - char *plugin; - char *module; - RRDSET_TYPE chart_type; - char *units; - char *config_hash; -}; - -void destroy_update_chart_config(struct update_chart_config *cfg); -void destroy_chart_config_updated(struct chart_config_updated *cfg); - -struct update_chart_config parse_update_chart_config(const char *data, size_t len); - -char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size); - -#ifdef __cplusplus -} -#endif - -#endif /* ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H */ diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc deleted file mode 100644 index 54c940758..000000000 --- a/aclk/schema-wrappers/chart_stream.cc +++ /dev/null @@ -1,337 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "aclk/aclk_util.h" - -#include "proto/chart/v1/stream.pb.h" -#include "chart_stream.h" - -#include "schema_wrapper_utils.h" - -#include -#include - -stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len) -{ - chart::v1::StreamChartsAndDimensions msg; - stream_charts_and_dims_t res; - memset(&res, 0, sizeof(res)); - - if (!msg.ParseFromArray(data, len)) - return res; - - res.node_id = strdup(msg.node_id().c_str()); - res.claim_id = strdup(msg.claim_id().c_str()); - res.seq_id = msg.sequence_id(); - res.batch_id = msg.batch_id(); - set_timeval_from_google_timestamp(msg.seq_id_created_at(), &res.seq_id_created_at); - - return res; -} - -chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len) -{ - chart::v1::ChartsAndDimensionsAck msg; - chart_and_dim_ack_t res = { .claim_id = NULL, .node_id = NULL, .last_seq_id = 0 }; - - if (!msg.ParseFromArray(data, len)) - return res; - - res.node_id = strdup(msg.node_id().c_str()); - res.claim_id = strdup(msg.claim_id().c_str()); - res.last_seq_id = msg.last_sequence_id(); - - return res; -} - -char *generate_reset_chart_messages(size_t *len, chart_reset_t reset) -{ - chart::v1::ResetChartMessages msg; - - msg.set_claim_id(reset.claim_id); - msg.set_node_id(reset.node_id); - switch (reset.reason) { - case DB_EMPTY: - msg.set_reason(chart::v1::ResetReason::DB_EMPTY); - break; - case SEQ_ID_NOT_EXISTS: - msg.set_reason(chart::v1::ResetReason::SEQ_ID_NOT_EXISTS); - break; - case TIMESTAMP_MISMATCH: - msg.set_reason(chart::v1::ResetReason::TIMESTAMP_MISMATCH); - break; - default: - return NULL; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)malloc(*len); - if (bin) - msg.SerializeToArray(bin, *len); - - return bin; -} - -void chart_instance_updated_destroy(struct chart_instance_updated *instance) -{ - freez((char*)instance->id); - freez((char*)instance->claim_id); - - rrdlabels_destroy(instance->chart_labels); - - freez((char*)instance->config_hash); -} - -static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, const struct chart_instance_updated *update) -{ - google::protobuf::Map *map; - aclk_lib::v1::ACLKMessagePosition *pos; - - chart->set_id(update->id); - chart->set_claim_id(update->claim_id); - chart->set_node_id(update->node_id); - chart->set_name(update->name); - - map = chart->mutable_chart_labels(); - rrdlabels_walkthrough_read(update->chart_labels, label_add_to_map_callback, map); - - switch (update->memory_mode) { - case RRD_MEMORY_MODE_NONE: - chart->set_memory_mode(chart::v1::NONE); - break; - case RRD_MEMORY_MODE_RAM: - chart->set_memory_mode(chart::v1::RAM); - break; - case RRD_MEMORY_MODE_MAP: - chart->set_memory_mode(chart::v1::MAP); - break; - case RRD_MEMORY_MODE_SAVE: - chart->set_memory_mode(chart::v1::SAVE); - break; - case RRD_MEMORY_MODE_ALLOC: - chart->set_memory_mode(chart::v1::ALLOC); - break; - case RRD_MEMORY_MODE_DBENGINE: - chart->set_memory_mode(chart::v1::DB_ENGINE); - break; - default: - return 1; - break; - } - - chart->set_update_every_interval(update->update_every); - chart->set_config_hash(update->config_hash); - - pos = chart->mutable_position(); - pos->set_sequence_id(update->position.sequence_id); - pos->set_previous_sequence_id(update->position.previous_sequence_id); - set_google_timestamp_from_timeval(update->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); - - return 0; -} - -static int set_chart_dim_updated(chart::v1::ChartDimensionUpdated *dim, const struct chart_dimension_updated *c_dim) -{ - aclk_lib::v1::ACLKMessagePosition *pos; - - dim->set_id(c_dim->id); - dim->set_chart_id(c_dim->chart_id); - dim->set_node_id(c_dim->node_id); - dim->set_claim_id(c_dim->claim_id); - dim->set_name(c_dim->name); - - set_google_timestamp_from_timeval(c_dim->created_at, dim->mutable_created_at()); - set_google_timestamp_from_timeval(c_dim->last_timestamp, dim->mutable_last_timestamp()); - - pos = dim->mutable_position(); - pos->set_sequence_id(c_dim->position.sequence_id); - pos->set_previous_sequence_id(c_dim->position.previous_sequence_id); - set_google_timestamp_from_timeval(c_dim->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); - - return 0; -} - -char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - chart::v1::ChartInstanceUpdated db_chart; - chart::v1::ChartDimensionUpdated db_dim; - aclk_lib::v1::ACLKMessagePosition *pos; - - msg.set_batch_id(batch_id); - - for (int i = 0; payloads[i]; i++) { - if (is_dim[i]) { - if (!db_dim.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); - return NULL; - } - - pos = db_dim.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart::v1::ChartDimensionUpdated *dim = msg.add_dimensions(); - *dim = db_dim; - } else { - if (!db_chart.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated"); - return NULL; - } - - pos = db_chart.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart::v1::ChartInstanceUpdated *chart = msg.add_charts(); - *chart = db_chart; - } - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - - msg.set_batch_id(chart_batch_id); - - for (int i = 0; payloads[i]; i++) { - chart::v1::ChartInstanceUpdated db_msg; - chart::v1::ChartInstanceUpdated *chart; - aclk_lib::v1::ACLKMessagePosition *pos; - - if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated"); - return NULL; - } - - pos = db_msg.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart = msg.add_charts(); - *chart = db_msg; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - - msg.set_batch_id(chart_batch_id); - - for (int i = 0; payloads[i]; i++) { - chart::v1::ChartDimensionUpdated db_msg; - chart::v1::ChartDimensionUpdated *dim; - aclk_lib::v1::ACLKMessagePosition *pos; - - if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); - return NULL; - } - - pos = db_msg.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - dim = msg.add_dimensions(); - *dim = db_msg; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update) -{ - chart::v1::ChartInstanceUpdated *chart = new chart::v1::ChartInstanceUpdated(); - - if (set_chart_instance_updated(chart, update)) - return NULL; - - *len = PROTO_COMPAT_MSG_SIZE_PTR(chart); - char *bin = (char*)mallocz(*len); - chart->SerializeToArray(bin, *len); - - delete chart; - return bin; -} - -char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim) -{ - chart::v1::ChartDimensionUpdated *proto_dim = new chart::v1::ChartDimensionUpdated(); - - if (set_chart_dim_updated(proto_dim, dim)) - return NULL; - - *len = PROTO_COMPAT_MSG_SIZE_PTR(proto_dim); - char *bin = (char*)mallocz(*len); - proto_dim->SerializeToArray(bin, *len); - - delete proto_dim; - return bin; -} - -using namespace google::protobuf; - -char *generate_retention_updated(size_t *len, struct retention_updated *data) -{ - chart::v1::RetentionUpdated msg; - - msg.set_claim_id(data->claim_id); - msg.set_node_id(data->node_id); - - switch (data->memory_mode) { - case RRD_MEMORY_MODE_NONE: - msg.set_memory_mode(chart::v1::NONE); - break; - case RRD_MEMORY_MODE_RAM: - msg.set_memory_mode(chart::v1::RAM); - break; - case RRD_MEMORY_MODE_MAP: - msg.set_memory_mode(chart::v1::MAP); - break; - case RRD_MEMORY_MODE_SAVE: - msg.set_memory_mode(chart::v1::SAVE); - break; - case RRD_MEMORY_MODE_ALLOC: - msg.set_memory_mode(chart::v1::ALLOC); - break; - case RRD_MEMORY_MODE_DBENGINE: - msg.set_memory_mode(chart::v1::DB_ENGINE); - break; - default: - return NULL; - } - - for (int i = 0; i < data->interval_duration_count; i++) { - Map *map = msg.mutable_interval_durations(); - map->insert({data->interval_durations[i].update_every, data->interval_durations[i].retention}); - } - - set_google_timestamp_from_timeval(data->rotation_timestamp, msg.mutable_rotation_timestamp()); - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} diff --git a/aclk/schema-wrappers/chart_stream.h b/aclk/schema-wrappers/chart_stream.h deleted file mode 100644 index 904866868..000000000 --- a/aclk/schema-wrappers/chart_stream.h +++ /dev/null @@ -1,121 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ACLK_SCHEMA_WRAPPER_CHART_STREAM_H -#define ACLK_SCHEMA_WRAPPER_CHART_STREAM_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "database/rrd.h" - -typedef struct { - char* claim_id; - char* node_id; - - uint64_t seq_id; - uint64_t batch_id; - - struct timeval seq_id_created_at; -} stream_charts_and_dims_t; - -stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len); - -typedef struct { - char* claim_id; - char* node_id; - - uint64_t last_seq_id; -} chart_and_dim_ack_t; - -chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len); - -enum chart_reset_reason { - DB_EMPTY, - SEQ_ID_NOT_EXISTS, - TIMESTAMP_MISMATCH -}; - -typedef struct { - char *claim_id; - char *node_id; - - enum chart_reset_reason reason; -} chart_reset_t; - -char *generate_reset_chart_messages(size_t *len, const chart_reset_t reset); - -struct aclk_message_position { - uint64_t sequence_id; - struct timeval seq_id_creation_time; - uint64_t previous_sequence_id; -}; - -struct chart_instance_updated { - const char *id; - const char *claim_id; - const char *node_id; - const char *name; - - DICTIONARY *chart_labels; - - RRD_MEMORY_MODE memory_mode; - - uint32_t update_every; - const char * config_hash; - - struct aclk_message_position position; -}; - -void chart_instance_updated_destroy(struct chart_instance_updated *instance); - -struct chart_dimension_updated { - const char *id; - const char *chart_id; - const char *node_id; - const char *claim_id; - const char *name; - struct timeval created_at; - struct timeval last_timestamp; - struct aclk_message_position position; -}; - -typedef struct { - struct chart_instance_updated *charts; - uint16_t chart_count; - - struct chart_dimension_updated *dims; - uint16_t dim_count; - - uint64_t batch_id; -} charts_and_dims_updated_t; - -struct interval_duration { - uint32_t update_every; - uint32_t retention; -}; - -struct retention_updated { - char *claim_id; - char *node_id; - - RRD_MEMORY_MODE memory_mode; - - struct interval_duration *interval_durations; - int interval_duration_count; - - struct timeval rotation_timestamp; -}; - -char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id); -char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); -char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update); -char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); -char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim); -char *generate_retention_updated(size_t *len, struct retention_updated *data); - -#ifdef __cplusplus -} -#endif - -#endif /* ACLK_SCHEMA_WRAPPER_CHART_STREAM_H */ diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc index 7520a4600..20b40ece2 100644 --- a/aclk/schema-wrappers/connection.cc +++ b/aclk/schema-wrappers/connection.cc @@ -29,7 +29,7 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio timestamp->set_nanos(tv.tv_usec * 1000); if (data->capabilities) { - struct capability *capa = data->capabilities; + const struct capability *capa = data->capabilities; while (capa->name) { aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities(); capability_set(proto_capa, capa); @@ -38,7 +38,7 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio } *len = PROTO_COMPAT_MSG_SIZE(connupd); - char *msg = (char*)malloc(*len); + char *msg = (char*)mallocz(*len); if (msg) connupd.SerializeToArray(msg, *len); @@ -52,7 +52,7 @@ struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) { if (!req.ParseFromArray(data, len)) return NULL; - res = (struct disconnect_cmd *)calloc(1, sizeof(struct disconnect_cmd)); + res = (struct disconnect_cmd *)callocz(1, sizeof(struct disconnect_cmd)); if (!res) return NULL; @@ -61,9 +61,9 @@ struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) { res->permaban = req.permaban(); res->error_code = req.error_code(); if (req.error_description().c_str()) { - res->error_description = strdup(req.error_description().c_str()); + res->error_description = strdupz(req.error_description().c_str()); if (!res->error_description) { - free(res); + freez(res); return NULL; } } diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h index fcbe6bd59..0356c7d78 100644 --- a/aclk/schema-wrappers/connection.h +++ b/aclk/schema-wrappers/connection.h @@ -17,7 +17,7 @@ typedef struct { unsigned int lwt:1; - struct capability *capabilities; + const struct capability *capabilities; // TODO in future optional fields // > 15 optional fields: diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc index a6ca8ef98..db1fa6449 100644 --- a/aclk/schema-wrappers/node_connection.cc +++ b/aclk/schema-wrappers/node_connection.cc @@ -29,7 +29,7 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect timestamp->set_nanos(tv.tv_usec * 1000); if (data->capabilities) { - struct capability *capa = data->capabilities; + const struct capability *capa = data->capabilities; while (capa->name) { aclk_lib::v1::Capability *proto_capa = msg.add_capabilities(); capability_set(proto_capa, capa); @@ -38,7 +38,7 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect } *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)malloc(*len); + char *bin = (char*)mallocz(*len); if (bin) msg.SerializeToArray(bin, *len); diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h index c27729d15..dac0d8fe0 100644 --- a/aclk/schema-wrappers/node_connection.h +++ b/aclk/schema-wrappers/node_connection.h @@ -19,7 +19,7 @@ typedef struct { int64_t session_id; int32_t hops; - struct capability *capabilities; + const struct capability *capabilities; } node_instance_connection_t; char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data); diff --git a/aclk/schema-wrappers/node_creation.cc b/aclk/schema-wrappers/node_creation.cc index c696bb27b..5ad25b7e5 100644 --- a/aclk/schema-wrappers/node_creation.cc +++ b/aclk/schema-wrappers/node_creation.cc @@ -18,7 +18,7 @@ char *generate_node_instance_creation(size_t *len, const node_instance_creation_ msg.set_hops(data->hops); *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)malloc(*len); + char *bin = (char*)mallocz(*len); if (bin) msg.SerializeToArray(bin, *len); @@ -33,7 +33,7 @@ node_instance_creation_result_t parse_create_node_instance_result(const char *da if (!msg.ParseFromArray(data, len)) return res; - res.node_id = strdup(msg.node_id().c_str()); - res.machine_guid = strdup(msg.machine_guid().c_str()); + res.node_id = strdupz(msg.node_id().c_str()); + res.machine_guid = strdupz(msg.machine_guid().c_str()); return res; } diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h index 190ccb4d6..7a8c7f7c7 100644 --- a/aclk/schema-wrappers/node_creation.h +++ b/aclk/schema-wrappers/node_creation.h @@ -8,9 +8,9 @@ extern "C" { #endif typedef struct { - char* claim_id; - char* machine_guid; - char* hostname; + const char *claim_id; + const char *machine_guid; + const char *hostname; int32_t hops; } node_instance_creation_t; diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc index 2a05ddaba..5e321f688 100644 --- a/aclk/schema-wrappers/node_info.cc +++ b/aclk/schema-wrappers/node_info.cc @@ -104,7 +104,7 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in } *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)malloc(*len); + char *bin = (char*)mallocz(*len); if (bin) msg.SerializeToArray(bin, *len); @@ -128,7 +128,7 @@ char *generate_update_node_collectors_message(size_t *len, struct update_node_co dfe_done(colls); *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)malloc(*len); + char *bin = (char*)mallocz(*len); if (bin) msg.SerializeToArray(bin, *len); diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h index e8ac2d7c6..de4ade78a 100644 --- a/aclk/schema-wrappers/node_info.h +++ b/aclk/schema-wrappers/node_info.h @@ -19,41 +19,27 @@ struct machine_learning_info { }; struct aclk_node_info { - char *name; - - char *os; - char *os_name; - char *os_version; - - char *kernel_name; - char *kernel_version; - - char *architecture; - + const char *name; + + const char *os; + const char *os_name; + const char *os_version; + const char *kernel_name; + const char *kernel_version; + const char *architecture; uint32_t cpus; - - char *cpu_frequency; - - char *memory; - - char *disk_space; - - char *version; - - char *release_channel; - - char *timezone; - - char *virtualization_type; - - char *container_type; - - char *custom_info; - - char *machine_guid; + const char *cpu_frequency; + const char *memory; + const char *disk_space; + const char *version; + const char *release_channel; + const char *timezone; + const char *virtualization_type; + const char *container_type; + const char *custom_info; + const char *machine_guid; DICTIONARY *host_labels_ptr; - struct machine_learning_info ml_info; }; @@ -72,8 +58,8 @@ struct update_node_info { }; struct collector_info { - char *module; - char *plugin; + const char *module; + const char *plugin; }; struct update_node_collectors { diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc index 0e473eb6c..8853b2e08 100644 --- a/aclk/schema-wrappers/proto_2_json.cc +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -4,8 +4,6 @@ #include "proto/alarm/v1/config.pb.h" #include "proto/alarm/v1/stream.pb.h" #include "proto/aclk/v1/lib.pb.h" -#include "proto/chart/v1/config.pb.h" -#include "proto/chart/v1/stream.pb.h" #include "proto/agent/v1/connection.pb.h" #include "proto/agent/v1/disconnect.pb.h" #include "proto/nodeinstance/connection/v1/connection.pb.h" @@ -29,14 +27,6 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new nodeinstance::v1::UpdateNodeInstanceConnection; if (!strcmp(msgname, "CreateNodeInstance")) return new nodeinstance::create::v1::CreateNodeInstance; - if (!strcmp(msgname, "ChartsAndDimensionsUpdated")) - return new chart::v1::ChartsAndDimensionsUpdated; - if (!strcmp(msgname, "ChartConfigsUpdated")) - return new chart::v1::ChartConfigsUpdated; - if (!strcmp(msgname, "ResetChartMessages")) - return new chart::v1::ResetChartMessages; - if (!strcmp(msgname, "RetentionUpdated")) - return new chart::v1::RetentionUpdated; if (!strcmp(msgname, "UpdateNodeInfo")) return new nodeinstance::info::v1::UpdateNodeInfo; if (!strcmp(msgname, "AlarmLogHealth")) @@ -59,12 +49,6 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new nodeinstance::create::v1::CreateNodeInstanceResult; if (!strcmp(msgname, "SendNodeInstances")) return new agent::v1::SendNodeInstances; - if (!strcmp(msgname, "StreamChartsAndDimensions")) - return new chart::v1::StreamChartsAndDimensions; - if (!strcmp(msgname, "ChartsAndDimensionsAck")) - return new chart::v1::ChartsAndDimensionsAck; - if (!strcmp(msgname, "UpdateChartConfigs")) - return new chart::v1::UpdateChartConfigs; if (!strcmp(msgname, "StartAlarmStreaming")) return new alarms::v1::StartAlarmStreaming; if (!strcmp(msgname, "SendAlarmLogHealth")) diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h index 26412cacc..a96f7ea7a 100644 --- a/aclk/schema-wrappers/schema_wrappers.h +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -8,8 +8,6 @@ #include "connection.h" #include "node_connection.h" #include "node_creation.h" -#include "chart_config.h" -#include "chart_stream.h" #include "alarm_config.h" #include "alarm_stream.h" #include "node_info.h" -- cgit v1.2.3