author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-12-01 06:15:11 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-12-01 06:15:11 +0000 |
commit | 483926a283e118590da3f9ecfa75a8a4d62143ce (patch) | |
tree | cb77052778df9a128a8cd3ff5bf7645322a13bc5 /aclk | |
parent | Releasing debian version 1.31.0-4. | |
Merging upstream version 1.32.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk')
63 files changed, 3924 insertions, 1223 deletions
diff --git a/aclk/README.md b/aclk/README.md index ffd036b97..13a9be27f 100644 --- a/aclk/README.md +++ b/aclk/README.md @@ -9,18 +9,19 @@ custom_edit_url: https://github.com/netdata/netdata/edit/master/aclk/README.md The Agent-Cloud link (ACLK) is the mechanism responsible for securely connecting a Netdata Agent to your web browser through Netdata Cloud. The ACLK establishes an outgoing secure WebSocket (WSS) connection to Netdata Cloud on port -`443`. The ACLK is encrypted, safe, and _is only established if you claim your node_. +`443`. The ACLK is encrypted, safe, and _is only established if you connect your node_. The Cloud App lives at app.netdata.cloud which currently resolves to 35.196.244.138. However, this IP or range of IPs can change without notice. Watch this page for updates. -For a guide to claiming a node using the ACLK, plus additional troubleshooting and reference information, read our [get -started with Cloud](https://learn.netdata.cloud/docs/cloud/get-started) guide or the full [claiming +For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [get +started with Cloud](https://learn.netdata.cloud/docs/cloud/get-started) guide or the full [connect to Cloud documentation](/claim/README.md). ## Data privacy -Privacy is very important to us. We firmly believe that your data belongs to you. This is why **we don't store any metric data in Netdata Cloud**. +[Data privacy](https://netdata.cloud/data-privacy/) is very important to us. We firmly believe that your data belongs to +you. This is why **we don't store any metric data in Netdata Cloud**. All the data that the user sees in the web browser when using Netdata Cloud, are actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard. They pass through our systems, but they are not stored. 
@@ -30,7 +31,7 @@ We do however store a limited number of *metadata* to be able to offer the stunn The information we store in Netdata Cloud is the following (using the publicly available demo server `frankfurt.my-netdata.io` as an example): - The email address you used to sign up/or sign in -- For each node claimed to your Spaces in Netdata Cloud: +- For each node connected to your Spaces in Netdata Cloud: - Hostname (as it appears in Netdata Cloud) - Information shown in `/api/v1/info`. For example: [https://frankfurt.my-netdata.io/api/v1/info](https://frankfurt.my-netdata.io/api/v1/info). - The chart metadata shown in `/api/v1/charts`. For example: [https://frankfurt.my-netdata.io/api/v1/info](https://frankfurt.my-netdata.io/api/v1/info). @@ -45,7 +46,7 @@ How we use them: ## Enable and configure the ACLK The ACLK is enabled by default, with its settings automatically configured and stored in the Agent's memory. No file is -created at `/var/lib/netdata/cloud.d/cloud.conf` until you either claim a node or create it yourself. The default +created at `/var/lib/netdata/cloud.d/cloud.conf` until you either connect a node or create it yourself. The default configuration uses two settings: ```conf @@ -55,17 +56,41 @@ configuration uses two settings: ``` If your Agent needs to use a proxy to access the internet, you must [set up a proxy for -claiming](/claim/README.md#claim-through-a-proxy). +connecting to cloud](/claim/README.md#connect-through-a-proxy). You can configure following keys in the `netdata.conf` section `[cloud]`: ``` [cloud] - statistics = yes - query thread count = 2 + statistics = yes + query thread count = 2 + aclk implementation = legacy ``` - `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. - `query thread count` specifies the number of threads to process cloud queries. 
Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). +- `aclk implementation` - see [ACLK implementation](#aclk-implementation) section + +## ACLK implementation + +Currently we are in process of switching ACLK to brand new technical stack called ACLK-NG. To choose your implementation, change the `aclk implementation` setting in your `netdata.conf` (accepted values `ng` or `legacy`). + +Before changing this value, check the desired implementation is available (determined at build time) by running `netdata -W buildinfo`. Following lines indicate which ACLK implementations are available: + +``` +Features: + ACLK Next Generation: YES + ACLK Legacy: YES +``` + +To verify which ACLK implementation Netdata uses, visit the `/api/v1/info` endpoint on your local dashboard and check the `aclk-implementation` key. + +New Netdata Cloud features will be implemented on top of ACLK-NG from this point on. ACLK Legacy is therefore kept as a fallback in case some users have issues with ACLK-NG or need to use features which are not yet available in ACLK-NG *(like IPv6 support and SOCKS proxy)*. + +### Improvements of ACLK-NG over Legacy are: +- No dependency on custom patched `libmosquitto` (no bundling of libraries). Which should remove obstacles many GNU/Linux distribution package maintainers had trying to provide Netdata with Cloud support +- No dependency on libwebsockets +- Lower latency and higher throughput +- More up to date, new features for Netdata Cloud are currently developed on top of ACLK-NG first ## Disable the ACLK @@ -79,7 +104,7 @@ You can pass the `--disable-cloud` parameter to the Agent installation when usin Git](/packaging/installer/methods/manual.md). When you pass this parameter, the installer does not download or compile any extra libraries. 
Once running, the Agent -kills the thread responsible for the ACLK and claiming behavior, and behaves as though the ACLK, and thus Netdata Cloud, +kills the thread responsible for the ACLK and connecting behavior, and behaves as though the ACLK, and thus Netdata Cloud, does not exist. ### Disable at runtime @@ -135,7 +160,7 @@ If you first disable the ACLK and any Cloud functionality and then decide you wo If you passed `--disable-cloud` to `netdata-installer.sh` during installation, you must [reinstall](/packaging/installer/REINSTALL.md) your Agent. Use the same method as before, but pass `--require-cloud` to -the installer. When installation finishes you can [claim your node](/claim/README.md#how-to-claim-a-node). +the installer. When installation finishes you can [connect your node](/claim/README.md#how-to-connect-a-node). If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf` file, edit the file again and change `enabled` to `yes`: @@ -145,7 +170,6 @@ If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf` enabled = yes ``` -Restart your Agent and [claim your node](/claim/README.md#how-to-claim-a-node). +Restart your Agent and [connect your node](/claim/README.md#how-to-connect-a-node). 
[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Faclk%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) - diff --git a/aclk/aclk.c b/aclk/aclk.c index 35549cfea..a24d258c5 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -13,6 +13,8 @@ #include "aclk_collector_list.h" #include "https_client.h" +#include "aclk_proxy.h" + #ifdef ACLK_LOG_CONVERSATION_DIR #include <sys/types.h> #include <sys/stat.h> @@ -21,20 +23,12 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable -//TODO remove most (as in 99.999999999%) of this crap -int aclk_connected = 0; -int aclk_disable_runtime = 0; -int aclk_disable_single_updates = 0; -int aclk_kill_link = 0; - int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +int disconnect_req = 0; -time_t aclk_block_until = 0; +int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload -usec_t aclk_session_us = 0; // Used by the mqtt layer -time_t aclk_session_sec = 0; // Used by the mqtt layer - -aclk_env_t *aclk_env = NULL; +time_t aclk_block_until = 0; mqtt_wss_client mqttwss_client; @@ -43,22 +37,12 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) struct aclk_shared_state aclk_shared_state = { - .agent_state = AGENT_INITIALIZING, + .agent_state = ACLK_HOST_INITIALIZING, .last_popcorn_interrupt = 0, .mqtt_shutdown_msg_id = -1, .mqtt_shutdown_msg_rcvd = 0 }; -void aclk_single_update_disable() -{ - aclk_disable_single_updates = 1; -} - -void aclk_single_update_enable() -{ - aclk_disable_single_updates = 0; -} - //ENDTODO static RSA *aclk_private_key = NULL; @@ -197,8 +181,9 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) //TODO prevent big buffer on stack #define RX_MSGLEN_MAX 4096 -static void 
msg_callback(const char *topic, const void *msg, size_t msglen, int qos) +static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos) { + UNUSED(qos); char cmsg[RX_MSGLEN_MAX]; size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); @@ -233,13 +218,61 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int error("Received message on unexpected topic %s", topic); if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring message."); + error("Link is shutting down. Ignoring incoming message."); return; } aclk_handle_cloud_message(cmsg); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos) +{ + UNUSED(qos); + if (msglen > RX_MSGLEN_MAX) + error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); + + debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); + + if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { + error("Link is shutting down. Ignoring incoming message."); + return; + } + + const char *msgtype = strrchr(topic, '/'); + if (unlikely(!msgtype)) { + error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic); + return; + } + msgtype++; + if (unlikely(!*msgtype)) { + error_report("Message type empty. 
Ignoring message from topic \"%s\"", topic); + return; + } + +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 512 + char filename[FN_MAX_LEN]; + int logfd; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype); + logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); + if(logfd < 0) + error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + write(logfd, msg, msglen); + close(logfd); +#endif + + aclk_handle_new_cloud_msg(msgtype, msg, msglen); +} + +static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { + if (aclk_use_new_cloud_arch) + msg_callback_new_protocol(topic, msg, msglen, qos); + else + msg_callback_old_protocol(topic, msg, msglen, qos); +} +#endif /* ENABLE_NEW_CLOUD_PROTOCOL */ + static void puback_callback(uint16_t packet_id) { if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) @@ -250,7 +283,7 @@ static void puback_callback(uint16_t packet_id) #endif if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { - error("Got PUBACK for shutdown message. Can exit gracefully."); + info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; } } @@ -268,6 +301,8 @@ static int read_query_thread_count() return threads; } +void aclk_graceful_disconnect(mqtt_wss_client client); + /* Keeps connection alive and handles all network comms. * Returns on error or when netdata is shutting down. 
* @param client instance of mqtt_wss_client @@ -281,7 +316,16 @@ static int handle_connection(mqtt_wss_client client) // timeout 1000 to check at least once a second // for netdata_exit if (mqtt_wss_service(client, 1000) < 0){ - error("Connection Error or Dropped"); + error_report("Connection Error or Dropped"); + return 1; + } + + if (disconnect_req) { + disconnect_req = 0; + aclk_graceful_disconnect(client); + aclk_queue_unlock(); + aclk_shared_state.mqtt_shutdown_msg_id = -1; + aclk_shared_state.mqtt_shutdown_msg_rcvd = 0; return 1; } @@ -298,10 +342,21 @@ static int handle_connection(mqtt_wss_client client) return 0; } +inline static int aclk_popcorn_check() +{ + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { + ACLK_SHARED_STATE_UNLOCK; + return 1; + } + ACLK_SHARED_STATE_UNLOCK; + return 0; +} + inline static int aclk_popcorn_check_bump() { ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); ACLK_SHARED_STATE_UNLOCK; return 1; @@ -323,11 +378,6 @@ static inline void queue_connect_payloads(void) static inline void mqtt_connected_actions(mqtt_wss_client client) { - // TODO global vars? 
- usec_t now = now_realtime_usec(); - aclk_session_sec = now / USEC_PER_SEC; - aclk_session_us = now % USEC_PER_SEC; - const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND); if (!topic) @@ -335,16 +385,34 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) else mqtt_wss_subscribe(client, topic, 1); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (aclk_use_new_cloud_arch) { + topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); + } +#endif + aclk_stats_upd_online(1); aclk_connected = 1; aclk_pubacks_per_conn = 0; - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { - error("Sending `connect` payload immediately as popcorning was finished already."); - queue_connect_payloads(); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (!aclk_use_new_cloud_arch) { +#endif + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) { + error("Sending `connect` payload immediately as popcorning was finished already."); + queue_connect_payloads(); + } + ACLK_SHARED_STATE_UNLOCK; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + } else { + aclk_send_agent_connection_update(client, 1); } - ACLK_SHARED_STATE_UNLOCK; +#endif } /* Waits until agent is ready or needs to exit @@ -354,29 +422,29 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) * @return 0 - Popcorning Finished - Agent STABLE, * !0 - netdata_exit */ -static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads) +static int wait_popcorning_finishes() { time_t elapsed; int need_wait; + if (aclk_use_new_cloud_arch) + return 0; + while (!netdata_exit) { ACLK_SHARED_STATE_LOCK; - if (likely(aclk_shared_state.agent_state != AGENT_INITIALIZING)) { + if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) { ACLK_SHARED_STATE_UNLOCK; return 0; } elapsed = now_realtime_sec() - 
aclk_shared_state.last_popcorn_interrupt; if (elapsed >= ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = AGENT_STABLE; + aclk_shared_state.agent_state = ACLK_HOST_STABLE; ACLK_SHARED_STATE_UNLOCK; - error("ACLK localhost popocorn finished"); - if (unlikely(!query_threads->thread_list)) - aclk_query_threads_start(query_threads, client); - queue_connect_payloads(); + error("ACLK localhost popcorn timer finished"); return 0; } ACLK_SHARED_STATE_UNLOCK; need_wait = ACLK_STABLE_TIMEOUT - elapsed; - error("ACLK localhost popocorn wait %d seconds longer", need_wait); + error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait); sleep(need_wait); } return 1; @@ -384,10 +452,16 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th void aclk_graceful_disconnect(mqtt_wss_client client) { - error("Preparing to Gracefully Shutdown the ACLK"); + info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); - aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (aclk_use_new_cloud_arch) + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); + else +#endif + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + time_t t = now_monotonic_sec(); while (!mqtt_wss_service(client, 100)) { if (now_monotonic_sec() - t >= 2) { @@ -395,14 +469,16 @@ void aclk_graceful_disconnect(mqtt_wss_client client) break; } if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { - error("MQTT App Layer `disconnect` message sent successfully"); + info("MQTT App Layer `disconnect` message sent successfully"); break; } } + info("ACLK link is down"); + log_access("ACLK DISCONNECTED"); aclk_stats_upd_online(0); aclk_connected = 0; - error("Attempting to Gracefully Shutdown MQTT/WSS connection"); + info("Attempting to gracefully shutdown the MQTT/WSS connection"); 
mqtt_wss_disconnect(client, 1000); } @@ -433,7 +509,7 @@ static unsigned long aclk_reconnect_delay() { return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s); } -/* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled +/* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled * @return 0 - Go ahead and connect (delay expired) * 1 - netdata_exit */ @@ -455,7 +531,7 @@ static int aclk_block_till_recon_allowed() { sleep_usec(recon_delay * USEC_PER_MS); recon_delay = 0; } - return 0; + return netdata_exit; } #ifndef ACLK_DISABLE_CHALLENGE @@ -477,7 +553,7 @@ static int aclk_get_transport_idx(aclk_env_t *env) { /* Attempts to make a connection to MQTT broker over WSS * @param client instance of mqtt_wss_client - * @return 0 - Successfull Connection, + * @return 0 - Successful Connection, * <0 - Irrecoverable Error -> Kill ACLK, * >0 - netdata_exit */ @@ -498,7 +574,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) url_t mqtt_url; #endif - json_object *lwt; + json_object *lwt = NULL; while (!netdata_exit) { char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); @@ -529,9 +605,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) .will_topic = "lwt", .will_msg = NULL, .will_flags = MQTT_WSS_PUB_QOS2, - .keep_alive = 60 + .keep_alive = 60, + .drop_on_publish_fail = 1 }; +#if defined(ENABLE_NEW_CLOUD_PROTOCOL) && defined(ACLK_NEWARCH_DEVMODE) + aclk_use_new_cloud_arch = 1; + info("Switching ACLK to new protobuf protocol. 
Due to #define ACLK_NEWARCH_DEVMODE."); +#else + aclk_use_new_cloud_arch = 0; +#endif + #ifndef ACLK_DISABLE_CHALLENGE if (aclk_env) { aclk_env_t_destroy(aclk_env); @@ -547,6 +631,24 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) continue; } + if (netdata_exit) + return 1; + +#ifndef ACLK_NEWARCH_DEVMODE + if (aclk_env->encoding == ACLK_ENC_PROTO) { +#ifndef ENABLE_NEW_CLOUD_PROTOCOL + error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!"); + continue; +#endif + if (!aclk_env_has_capa("proto")) { + error ("Can't encoding=proto without at least \"proto\" capability."); + continue; + } + info("Switching ACLK to new protobuf protocol. Due to /env response."); + aclk_use_new_cloud_arch = 1; + } +#endif + memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); @@ -563,7 +665,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) // aclk_get_topic moved here as during OTP we // generate the topic cache - mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (aclk_use_new_cloud_arch) + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); + else + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA); + if (!mqtt_conn_params.will_topic) { error("Couldn't get LWT topic. 
Will not send LWT."); continue; @@ -584,9 +690,21 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) } #endif - lwt = aclk_generate_disconnect(NULL); - mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); - mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); + aclk_session_newarch = now_realtime_usec(); + aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; + aclk_session_us = aclk_session_newarch % USEC_PER_SEC; + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (aclk_use_new_cloud_arch) { + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); + } else { +#endif + lwt = aclk_generate_disconnect(NULL); + mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); + mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + } +#endif #ifdef ACLK_DISABLE_CHALLENGE ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); @@ -600,15 +718,19 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) freez((char*)mqtt_conn_params.username); #endif - json_object_put(lwt); + if (aclk_use_new_cloud_arch) + freez((char *)mqtt_conn_params.will_msg); + else + json_object_put(lwt); if (!ret) { - info("MQTTWSS connection succeeded"); + info("ACLK connection successfully established"); + log_access("ACLK CONNECTED"); mqtt_connected_actions(client); return 0; } - error("Connect failed\n"); + error_report("Connect failed"); } return 1; @@ -659,11 +781,20 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { +#else + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) { +#endif error("Couldn't initialize MQTT_WSS network library"); goto exit; } + // Enable 
MQTT buffer growth if necessary + // e.g. old cloud architecture clients with huge nodes + // that send JSON payloads of 10 MB as single messages + mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024); + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); if (aclk_stats_enabled) { stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); @@ -683,12 +814,19 @@ void *aclk_main(void *ptr) // warning this assumes the popcorning is relative short (3s) // if that changes call mqtt_wss_service from within // to keep OpenSSL, WSS and MQTT connection alive - if (wait_popcorning_finishes(mqttwss_client, &query_threads)) + if (wait_popcorning_finishes()) goto exit_full; + + if (unlikely(!query_threads.thread_list)) + aclk_query_threads_start(&query_threads, mqttwss_client); + + if (!aclk_use_new_cloud_arch) + queue_connect_payloads(); - if (!handle_connection(mqttwss_client)) { + if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); aclk_connected = 0; + log_access("ACLK DISCONNECTED"); } } while (!netdata_exit); @@ -721,10 +859,10 @@ exit: // fix this in both old and new ACLK extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); -void aclk_alarm_reload(void) +void ng_aclk_alarm_reload(void) { ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) { ACLK_SHARED_STATE_UNLOCK; return; } @@ -733,7 +871,7 @@ void aclk_alarm_reload(void) aclk_queue_query(aclk_query_new(METADATA_ALARMS)); } -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) { BUFFER *local_buffer; json_object *msg; @@ -742,7 +880,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) return 0; ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + if (unlikely(aclk_shared_state.agent_state == 
ACLK_HOST_INITIALIZING)) { ACLK_SHARED_STATE_UNLOCK; return 0; } @@ -764,11 +902,11 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) return 0; } -int aclk_update_chart(RRDHOST *host, char *chart_name, int create) +int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create) { struct aclk_query *query; - if (aclk_popcorn_check_bump()) + if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check()) return 0; query = aclk_query_new(create ? CHART_NEW : CHART_DEL); @@ -788,11 +926,11 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, int create) * Add a new collector to the list * If it exists, update the chart count */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct aclk_query *query; struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { + if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { return; } @@ -831,11 +969,11 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu * This function will release the memory used and schedule * a cloud update */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct aclk_query *query; struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { + if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) { return; } @@ -872,26 +1010,165 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu aclk_queue_query(query); } -struct label *add_aclk_host_labels(struct label *label) { -#ifdef ENABLE_ACLK - ACLK_PROXY_TYPE aclk_proxy; - char *proxy_str; - aclk_get_proxy(&aclk_proxy); +void ng_aclk_host_state_update(RRDHOST *host, int cmd) +{ + uuid_t node_id; + int ret; - switch(aclk_proxy) { - case PROXY_TYPE_SOCKS5: - proxy_str = "SOCKS5"; - 
break; - case PROXY_TYPE_HTTP: - proxy_str = "HTTP"; - break; - default: - proxy_str = "none"; - break; + if (!aclk_connected || !aclk_use_new_cloud_arch) + return; + + ret = get_node_id(&host->host_uuid, &node_id); + if (ret > 0) { + // this means we were not able to check if node_id already present + error("Unable to check for node_id. Ignoring the host state update."); + return; + } + if (ret < 0) { + // node_id not found + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + create_query->data.node_creation.hops = (uint32_t) host->system_info->hops; + create_query->data.node_creation.hostname = strdupz(host->hostname); + create_query->data.node_creation.machine_guid = strdupz(host->machine_guid); + info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); + aclk_queue_query(create_query); + return; } - label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO); - return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + query->data.node_update.hops = (uint32_t) host->system_info->hops; + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + query->data.node_update.live = cmd; + query->data.node_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id); + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd, + host->system_info->hops); + aclk_queue_query(query); +} + +void aclk_send_node_instances() +{ + struct node_instance_list *list_head = 
get_node_list(); + struct node_instance_list *list = list_head; + if (unlikely(!list)) { + error_report("Failure to get_node_list from DB!"); + return; + } + while (!uuid_is_null(list->host_id)) { + if (!uuid_is_null(list->node_id)) { + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + rrdhost_aclk_state_lock(localhost); + query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + query->data.node_update.live = list->live; + query->data.node_update.hops = list->hops; + query->data.node_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id); + query->data.node_update.queryable = 1; + query->data.node_update.session_id = aclk_session_newarch; + info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id, + list->live, + list->hops); + aclk_queue_query(query); + } else { + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + create_query->data.node_creation.hops = list->hops; + create_query->data.node_creation.hostname = list->hostname; + create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid); + info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid, + list->hops); + aclk_queue_query(create_query); + } + + list++; + } + freez(list_head); +} + +void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) +{ + aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); +} + +char *ng_aclk_state(void) +{ + BUFFER *wb = buffer_create(1024); + char *ret; + + buffer_strcat(wb, + "ACLK Available: Yes\n" + "ACLK 
Implementation: Next Generation\n" +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + "New Cloud Protocol Support: Yes\n" #else - return label; + "New Cloud Protocol Support: No\n" #endif + "Claimed: " + ); + + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + buffer_strcat(wb, "No\n"); + else { + buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); + freez(agent_id); + } + + buffer_sprintf(wb, "Online: %s\nUsed Cloud Protocol: %s", aclk_connected ? "Yes" : "No", aclk_use_new_cloud_arch ? "New" : "Legacy"); + + ret = strdupz(buffer_tostring(wb)); + buffer_free(wb); + return ret; +} + +char *ng_aclk_state_json(void) +{ + json_object *tmp, *msg = json_object_new_object(); + + tmp = json_object_new_boolean(1); + json_object_object_add(msg, "aclk-available", tmp); + + tmp = json_object_new_string("Next Generation"); + json_object_object_add(msg, "aclk-implementation", tmp); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + tmp = json_object_new_boolean(1); +#else + tmp = json_object_new_boolean(0); +#endif + json_object_object_add(msg, "new-cloud-protocol-supported", tmp); + + char *agent_id = is_agent_claimed(); + tmp = json_object_new_boolean(agent_id != NULL); + json_object_object_add(msg, "agent-claimed", tmp); + + if (agent_id) { + tmp = json_object_new_string(agent_id); + freez(agent_id); + } else + tmp = NULL; + json_object_object_add(msg, "claimed-id", tmp); + + tmp = json_object_new_boolean(aclk_connected); + json_object_object_add(msg, "online", tmp); + + tmp = json_object_new_string(aclk_use_new_cloud_arch ? 
"New" : "Legacy"); + json_object_object_add(msg, "used-cloud-protocol", tmp); + + char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); + json_object_put(msg); + return str; } diff --git a/aclk/aclk.h b/aclk/aclk.h index b02b93d75..444de86be 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -2,83 +2,54 @@ #ifndef ACLK_H #define ACLK_H -typedef struct aclk_rrdhost_state { - char *claimed_id; // Claimed ID if host has one otherwise NULL -} aclk_rrdhost_state; - -#include "../daemon/common.h" +#include "daemon/common.h" #include "aclk_util.h" - -// version for aclk legacy (old cloud arch) -#define ACLK_VERSION 2 - -// Define ACLK Feature Version Boundaries Here -#define ACLK_V_COMPRESSION 2 +#include "aclk_rrdhost_state.h" // How many MQTT PUBACKs we need to get to consider connection // stable for the purposes of TBEB (truncated binary exponential backoff) #define ACLK_PUBACKS_CONN_STABLE 3 -// TODO get rid of this shit -extern int aclk_disable_runtime; -extern int aclk_disable_single_updates; -extern int aclk_kill_link; -extern int aclk_connected; - extern time_t aclk_block_until; -extern usec_t aclk_session_us; -extern time_t aclk_session_sec; - -extern aclk_env_t *aclk_env; +extern int disconnect_req; void *aclk_main(void *ptr); -void aclk_single_update_disable(); -void aclk_single_update_enable(); - -#define NETDATA_ACLK_HOOK \ - { .name = "ACLK_Main", \ - .config_section = NULL, \ - .config_name = NULL, \ - .enabled = 1, \ - .thread = NULL, \ - .init_routine = NULL, \ - .start_routine = aclk_main }, extern netdata_mutex_t aclk_shared_state_mutex; #define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) -typedef enum aclk_agent_state { - AGENT_INITIALIZING, - AGENT_STABLE -} ACLK_AGENT_STATE; extern struct aclk_shared_state { ACLK_AGENT_STATE agent_state; time_t last_popcorn_interrupt; // To wait for `disconnect` message PUBACK - // when 
shuting down + // when shutting down // at the same time if > 0 we know link is // shutting down int mqtt_shutdown_msg_id; int mqtt_shutdown_msg_rcvd; } aclk_shared_state; -void aclk_alarm_reload(void); -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); +void ng_aclk_alarm_reload(void); +int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); -// TODO this is for bacward compatibility with ACLK legacy -#define ACLK_CMD_CHART 1 -#define ACLK_CMD_CHARTDEL 0 /* Informs ACLK about created/deleted chart * @param create 0 - if chart was deleted, other if chart created */ -int aclk_update_chart(RRDHOST *host, char *chart_name, int create); +int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create); + +void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); + +void ng_aclk_host_state_update(RRDHOST *host, int cmd); + +void aclk_send_node_instances(void); -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); -struct label *add_aclk_host_labels(struct label *label); +char *ng_aclk_state(void); +char *ng_aclk_state_json(void); #endif /* ACLK_H */ diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c new file mode 100644 index 000000000..7df51a7b5 --- /dev/null +++ b/aclk/aclk_alarm_api.c @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_alarm_api.h" + +#include "aclk_query_queue.h" + +#include "aclk_util.h" + +#include "aclk.h" + +void aclk_send_alarm_log_health(struct alarm_log_health *log_health) +{ + aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH); + query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health); + 
query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH;
+    query->data.bin_payload.msg_name = "AlarmLogHealth";
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
+{
+    size_t payload_size;
+    char *payload = generate_alarm_log_entry(&payload_size, log_entry);
+
+    aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
+}
+
+void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
+{
+    aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CFG);
+    query->data.bin_payload.payload = generate_provide_alarm_configuration(&query->data.bin_payload.size, cfg);
+    query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CONFIG;
+    query->data.bin_payload.msg_name = "ProvideAlarmConfiguration";
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot)
+{
+    aclk_query_t query = aclk_query_new(ALARM_SNAPSHOT);
+    query->data.bin_payload.payload = generate_alarm_snapshot_bin(&query->data.bin_payload.size, snapshot);
+    query->data.bin_payload.topic = ACLK_TOPICID_ALARM_SNAPSHOT;
+    query->data.bin_payload.msg_name = "AlarmSnapshot";
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
diff --git a/aclk/aclk_alarm_api.h b/aclk/aclk_alarm_api.h
new file mode 100644
index 000000000..e3fa92b5b
--- /dev/null
+++ b/aclk/aclk_alarm_api.h
@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_ALARM_API_H
+#define ACLK_ALARM_API_H
+
+#include "../daemon/common.h"
+#include "schema-wrappers/schema_wrappers.h"
+
+void aclk_send_alarm_log_health(struct alarm_log_health *log_health);
+void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry);
+void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg);
+void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot);
+
+#endif /* ACLK_ALARM_API_H */
diff --git a/aclk/aclk_api.c b/aclk/aclk_api.c
new file mode 100644
index 000000000..251f5b708
--- /dev/null
+++ b/aclk/aclk_api.c
@@ -0,0 +1,228 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "libnetdata/libnetdata.h"
+#include "database/rrd.h"
+
+#ifdef ACLK_NG
+#include "aclk.h"
+#endif
+#ifdef ACLK_LEGACY
+#include "legacy/agent_cloud_link.h"
+#endif
+
+int aclk_connected = 0;
+int aclk_kill_link = 0;
+
+usec_t aclk_session_us = 0;
+time_t aclk_session_sec = 0;
+
+int aclk_disable_runtime = 0;
+int aclk_disable_single_updates = 0;
+
+int aclk_stats_enabled;
+
+#ifdef ACLK_NG
+int aclk_ng = 1;
+#else
+int aclk_ng = 0;
+#endif
+
+#define ACLK_IMPL_KEY_NAME "aclk implementation"
+
+#ifdef ENABLE_ACLK
+void *aclk_starter(void *ptr) {
+    char *aclk_impl_req = config_get(CONFIG_SECTION_CLOUD, ACLK_IMPL_KEY_NAME, "ng");
+
+    if (!strcasecmp(aclk_impl_req, "ng")) {
+        aclk_ng = 1;
+    } else if (!strcasecmp(aclk_impl_req, "legacy")) {
+        aclk_ng = 0;
+    } else {
+        error("Unknown value \"%s\" of key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\". Trying default ACLK %s.", aclk_impl_req, aclk_ng ? "NG" : "Legacy");
+    }
+
+#ifndef ACLK_NG
+    if (aclk_ng) {
+        error("Configuration requests ACLK-NG but it is not available in this agent. Switching to Legacy.");
+        aclk_ng = 0;
+    }
+#endif
+
+#ifndef ACLK_LEGACY
+    if (!aclk_ng) {
+        error("Configuration requests ACLK Legacy but it is not available in this agent. Switching to NG.");
+        aclk_ng = 1;
+    }
+#endif
+
+#ifdef ACLK_NG
+    if (aclk_ng) {
+        info("Starting ACLK-NG");
+        return aclk_main(ptr);
+    }
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng) {
+        info("Starting ACLK Legacy");
+        return legacy_aclk_main(ptr);
+    }
+#endif
+    error_report("No ACLK could be started");
+    return NULL;
+}
+
+void aclk_single_update_disable()
+{
+    aclk_disable_single_updates = 1;
+}
+
+void aclk_single_update_enable()
+{
+    aclk_disable_single_updates = 0;
+}
+
+void aclk_alarm_reload(void)
+{
+#ifdef ACLK_NG
+    if (aclk_ng)
+        ng_aclk_alarm_reload();
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        legacy_aclk_alarm_reload();
+#endif
+}
+
+int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
+{
+#ifdef ACLK_NG
+    if (aclk_ng)
+        return ng_aclk_update_chart(host, chart_name, create);
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        return legacy_aclk_update_chart(host, chart_name, create);
+#endif
+    error_report("No usable aclk_update_chart implementation");
+    return 1;
+}
+
+int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
+{
+#ifdef ACLK_NG
+    if (aclk_ng)
+        return ng_aclk_update_alarm(host, ae);
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        return legacy_aclk_update_alarm(host, ae);
+#endif
+    error_report("No usable aclk_update_alarm implementation");
+    return 1;
+}
+
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+{
+#ifdef ACLK_NG
+    if (aclk_ng)
+        return ng_aclk_add_collector(host, plugin_name, module_name);
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        return legacy_aclk_add_collector(host, plugin_name, module_name);
+#endif
+    error_report("No usable aclk_add_collector implementation");
+}
+
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+{
+#ifdef ACLK_NG
+    if (aclk_ng)
+        return ng_aclk_del_collector(host, plugin_name, module_name);
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        return legacy_aclk_del_collector(host, plugin_name, module_name);
+#endif
+    error_report("No usable aclk_del_collector implementation");
+}
+
+void aclk_host_state_update(RRDHOST *host, int connect)
+{
+#ifdef ACLK_NG
+    if (aclk_ng)
+        return ng_aclk_host_state_update(host, connect);
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        return legacy_aclk_host_state_update(host, connect);
+#endif
+    error_report("Couldn't use any version of aclk_host_state_update");
+}
+
+#endif /* ENABLE_ACLK */
+
+struct label *add_aclk_host_labels(struct label *label) {
+#ifdef ACLK_NG
+    label = add_label_to_list(label, "_aclk_ng_available", "true", LABEL_SOURCE_AUTO);
+#else
+    label = add_label_to_list(label, "_aclk_ng_available", "false", LABEL_SOURCE_AUTO);
+#endif
+#ifdef ACLK_LEGACY
+    label = add_label_to_list(label, "_aclk_legacy_available", "true", LABEL_SOURCE_AUTO);
+#else
+    label = add_label_to_list(label, "_aclk_legacy_available", "false", LABEL_SOURCE_AUTO);
+#endif
+#ifdef ENABLE_ACLK
+    ACLK_PROXY_TYPE aclk_proxy;
+    char *proxy_str;
+    aclk_get_proxy(&aclk_proxy);
+
+    switch(aclk_proxy) {
+        case PROXY_TYPE_SOCKS5:
+            proxy_str = "SOCKS5";
+            break;
+        case PROXY_TYPE_HTTP:
+            proxy_str = "HTTP";
+            break;
+        default:
+            proxy_str = "none";
+            break;
+    }
+
+    label = add_label_to_list(label, "_aclk_impl", aclk_ng ? "Next Generation" : "Legacy", LABEL_SOURCE_AUTO);
+    label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
+#endif
+    return label;
+}
+
+char *aclk_state(void) {
+#ifndef ENABLE_ACLK
+    return strdupz("ACLK Available: No");
+#else
+#ifdef ACLK_NG
+    if (aclk_ng)
+        return ng_aclk_state();
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        return legacy_aclk_state();
+#endif
+#endif /* ENABLE_ACLK */
+    return NULL;
+}
+
+char *aclk_state_json(void) {
+#ifndef ENABLE_ACLK
+    return strdupz("{\"aclk-available\": false}");
+#else
+#ifdef ACLK_NG
+    if (aclk_ng)
+        return ng_aclk_state_json();
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        return legacy_aclk_state_json();
+#endif
+#endif /* ENABLE_ACLK */
+    return NULL;
+}
diff --git a/aclk/aclk_api.h b/aclk/aclk_api.h
new file mode 100644
index 000000000..9958b0e11
--- /dev/null
+++ b/aclk/aclk_api.h
@@ -0,0 +1,56 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_API_H
+#define ACLK_API_H
+
+#include "libnetdata/libnetdata.h"
+
+#include "aclk_proxy.h"
+
+// TODO get rid global vars as soon as
+// ACLK Legacy is removed
+extern int aclk_connected;
+extern int aclk_kill_link;
+
+extern usec_t aclk_session_us;
+extern time_t aclk_session_sec;
+
+extern int aclk_disable_runtime;
+extern int aclk_disable_single_updates;
+
+extern int aclk_stats_enabled;
+extern int aclk_alert_reloaded;
+
+extern int aclk_ng;
+
+#ifdef ENABLE_ACLK
+void *aclk_starter(void *ptr);
+
+void aclk_single_update_disable();
+void aclk_single_update_enable();
+
+void aclk_alarm_reload(void);
+
+int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
+int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
+
+void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+
+void aclk_host_state_update(RRDHOST *host, int connect);
+
+#define NETDATA_ACLK_HOOK \
+    { .name = "ACLK_Main", \
+      .config_section = NULL, \
+      .config_name = NULL, \
+      .enabled = 1, \
+      .thread = NULL, \
+      .init_routine = NULL, \
+      .start_routine = aclk_starter },
+
+#endif
+
+struct label *add_aclk_host_labels(struct label *label);
+char *aclk_state(void);
+char *aclk_state_json(void);
+
+#endif /* ACLK_API_H */
diff --git a/aclk/aclk_charts_api.c b/aclk/aclk_charts_api.c
new file mode 100644
index 000000000..4e1c466e8
--- /dev/null
+++ b/aclk/aclk_charts_api.c
@@ -0,0 +1,68 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "aclk_charts_api.h"
+
+#include "aclk_query_queue.h"
+
+#define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated"
+
+void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
+{
+    aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+    query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
+    query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
+{
+    aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+    query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
+    query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
+    query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
+{
+    aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
+    query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
+    query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id);
+    query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size)
+{
+    aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED);
+    query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED;
+    query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size);
+    query->data.bin_payload.msg_name = "ChartConfigsUpdated";
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_chart_reset(chart_reset_t reset)
+{
+    aclk_query_t query = aclk_query_new(CHART_RESET);
+    query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET;
+    query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset);
+    query->data.bin_payload.msg_name = "ResetChartMessages";
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_retention_updated(struct retention_updated *data)
+{
+    aclk_query_t query = aclk_query_new(RETENTION_UPDATED);
+    query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED;
+    query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data);
+    query->data.bin_payload.msg_name = "RetentionUpdated";
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
+
+void aclk_update_node_info(struct update_node_info *info)
+{
+    aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO);
+    query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO;
+    query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info);
+    query->data.bin_payload.msg_name = "UpdateNodeInfo";
+    QUEUE_IF_PAYLOAD_PRESENT(query);
+}
diff --git a/aclk/aclk_charts_api.h b/aclk/aclk_charts_api.h
new file mode 100644
index 000000000..305fe4f74
--- /dev/null
+++ b/aclk/aclk_charts_api.h
@@ -0,0 +1,20 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#ifndef ACLK_CHARTS_H
+#define ACLK_CHARTS_H
+
+#include "../daemon/common.h"
+#include "schema-wrappers/schema_wrappers.h"
+
+void 
aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); +void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); +void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id); + +void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size); + +void aclk_chart_reset(chart_reset_t reset); + +void aclk_retention_updated(struct retention_updated *data); + +void aclk_update_node_info(struct update_node_info *info); + +#endif /* ACLK_CHARTS_H */ diff --git a/aclk/aclk_collector_list.c b/aclk/aclk_collector_list.c index a251a23a8..2920c9a5c 100644 --- a/aclk/aclk_collector_list.c +++ b/aclk/aclk_collector_list.c @@ -1,5 +1,5 @@ // SPDX-License-Identifier: GPL-3.0-or-later -// This is copied from Legacy ACLK, Original Autor: amoss +// This is copied from Legacy ACLK, Original Author: amoss // TODO unmess this diff --git a/aclk/aclk_collector_list.h b/aclk/aclk_collector_list.h index 98d30ba94..09c06b14a 100644 --- a/aclk/aclk_collector_list.h +++ b/aclk/aclk_collector_list.h @@ -1,5 +1,5 @@ // SPDX-License-Identifier: GPL-3.0-or-later -// This is copied from Legacy ACLK, Original Autor: amoss +// This is copied from Legacy ACLK, Original Author: amoss // TODO unmess this @@ -31,6 +31,8 @@ struct _collector { struct _collector *next; }; +extern struct _collector *collector_list; + struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name); struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name); void _reset_collector_list(); diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c index 411a5f891..658e04f9b 100644 --- a/aclk/aclk_otp.c +++ b/aclk/aclk_otp.c @@ -2,16 +2,12 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "aclk_otp.h" +#include "aclk_util.h" 
+#include "aclk.h" -#include "../daemon/common.h" +#include "daemon/common.h" -#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h" - -// CentOS 7 has older version that doesn't define this -// same goes for MacOS -#ifndef UUID_STR_LEN -#define UUID_STR_LEN 37 -#endif +#include "mqtt_websockets/c-rbuf/include/ringbuffer.h" struct dictionary_singleton { char *key; @@ -213,7 +209,7 @@ static int parse_passwd_response(const char *json_str, struct auth_data *auth) { json = json_tokener_parse(json_str); if (!json) { - error("JSON-C failed to parse the payload of http respons of /env endpoint"); + error("JSON-C failed to parse the payload of http response of /env endpoint"); return 1; } @@ -363,7 +359,7 @@ static int aclk_parse_otp_error(const char *json_str) { json = json_tokener_parse(json_str); if (!json) { - error("JSON-C failed to parse the payload of http respons of /env endpoint"); + error("JSON-C failed to parse the payload of http response of /env endpoint"); return 1; } @@ -734,7 +730,7 @@ static int parse_json_env(const char *json_str, aclk_env_t *env) { json = json_tokener_parse(json_str); if (!json) { - error("JSON-C failed to parse the payload of http respons of /env endpoint"); + error("JSON-C failed to parse the payload of http response of /env endpoint"); return 1; } @@ -846,7 +842,11 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { return 1; } - buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json$claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json,proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); +#else + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); +#endif freez(agent_id); req.host = (char*)aclk_hostname; diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h index d2044f6fd..1ca9245c2 100644 --- a/aclk/aclk_otp.h +++ b/aclk/aclk_otp.h @@ 
-3,9 +3,10 @@ #ifndef ACLK_OTP_H #define ACLK_OTP_H -#include "../daemon/common.h" +#include "daemon/common.h" #include "https_client.h" +#include "aclk_util.h" int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target); int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port); diff --git a/aclk/aclk_proxy.c b/aclk/aclk_proxy.c new file mode 100644 index 000000000..1701eb8e8 --- /dev/null +++ b/aclk/aclk_proxy.c @@ -0,0 +1,186 @@ +#include "aclk_proxy.h" + +#include "daemon/common.h" + +#define ACLK_PROXY_ENV "env" +#define ACLK_PROXY_CONFIG_VAR "proxy" + +struct { + ACLK_PROXY_TYPE type; + const char *url_str; +} supported_proxy_types[] = { + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL }, +}; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type) +{ + switch (*type) { + case PROXY_DISABLED: + return "disabled"; + case PROXY_TYPE_HTTP: + return "HTTP"; + case PROXY_TYPE_SOCKS5: + return "SOCKS"; + default: + return "Unknown"; + } +} + +static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string) +{ + int i = 0; + while (supported_proxy_types[i].url_str) { + if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str))) + return supported_proxy_types[i].type; + i++; + } + return PROXY_TYPE_UNKNOWN; +} + +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string) +{ + if (!string) + return PROXY_TYPE_UNKNOWN; + + while (*string == 0x20) + string++; + + if (!*string) + return PROXY_TYPE_UNKNOWN; + + return aclk_find_proxy(string); +} + +// helper function to censor user&password +// for logging purposes +void safe_log_proxy_censor(char *proxy) +{ + size_t length = strlen(proxy); + char *auth = proxy + length - 1; + 
char *cur;
+
+    while ((auth >= proxy) && (*auth != '@'))
+        auth--;
+
+    //if not found or @ is first char do nothing
+    if (auth <= proxy)
+        return;
+
+    cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+    if (!cur)
+        cur = proxy;
+    else
+        cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
+
+    while (cur < auth) {
+        *cur = 'X';
+        cur++;
+    }
+}
+
+static inline void safe_log_proxy_error(char *str, const char *proxy)
+{
+    char *log = strdupz(proxy);
+    safe_log_proxy_censor(log);
+    error("%s Provided Value:\"%s\"", str, log);
+    freez(log);
+}
+
+static inline int check_socks_enviroment(const char **proxy)
+{
+    char *tmp = getenv("socks_proxy");
+
+    if (!tmp)
+        return 1;
+
+    if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
+        *proxy = tmp;
+        return 0;
+    }
+
+    safe_log_proxy_error(
+        "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
+        tmp);
+    return 1;
+}
+
+static inline int check_http_enviroment(const char **proxy)
+{
+    char *tmp = getenv("http_proxy");
+
+    if (!tmp)
+        return 1;
+
+    if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
+        *proxy = tmp;
+        return 0;
+    }
+
+    safe_log_proxy_error(
+        "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
+        tmp);
+    return 1;
+}
+
+const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
+{
+    const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
+    *type = PROXY_DISABLED;
+
+    if (strcmp(proxy, "none") == 0)
+        return proxy;
+
+    if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
+        if (check_socks_enviroment(&proxy) == 0) {
+#ifdef LWS_WITH_SOCKS5
+            *type = PROXY_TYPE_SOCKS5;
+            return proxy;
+#else
+            safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
+                "but Libwebsockets used doesn't have SOCKS5 support built in. "
+                "Ignoring and checking for other options.",
+                proxy);
+#endif
+        }
+        if (check_http_enviroment(&proxy) == 0)
+            *type = PROXY_TYPE_HTTP;
+        return proxy;
+    }
+
+    *type = aclk_verify_proxy(proxy);
+#ifndef LWS_WITH_SOCKS5
+    if (*type == PROXY_TYPE_SOCKS5) {
+        safe_log_proxy_error(
+            "Config var \"" ACLK_PROXY_CONFIG_VAR
+            "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
+            proxy);
+    }
+#endif
+    if (*type == PROXY_TYPE_UNKNOWN) {
+        *type = PROXY_DISABLED;
+        safe_log_proxy_error(
+            "Config var \"" ACLK_PROXY_CONFIG_VAR
+            "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
+            proxy);
+    }
+
+    return proxy;
+}
+
+// helper function to read settings only once (static)
+// as claiming, challenge/response and ACLK
+// read the same thing, no need to parse again
+const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
+{
+    static const char *proxy = NULL;
+    static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
+
+    if (proxy_type == PROXY_NOT_SET)
+        proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
+
+    *type = proxy_type;
+    return proxy;
+}
diff --git a/aclk/aclk_proxy.h b/aclk/aclk_proxy.h
new file mode 100644
index 000000000..b4ceb7df8
--- /dev/null
+++ b/aclk/aclk_proxy.h
@@ -0,0 +1,22 @@
+#ifndef ACLK_PROXY_H
+#define ACLK_PROXY_H
+
+#include <config.h>
+
+#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
+
+typedef enum aclk_proxy_type {
+    PROXY_TYPE_UNKNOWN = 0,
+    PROXY_TYPE_SOCKS5,
+    PROXY_TYPE_HTTP,
+    PROXY_DISABLED,
+    PROXY_NOT_SET,
+} ACLK_PROXY_TYPE;
+
+const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
+ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
+const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
+void safe_log_proxy_censor(char *proxy);
+const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
+
+#endif /* ACLK_PROXY_H */
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index 3e2f88e46..001c1ba02 100644
--- a/aclk/aclk_query.c
+++ 
b/aclk/aclk_query.c @@ -17,20 +17,20 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; typedef struct aclk_query_handler { aclk_query_type_t type; char *name; // for logging purposes - int(*fnc)(mqtt_wss_client client, aclk_query_t query); + int(*fnc)(struct aclk_query_thread *query_thr, aclk_query_t query); } aclk_query_handler; -static int info_metadata(mqtt_wss_client client, aclk_query_t query) +static int info_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) { - aclk_send_info_metadata(client, + aclk_send_info_metadata(query_thr->client, !query->data.metadata_info.initial_on_connect, query->data.metadata_info.host); return 0; } -static int alarms_metadata(mqtt_wss_client client, aclk_query_t query) +static int alarms_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) { - aclk_send_alarm_metadata(client, + aclk_send_alarm_metadata(query_thr->client, !query->data.metadata_info.initial_on_connect); return 0; } @@ -55,11 +55,34 @@ static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char return t; } -static int http_api_v2(mqtt_wss_client client, aclk_query_t query) +static RRDHOST *node_id_2_rrdhost(const char *node_id) +{ + int res; + uuid_t node_id_bin, host_id_bin; + char host_id[UUID_STR_LEN]; + if (uuid_parse(node_id, node_id_bin)) { + error("Couldn't parse UUID %s", node_id); + return NULL; + } + if ((res = get_host_id(&node_id_bin, &host_id_bin))) { + error("node not found rc=%d", res); + return NULL; + } + uuid_unparse_lower(host_id_bin, host_id); + return rrdhost_find_by_guid(host_id, 0); +} + +#define NODE_ID_QUERY "/node/" +// TODO this function should be quarantied and written nicely +// lots of skeletons from initial ACLK Legacy impl. 
+// quick and dirty from the start +static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) { int retval = 0; usec_t t; BUFFER *local_buffer = NULL; + BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE); + RRDHOST *query_host = localhost; #ifdef NETDATA_WITH_ZLIB int z_ret; @@ -76,6 +99,26 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) w->cookie2[0] = 0; // Simulate web_client_create_on_fd() w->acl = 0x1f; + if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { + char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); + char nodeid[UUID_STR_LEN]; + if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { + error("URL requests node_id but there is not enough chars following"); + retval = 1; + goto cleanup; + } + strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1); + + query_host = node_id_2_rrdhost(nodeid); + if (!query_host) { + error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid); + retval = 1; + goto cleanup; + } + } + + buffer_strcat(log_buffer, query->data.http_api_v2.query); + char *mysep = strchr(query->data.http_api_v2.query, '?'); if (mysep) { url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); @@ -85,8 +128,19 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) mysep = strrchr(query->data.http_api_v2.query, '/'); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + int stat_idx = aclk_cloud_req_http_type_to_idx(mysep ? mysep + 1 : "other"); + aclk_metrics_per_sample.cloud_req_http_by_type[stat_idx]++; + ACLK_STATS_UNLOCK; + } + // execute the query - t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop"); + w->tv_in = query->created_tv; + now_realtime_timeval(&w->tv_ready); + t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); + size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? 
w->response.rlen : w->response.data->len; + size_t sent = size; #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used @@ -128,14 +182,13 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) z_buffer->len += bytes_to_cpy; } while(z_ret != Z_STREAM_END); // so that web_client_build_http_header - // puts correct content lenght into header + // puts correct content length into header buffer_free(w->response.data); w->response.data = z_buffer; z_buffer = NULL; } #endif - now_realtime_timeval(&w->tv_ready); w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -149,6 +202,7 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) buffer_need_bytes(local_buffer, w->response.data->len); memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); local_buffer->len += w->response.data->len; + sent = sent - size + w->response.data->len; } else { #endif buffer_strcat(local_buffer, w->response.data->buffer); @@ -157,7 +211,26 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query) #endif } - aclk_http_msg_v2(client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); + // send msg. + aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); + + // log. + struct timeval tv; + now_realtime_timeval(&tv); + log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", + w->id + , gettid() + , query_thr->idx + , "DATA" + , sent + , size + , size > sent ? -(((size - sent) / (double)size) * 100.0) : ((size > 0) ? 
(((sent - size ) / (double)size) * 100.0) : 0.0) + , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 + , dt_usec(&tv, &w->tv_ready) / 1000.0 + , dt_usec(&tv, &w->tv_in) / 1000.0 + , w->response.code + , strip_control_characters((char *)buffer_tostring(log_buffer)) + ); cleanup: #ifdef NETDATA_WITH_ZLIB @@ -170,45 +243,83 @@ cleanup: buffer_free(w->response.header_output); freez(w); buffer_free(local_buffer); + buffer_free(log_buffer); return retval; } -static int chart_query(mqtt_wss_client client, aclk_query_t query) +static int chart_query(struct aclk_query_thread *query_thr, aclk_query_t query) { - aclk_chart_msg(client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name); + aclk_chart_msg(query_thr->client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name); return 0; } -static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query) +static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_query_t query) { - aclk_alarm_state_msg(client, query->data.alarm_update); + aclk_alarm_state_msg(query_thr->client, query->data.alarm_update); // aclk_alarm_state_msg frees the json object including the header it generates query->data.alarm_update = NULL; return 0; } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static int register_node(struct aclk_query_thread *query_thr, aclk_query_t query) { + // TODO create a pending registrations list + // with some timeouts to detect registration requests that + // go unanswered from the cloud + aclk_generate_node_registration(query_thr->client, &query->data.node_creation); + return 0; +} + +static int node_state_update(struct aclk_query_thread *query_thr, aclk_query_t query) { + // TODO create a pending registrations list + // with some timeouts to detect registration requests that + // go unanswered from the cloud + aclk_generate_node_state_update(query_thr->client, &query->data.node_update); + return 0; +} + +static int send_bin_msg(struct aclk_query_thread *query_thr, 
aclk_query_t query)
+{
+    // this will be simplified when legacy support is removed
+    aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
+    return 0;
+}
+#endif
+
 aclk_query_handler aclk_query_handlers[] = {
-    { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
-    { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
-    { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata },
-    { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
-    { .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
-    { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
-    { .type = UNKNOWN, .name = NULL, .fnc = NULL }
+    { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
+    { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
+    { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata },
+    { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
+    { .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
+    { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+    { .type = REGISTER_NODE, .name = "register node", .fnc = register_node },
+    { .type = NODE_STATE_UPDATE, .name = "node state update", .fnc = node_state_update },
+    { .type = CHART_DIMS_UPDATE, .name = "chart and dim update bin", .fnc = send_bin_msg },
+    { .type = CHART_CONFIG_UPDATED, .name = "chart config updated", .fnc = send_bin_msg },
+    { .type = CHART_RESET, .name = "reset chart messages", .fnc = send_bin_msg },
+    { .type = RETENTION_UPDATED, .name = "update retention info", .fnc = send_bin_msg },
+    { .type = UPDATE_NODE_INFO, .name = "update node info", .fnc = send_bin_msg },
+    { .type = ALARM_LOG_HEALTH, .name = "alarm
log health", .fnc = send_bin_msg },
+    { .type = ALARM_PROVIDE_CFG, .name = "provide alarm config", .fnc = send_bin_msg },
+    { .type = ALARM_SNAPSHOT, .name = "alarm snapshot", .fnc = send_bin_msg },
+#endif
+    { .type = UNKNOWN, .name = NULL, .fnc = NULL }
 };
-static void aclk_query_process_msg(struct aclk_query_thread *info, aclk_query_t query)
+static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
 {
     for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
         if (aclk_query_handlers[i].type == query->type) {
             debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
-            aclk_query_handlers[i].fnc(info->client, query);
+            aclk_query_handlers[i].fnc(query_thr, query);
             aclk_query_free(query);
             if (aclk_stats_enabled) {
                 ACLK_STATS_LOCK;
                 aclk_metrics_per_sample.queries_dispatched++;
-                aclk_queries_per_thread[info->idx]++;
+                aclk_queries_per_thread[query_thr->idx]++;
                 ACLK_STATS_UNLOCK;
             }
             return;
@@ -219,11 +330,11 @@ static void aclk_query_process_msg(struct aclk_query_thread *info, aclk_query_t
 /* Processes messages from queue.
Compete for work with other threads */
-int aclk_query_process_msgs(struct aclk_query_thread *info)
+int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
 {
     aclk_query_t query;
     while ((query = aclk_queue_pop()))
-        aclk_query_process_msg(info, query);
+        aclk_query_process_msg(query_thr, query);
     return 0;
 }
@@ -233,21 +344,20 @@ int aclk_query_process_msgs(struct aclk_query_thread *info)
  */
 void *aclk_query_main_thread(void *ptr)
 {
-    struct aclk_query_thread *info = ptr;
+    struct aclk_query_thread *query_thr = ptr;
+
     while (!netdata_exit) {
-        aclk_query_process_msgs(info);
+        aclk_query_process_msgs(query_thr);
         QUERY_THREAD_LOCK;
-
         if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
             sleep_usec(USEC_PER_SEC * 1);
-
         QUERY_THREAD_UNLOCK;
     }
     return NULL;
 }
-#define TASK_LEN_MAX 16
+#define TASK_LEN_MAX 22
 void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client)
 {
     info("Starting %d query threads.", query_threads->count);
@@ -257,7 +367,7 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss
     for (int i = 0; i < query_threads->count; i++) {
         query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
-        if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
+        if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
             error("snprintf encoding error");
         netdata_thread_create(
             &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c
index c9461b233..18b4783ee 100644
--- a/aclk/aclk_query_queue.c
+++ b/aclk/aclk_query_queue.c
@@ -20,7 +20,9 @@ static struct aclk_query_queue {
 static inline int _aclk_queue_query(aclk_query_t query)
 {
+    now_realtime_timeval(&query->created_tv);
     query->created = now_realtime_usec();
+
     ACLK_QUEUE_LOCK;
     if (aclk_query_queue.block_push) {
         ACLK_QUEUE_UNLOCK;
@@ -43,14 +45,49 @@ static inline int _aclk_queue_query(aclk_query_t query)
 }
+// Gets a pointer to the metric associated with a particular query type.
+// NULL if the query type has no associated metric.
+static inline volatile uint32_t *aclk_stats_qmetric_for_qtype(aclk_query_type_t qtype) {
+    switch (qtype) {
+        case HTTP_API_V2:
+            return &aclk_metrics_per_sample.cloud_req_type_http;
+        case ALARM_STATE_UPDATE:
+            return &aclk_metrics_per_sample.cloud_req_type_alarm_upd;
+        case METADATA_INFO:
+            return &aclk_metrics_per_sample.cloud_req_type_metadata_info;
+        case METADATA_ALARMS:
+            return &aclk_metrics_per_sample.cloud_req_type_metadata_alarms;
+        case CHART_NEW:
+            return &aclk_metrics_per_sample.cloud_req_type_chart_new;
+        case CHART_DEL:
+            return &aclk_metrics_per_sample.cloud_req_type_chart_del;
+        case REGISTER_NODE:
+            return &aclk_metrics_per_sample.cloud_req_type_register_node;
+        case NODE_STATE_UPDATE:
+            return &aclk_metrics_per_sample.cloud_req_type_node_upd;
+        default:
+            return NULL;
+    }
+}
+
 int aclk_queue_query(aclk_query_t query)
 {
     int ret = _aclk_queue_query(query);
     if (!ret) {
+        // local cache of query type before we wake up query thread, which may
+        // free the query in a race.
+        aclk_query_type_t qtype = query->type;
         QUERY_THREAD_WAKEUP;
+
         if (aclk_stats_enabled) {
+            // get target query type metric before lock so we keep lock for
+            // minimal time.
+            volatile uint32_t *metric = aclk_stats_qmetric_for_qtype(qtype);
+
             ACLK_STATS_LOCK;
             aclk_metrics_per_sample.queries_queued++;
+            if (metric)
+                *metric += 1;
             ACLK_STATS_UNLOCK;
         }
     }
@@ -102,17 +139,47 @@ aclk_query_t aclk_query_new(aclk_query_type_t type)
 void aclk_query_free(aclk_query_t query)
 {
-    if (query->type == HTTP_API_V2) {
+    switch (query->type) {
+    case HTTP_API_V2:
         freez(query->data.http_api_v2.payload);
         if (query->data.http_api_v2.query != query->dedup_id)
             freez(query->data.http_api_v2.query);
-    }
+        break;
-    if (query->type == CHART_NEW)
+    case CHART_NEW:
         freez(query->data.chart_add_del.chart_name);
+        break;
-    if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update)
-        json_object_put(query->data.alarm_update);
+    case ALARM_STATE_UPDATE:
+        if (query->data.alarm_update)
+            json_object_put(query->data.alarm_update);
+        break;
+
+    case NODE_STATE_UPDATE:
+        freez((void*)query->data.node_update.claim_id);
+        freez((void*)query->data.node_update.node_id);
+        break;
+
+    case REGISTER_NODE:
+        freez((void*)query->data.node_creation.claim_id);
+        freez((void*)query->data.node_creation.hostname);
+        freez((void*)query->data.node_creation.machine_guid);
+        break;
+
+    case CHART_DIMS_UPDATE:
+    case CHART_CONFIG_UPDATED:
+    case CHART_RESET:
+    case RETENTION_UPDATED:
+    case UPDATE_NODE_INFO:
+    case ALARM_LOG_HEALTH:
+    case ALARM_PROVIDE_CFG:
+    case ALARM_SNAPSHOT:
+        freez(query->data.bin_payload.payload);
+        break;
+
+    default:
+        break;
+    }
     freez(query->dedup_id);
     freez(query->callback_topic);
@@ -126,3 +193,10 @@ void aclk_queue_lock(void)
     aclk_query_queue.block_push = 1;
     ACLK_QUEUE_UNLOCK;
 }
+
+void aclk_queue_unlock(void)
+{
+    ACLK_QUEUE_LOCK;
+    aclk_query_queue.block_push = 0;
+    ACLK_QUEUE_UNLOCK;
+}
diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h
index c46513567..db6354433 100644
--- a/aclk/aclk_query_queue.h
+++ b/aclk/aclk_query_queue.h
@@ -4,7 +4,10 @@
 #define NETDATA_ACLK_QUERY_QUEUE_H
 #include "libnetdata/libnetdata.h"
-#include "../daemon/common.h"
+#include "daemon/common.h"
+#include "schema-wrappers/schema_wrappers.h"
+
+#include "aclk_util.h"
 typedef enum {
     UNKNOWN,
@@ -13,7 +16,17 @@ typedef enum {
     HTTP_API_V2,
     CHART_NEW,
     CHART_DEL,
-    ALARM_STATE_UPDATE
+    ALARM_STATE_UPDATE,
+    REGISTER_NODE,
+    NODE_STATE_UPDATE,
+    CHART_DIMS_UPDATE,
+    CHART_CONFIG_UPDATED,
+    CHART_RESET,
+    RETENTION_UPDATED,
+    UPDATE_NODE_INFO,
+    ALARM_LOG_HEALTH,
+    ALARM_PROVIDE_CFG,
+    ALARM_SNAPSHOT
 } aclk_query_type_t;
 struct aclk_query_metadata {
@@ -31,6 +44,13 @@ struct aclk_query_http_api_v2 {
     char *query;
 };
+struct aclk_bin_payload {
+    char *payload;
+    size_t size;
+    enum aclk_topics topic;
+    const char *msg_name;
+};
+
 typedef struct aclk_query *aclk_query_t;
 struct aclk_query {
     aclk_query_type_t type;
@@ -44,6 +64,7 @@ struct aclk_query {
     char *callback_topic;
     char *msg_id;
+    struct timeval created_tv;
     usec_t created;
     aclk_query_t next;
@@ -55,6 +76,9 @@ struct aclk_query {
         struct aclk_query_metadata metadata_alarms;
         struct aclk_query_http_api_v2 http_api_v2;
         struct aclk_query_chart_add_del chart_add_del;
+        node_instance_creation_t node_creation;
+        node_instance_connection_t node_update;
+        struct aclk_bin_payload bin_payload;
         json_object *alarm_update;
     } data;
 };
@@ -67,5 +91,14 @@ aclk_query_t aclk_queue_pop(void);
 void aclk_queue_flush(void);
 void aclk_queue_lock(void);
+void aclk_queue_unlock(void);
+
+#define QUEUE_IF_PAYLOAD_PRESENT(query) \
+    if (likely(query->data.bin_payload.payload)) { \
+        aclk_queue_query(query); \
+    } else { \
+        error("Failed to generate payload (%s)", __FUNCTION__); \
+        aclk_query_free(query); \
+    }
 #endif /* NETDATA_ACLK_QUERY_QUEUE_H */
diff --git a/aclk/legacy/aclk_rrdhost_state.h b/aclk/aclk_rrdhost_state.h
index 7ab3a502e..73925b330 100644
--- a/aclk/legacy/aclk_rrdhost_state.h
+++ b/aclk/aclk_rrdhost_state.h
@@ -1,8 +1,9 @@
 #ifndef ACLK_RRDHOST_STATE_H
 #define ACLK_RRDHOST_STATE_H
-#include "../../libnetdata/libnetdata.h"
+#include "libnetdata/libnetdata.h"
+#ifdef
ACLK_LEGACY
 typedef enum aclk_cmd {
     ACLK_CMD_CLOUD,
     ACLK_CMD_ONCONNECT,
@@ -20,23 +21,24 @@ typedef enum aclk_metadata_state {
     ACLK_METADATA_CMD_QUEUED,
     ACLK_METADATA_SENT
 } ACLK_METADATA_STATE;
+#endif
 typedef enum aclk_agent_state {
     ACLK_HOST_INITIALIZING,
     ACLK_HOST_STABLE
-} ACLK_POPCORNING_STATE;
+} ACLK_AGENT_STATE;
 typedef struct aclk_rrdhost_state {
     char *claimed_id; // Claimed ID if host has one otherwise NULL
-#ifdef ENABLE_ACLK
+#ifdef ACLK_LEGACY
     // per child popcorning
-    ACLK_POPCORNING_STATE state;
+    ACLK_AGENT_STATE state;
     ACLK_METADATA_STATE metadata;
     time_t timestamp_created;
     time_t t_last_popcorn_update;
-#endif /* ENABLE_ACLK */
+#endif /* ACLK_LEGACY */
 } aclk_rrdhost_state;
 #endif /* ACLK_RRDHOST_STATE_H */
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 3d3ab5e2c..e7ce932ea 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -4,9 +4,12 @@
 #include "aclk_stats.h"
 #include "aclk_query_queue.h"
+#include "aclk.h"
 #define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
-#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/"
+#define ACLK_CLOUD_REQ_V2_PREFIX "GET /"
+
+#define ACLK_V_COMPRESSION 2
 struct aclk_request {
     char *type_id;
@@ -18,7 +21,7 @@ struct aclk_request {
     int max_version;
 };
-int cloud_to_agent_parse(JSON_ENTRY *e)
+static int cloud_to_agent_parse(JSON_ENTRY *e)
 {
     struct aclk_request *data = e->callback_data;
@@ -88,6 +91,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
 {
     const char *start, *end;
+    // TODO better check of URL
     if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) {
         errno = 0;
         error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
@@ -108,7 +112,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
 }
 #define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
-    if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\
+    if
(unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\
         debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
         ACLK_SHARED_STATE_UNLOCK;\
         return 1;\
@@ -117,7 +121,9 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
 static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
 {
-    HTTP_CHECK_AGENT_INITIALIZED();
+    if (!aclk_use_new_cloud_arch) {
+        HTTP_CHECK_AGENT_INITIALIZED();
+    }
     aclk_query_t query;
@@ -253,3 +259,182 @@ err_cleanup_nojson:
     return 1;
 }
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len)
+{
+    // TODO do the look up table with hashes to optimize when there are more
+    // than few
+    if (!strcmp(message_type, "cmd")) {
+        // msg is binary payload in all other cases
+        // however in this message from old legacy cloud
+        // we have to convert it to C string
+        char *str = mallocz(msg_len+1);
+        memcpy(str, msg, msg_len);
+        str[msg_len] = 0;
+        aclk_handle_cloud_message(str);
+        freez(str);
+        return;
+    }
+    if (!strcmp(message_type, "CreateNodeInstanceResult")) {
+        node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len);
+        if (!res.machine_guid || !res.node_id) {
+            error_report("Error parsing CreateNodeInstanceResult");
+            freez(res.machine_guid);
+            freez(res.node_id);
+            return;
+        }
+
+        debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id);
+
+        uuid_t host_id, node_id;
+        if (uuid_parse(res.machine_guid, host_id)) {
+            error("Error parsing machine_guid provided by CreateNodeInstanceResult");
+            freez(res.machine_guid);
+            freez(res.node_id);
+            return;
+        }
+        if (uuid_parse(res.node_id, node_id)) {
+            error("Error parsing node_id provided by CreateNodeInstanceResult");
+            freez(res.machine_guid);
+            freez(res.node_id);
+            return;
+        }
+        update_node_id(&host_id, &node_id);
+
+        aclk_query_t query =
aclk_query_new(NODE_STATE_UPDATE);
+        query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+        rrdhost_aclk_state_lock(localhost);
+        query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+        rrdhost_aclk_state_unlock(localhost);
+
+        RRDHOST *host = rrdhost_find_by_guid(res.machine_guid, 0);
+        query->data.node_update.live = 0;
+
+        if (host) {
+            // not all host must have RRDHOST struct created for them
+            // if they never connected during runtime of agent
+            if (host == localhost) {
+                query->data.node_update.live = 1;
+                query->data.node_update.hops = 0;
+            } else {
+                netdata_mutex_lock(&host->receiver_lock);
+                query->data.node_update.live = (host->receiver != NULL);
+                netdata_mutex_unlock(&host->receiver_lock);
+                query->data.node_update.hops = host->system_info->hops;
+            }
+        }
+
+        query->data.node_update.node_id = res.node_id; // aclk_query_free will free it
+        query->data.node_update.queryable = 1;
+        query->data.node_update.session_id = aclk_session_newarch;
+        aclk_queue_query(query);
+        freez(res.machine_guid);
+        return;
+    }
+    if (!strcmp(message_type, "SendNodeInstances")) {
+        debug(D_ACLK, "Got SendNodeInstances");
+        aclk_send_node_instances();
+        return;
+    }
+
+    if (!strcmp(message_type, "StreamChartsAndDimensions")) {
+        stream_charts_and_dims_t res = parse_stream_charts_and_dims(msg, msg_len);
+        if (!res.claim_id || !res.node_id) {
+            error("Error parsing StreamChartsAndDimensions msg");
+            freez(res.claim_id);
+            freez(res.node_id);
+            return;
+        }
+        chart_batch_id = res.batch_id;
+        aclk_start_streaming(res.node_id, res.seq_id, res.seq_id_created_at.tv_sec, res.batch_id);
+        freez(res.claim_id);
+        freez(res.node_id);
+        return;
+    }
+    if (!strcmp(message_type, "ChartsAndDimensionsAck")) {
+        chart_and_dim_ack_t res = parse_chart_and_dimensions_ack(msg, msg_len);
+        if (!res.claim_id || !res.node_id) {
+            error("Error parsing StreamChartsAndDimensions msg");
+            freez(res.claim_id);
+            freez(res.node_id);
+            return;
+        }
+
aclk_ack_chart_sequence_id(res.node_id, res.last_seq_id);
+        freez(res.claim_id);
+        freez(res.node_id);
+        return;
+    }
+    if (!strcmp(message_type, "UpdateChartConfigs")) {
+        struct update_chart_config res = parse_update_chart_config(msg, msg_len);
+        if (!res.claim_id || !res.node_id || !res.hashes)
+            error("Error parsing UpdateChartConfigs msg");
+        else
+            aclk_get_chart_config(res.hashes);
+        destroy_update_chart_config(&res);
+        return;
+    }
+    if (!strcmp(message_type, "StartAlarmStreaming")) {
+        struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
+        if (!res.node_id || !res.batch_id) {
+            error("Error parsing StartAlarmStreaming");
+            freez(res.node_id);
+            return;
+        }
+        aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id);
+        freez(res.node_id);
+        return;
+    }
+    if (!strcmp(message_type, "SendAlarmLogHealth")) {
+        char *node_id = parse_send_alarm_log_health(msg, msg_len);
+        if (!node_id) {
+            error("Error parsing SendAlarmLogHealth");
+            return;
+        }
+        aclk_send_alarm_health_log(node_id);
+        freez(node_id);
+        return;
+    }
+    if (!strcmp(message_type, "SendAlarmConfiguration")) {
+        char *config_hash = parse_send_alarm_configuration(msg, msg_len);
+        if (!config_hash || !*config_hash) {
+            error("Error parsing SendAlarmConfiguration");
+            freez(config_hash);
+            return;
+        }
+        aclk_send_alarm_configuration(config_hash);
+        freez(config_hash);
+        return;
+    }
+    if (!strcmp(message_type, "SendAlarmSnapshot")) {
+        struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
+        if (!sas->node_id || !sas->claim_id) {
+            error("Error parsing SendAlarmSnapshot");
+            destroy_send_alarm_snapshot(sas);
+            return;
+        }
+        aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+        destroy_send_alarm_snapshot(sas);
+        return;
+    }
+    if (!strcmp(message_type, "DisconnectReq")) {
+        struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
+        if (!cmd)
+            return;
+        if (cmd->permaban) {
+            error ("Cloud Banned
This Agent!");
+            aclk_disable_runtime = 1;
+        }
+        info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
+        if (cmd->reconnect_after_s > 0) {
+            aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
+            info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s);
+        }
+        disconnect_req = 1;
+        freez(cmd->error_description);
+        freez(cmd);
+        return;
+    }
+    error ("Unknown new cloud arch message type received \"%s\"", message_type);
+}
+#endif
diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h
index e24252bee..074dc004a 100644
--- a/aclk/aclk_rx_msgs.h
+++ b/aclk/aclk_rx_msgs.h
@@ -5,9 +5,13 @@
 #ifndef ACLK_RX_MSGS_H
 #define ACLK_RX_MSGS_H
-#include "../daemon/common.h"
+#include "daemon/common.h"
 #include "libnetdata/libnetdata.h"
 int aclk_handle_cloud_message(char *payload);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len);
+#endif
+
 #endif /* ACLK_RX_MSGS_H */
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index a599cfda5..765c6a333 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -4,8 +4,6 @@
 netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
-int aclk_stats_enabled;
-
 int query_thread_count;
 // data ACLK stats need per query thread
@@ -112,7 +110,87 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
     rrdset_done(st);
 }
-#define MAX_DIM_NAME 16
+static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample)
+{
+    static RRDSET *st = NULL;
+    static RRDDIM *rd_type_http = NULL;
+    static RRDDIM *rd_type_alarm_upd = NULL;
+    static RRDDIM *rd_type_metadata_info = NULL;
+    static RRDDIM *rd_type_metadata_alarms = NULL;
+    static RRDDIM *rd_type_chart_new = NULL;
+    static RRDDIM *rd_type_chart_del = NULL;
+    static RRDDIM *rd_type_register_node = NULL;
+    static RRDDIM *rd_type_node_upd = NULL;
+
+    if
(unlikely(!st)) {
+        st = rrdset_create_localhost(
+            "netdata", "aclk_cloud_req_type", NULL, "aclk", NULL, "Requests received from cloud by their type", "req/s",
+            "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+        rd_type_http = rrddim_add(st, "http", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+        rd_type_alarm_upd = rrddim_add(st, "alarm update", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+        rd_type_metadata_info = rrddim_add(st, "info metadata", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+        rd_type_metadata_alarms = rrddim_add(st, "alarms metadata", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+        rd_type_chart_new = rrddim_add(st, "chart new", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+        rd_type_chart_del = rrddim_add(st, "chart delete", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+        rd_type_register_node = rrddim_add(st, "register node", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+        rd_type_node_upd = rrddim_add(st, "node update", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+    } else
+        rrdset_next(st);
+
+    rrddim_set_by_pointer(st, rd_type_http, per_sample->cloud_req_type_http);
+    rrddim_set_by_pointer(st, rd_type_alarm_upd, per_sample->cloud_req_type_alarm_upd);
+    rrddim_set_by_pointer(st, rd_type_metadata_info, per_sample->cloud_req_type_metadata_info);
+    rrddim_set_by_pointer(st, rd_type_metadata_alarms, per_sample->cloud_req_type_metadata_alarms);
+    rrddim_set_by_pointer(st, rd_type_chart_new, per_sample->cloud_req_type_chart_new);
+    rrddim_set_by_pointer(st, rd_type_chart_del, per_sample->cloud_req_type_chart_del);
+    rrddim_set_by_pointer(st, rd_type_register_node, per_sample->cloud_req_type_register_node);
+    rrddim_set_by_pointer(st, rd_type_node_upd, per_sample->cloud_req_type_node_upd);
+
+    rrdset_done(st);
+}
+
+static char
*cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = {
+    "other",
+    "info",
+    "data",
+    "alarms",
+    "alarm_log",
+    "chart",
+    "charts"
+    // if you change then update `ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT`.
+};
+
+int aclk_cloud_req_http_type_to_idx(const char *name)
+{
+    for (int i = 1; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++)
+        if (!strcmp(cloud_req_http_type_names[i], name))
+            return i;
+    return 0;
+}
+
+static void aclk_stats_cloud_req_http_type(struct aclk_metrics_per_sample *per_sample)
+{
+    static RRDSET *st = NULL;
+    static RRDDIM *rd_rq_types[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT];
+
+    if (unlikely(!st)) {
+        st = rrdset_create_localhost(
+            "netdata", "aclk_cloud_req_http_type", NULL, "aclk", NULL, "Requests received from cloud via HTTP by their type", "req/s",
+            "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+
+        for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++)
+            rd_rq_types[i] = rrddim_add(st, cloud_req_http_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+    } else
+        rrdset_next(st);
+
+    for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++)
+        rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_http_by_type[i]);
+
+    rrdset_done(st);
+}
+
+#define MAX_DIM_NAME 22
 static void aclk_stats_query_threads(uint32_t *queries_per_thread)
 {
     static RRDSET *st = NULL;
@@ -122,10 +200,10 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
     if (unlikely(!st)) {
         st = rrdset_create_localhost(
             "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
-            "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
+            "netdata", "stats", 200009, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
         for (int i = 0; i < query_thread_count; i++) {
-            if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
+            if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
                 error("snprintf encoding error");
             aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
         }
@@ -149,7 +227,7 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
     if (unlikely(!st)) {
         st = rrdset_create_localhost(
             "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us",
-            "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_LINE);
+            "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_LINE);
         rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
         rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
@@ -218,6 +296,9 @@ void *aclk_stats_main_thread(void *ptr)
 #endif
         aclk_stats_cloud_req(&per_sample);
+        aclk_stats_cloud_req_type(&per_sample);
+        aclk_stats_cloud_req_http_type(&per_sample);
+
         aclk_stats_query_threads(aclk_queries_per_thread_sample);
         aclk_stats_query_time(&per_sample);
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index 33d016965..317a34ba4 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -3,7 +3,7 @@
 #ifndef NETDATA_ACLK_STATS_H
 #define NETDATA_ACLK_STATS_H
-#include "../daemon/common.h"
+#include "daemon/common.h"
 #include "libnetdata/libnetdata.h"
 #define ACLK_STATS_THREAD_NAME "ACLK_Stats"
@@ -13,7 +13,10 @@ extern netdata_mutex_t aclk_stats_mutex;
 #define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
 #define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
-extern int aclk_stats_enabled;
+// if you change update `cloud_req_http_type_names`.
+#define ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT 7
+
+int aclk_cloud_req_http_type_to_idx(const char *name);
 struct aclk_stats_thread {
     netdata_thread_t *thread;
@@ -45,6 +48,19 @@ extern struct aclk_metrics_per_sample {
     volatile uint32_t cloud_req_recvd;
     volatile uint32_t cloud_req_err;
+    // request types.
+    volatile uint32_t cloud_req_type_http;
+    volatile uint32_t cloud_req_type_alarm_upd;
+    volatile uint32_t cloud_req_type_metadata_info;
+    volatile uint32_t cloud_req_type_metadata_alarms;
+    volatile uint32_t cloud_req_type_chart_new;
+    volatile uint32_t cloud_req_type_chart_del;
+    volatile uint32_t cloud_req_type_register_node;
+    volatile uint32_t cloud_req_type_node_upd;
+
+    // HTTP-specific request types.
+    volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT];
+
     volatile uint32_t cloud_q_process_total;
     volatile uint32_t cloud_q_process_count;
     volatile uint32_t cloud_q_process_max;
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 144008e4d..237c1bdd2 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -1,14 +1,18 @@
 // SPDX-License-Identifier: GPL-3.0-or-later
 #include "aclk_tx_msgs.h"
-#include "../daemon/common.h"
+#include "daemon/common.h"
 #include "aclk_util.h"
 #include "aclk_stats.h"
+#include "aclk.h"
 #ifndef __GNUC__
 #pragma region aclk_tx_msgs helper functions
 #endif
+// version for aclk legacy (old cloud arch)
+#define ACLK_VERSION 2
+
 static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
 {
     uint16_t packet_id;
@@ -16,7 +20,7 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
     const char *topic = aclk_get_topic(subtopic);
     if (unlikely(!topic)) {
-        error("Couldn't get topic. Aborting mesage send");
+        error("Couldn't get topic. Aborting message send");
         return;
     }
@@ -32,6 +36,37 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
 #endif
 }
+uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
+{
+#ifndef ACLK_LOG_CONVERSATION_DIR
+    UNUSED(msgname);
+#endif
+    uint16_t packet_id;
+    const char *topic = aclk_get_topic(subtopic);
+
+    if (unlikely(!topic)) {
+        error("Couldn't get topic.
Aborting message send.");
+        return 0;
+    }
+
+    mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
+#ifdef NETDATA_INTERNAL_CHECKS
+    aclk_stats_msg_published(packet_id);
+#endif
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 1024
+    char filename[FN_MAX_LEN];
+    snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname);
+    FILE *fptr;
+    if (fptr = fopen(filename,"w")) {
+        fwrite(msg, msg_len, 1, fptr);
+        fclose(fptr);
+    }
+#endif
+
+    return packet_id;
+}
+
 static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
 {
     uint16_t packet_id;
@@ -39,7 +74,7 @@ static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_obje
     const char *topic = aclk_get_topic(subtopic);
     if (unlikely(!topic)) {
-        error("Couldn't get topic. Aborting mesage send");
+        error("Couldn't get topic. Aborting message send");
         return 0;
     }
@@ -368,6 +403,87 @@ int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
     return pid;
 }
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+// new protobuf msgs
+uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
+    size_t len;
+    uint16_t pid;
+    update_agent_connection_t conn = {
+        .reachable = (reachable ? 1 : 0),
+        .lwt = 0,
+        .session_id = aclk_session_newarch
+    };
+
+    rrdhost_aclk_state_lock(localhost);
+    if (unlikely(!localhost->aclk_state.claimed_id)) {
+        error("Internal error.
Should not come here if not claimed");
+        rrdhost_aclk_state_unlock(localhost);
+        return 0;
+    }
+    conn.claim_id = localhost->aclk_state.claimed_id;
+
+    char *msg = generate_update_agent_connection(&len, &conn);
+    rrdhost_aclk_state_unlock(localhost);
+
+    if (!msg) {
+        error("Error generating agent::v1::UpdateAgentConnection payload");
+        return 0;
+    }
+
+    pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
+    freez(msg);
+    return pid;
+}
+
+char *aclk_generate_lwt(size_t *size) {
+    update_agent_connection_t conn = {
+        .reachable = 0,
+        .lwt = 1,
+        .session_id = aclk_session_newarch
+    };
+
+    rrdhost_aclk_state_lock(localhost);
+    if (unlikely(!localhost->aclk_state.claimed_id)) {
+        error("Internal error. Should not come here if not claimed");
+        rrdhost_aclk_state_unlock(localhost);
+        return NULL;
+    }
+    conn.claim_id = localhost->aclk_state.claimed_id;
+
+    char *msg = generate_update_agent_connection(size, &conn);
+    rrdhost_aclk_state_unlock(localhost);
+
+    if (!msg)
+        error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
+
+    return msg;
+}
+
+void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) {
+    size_t len;
+    char *msg = generate_node_instance_creation(&len, node_creation);
+    if (!msg) {
+        error("Error generating nodeinstance::create::v1::CreateNodeInstance");
+        return;
+    }
+
+    aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
+    freez(msg);
+}
+
+void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) {
+    size_t len;
+    char *msg = generate_node_instance_connection(&len, node_connection);
+    if (!msg) {
+        error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection");
+        return;
+    }
+
+    aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
+    freez(msg);
+}
+#endif /*
ENABLE_NEW_CLOUD_PROTOCOL */
+
 #ifndef __GNUC__
 #pragma endregion
 #endif
diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h
index 50c981696..da29a4a32 100644
--- a/aclk/aclk_tx_msgs.h
+++ b/aclk/aclk_tx_msgs.h
@@ -4,8 +4,12 @@
 #include <json-c/json.h>
 #include "libnetdata/libnetdata.h"
-#include "../daemon/common.h"
+#include "daemon/common.h"
 #include "mqtt_wss_client.h"
+#include "schema-wrappers/schema_wrappers.h"
+#include "aclk_util.h"
+
+uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
 void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
 void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
@@ -19,4 +23,13 @@ void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg);
 json_object *aclk_generate_disconnect(const char *message);
 int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+// new protobuf msgs
+uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
+char *aclk_generate_lwt(size_t *size);
+
+void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation);
+void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection);
+#endif
+
 #endif
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index b8ac66756..ee8fcaf94 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -2,15 +2,14 @@
 #include "aclk_util.h"
-#include <stdio.h>
+#include "daemon/common.h"
-#include "../daemon/common.h"
+int aclk_use_new_cloud_arch = 0;
+usec_t aclk_session_newarch = 0;
-// CentOS 7 has older version that doesn't define this
-// same goes for MacOS
-#ifndef UUID_STR_LEN
-#define UUID_STR_LEN 37
-#endif
+aclk_env_t *aclk_env = NULL;
+
+int chart_batch_id;
 aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
     if (!strcmp(str, "json")) {
@@ -54,6 +53,15 @@ void aclk_env_t_destroy(aclk_env_t *env) {
     }
 }
+int aclk_env_has_capa(const char *capa)
+{
+    for (int i = 0; i < (int) aclk_env->capability_count; i++) {
+        if (!strcasecmp(capa, aclk_env->capabilities[i]))
+            return 1;
+    }
+    return 0;
+}
+
 #ifdef ACLK_LOG_CONVERSATION_DIR
 volatile int aclk_conversation_log_counter = 0;
 #if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
@@ -109,18 +117,53 @@ struct topic_name {
     // in answer to /password endpoint
     const char *name;
 } topic_names[] = {
-    { .id = ACLK_TOPICID_CHART, .name = "chart" },
-    { .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
-    { .id = ACLK_TOPICID_METADATA, .name = "meta" },
-    { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
-    { .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
+    { .id = ACLK_TOPICID_CHART, .name = "chart" },
+    { .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
+    { .id = ACLK_TOPICID_METADATA, .name = "meta" },
+    { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
+    { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" },
+    { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" },
+    { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" },
+    { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" },
+    { .id = ACLK_TOPICID_CHART_DIMS, .name = "chart-and-dims-updated" },
+    { .id = ACLK_TOPICID_CHART_CONFIGS_UPDATED, .name = "chart-configs-updated" },
+    { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" },
+    { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" },
+    { .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" },
+    { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log" },
+    { .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" },
+    { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" },
+    { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" },
+    { .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
+};
+
+enum aclk_topics compulsory_topics_legacy[] = {
+    ACLK_TOPICID_CHART,
ACLK_TOPICID_ALARMS, + ACLK_TOPICID_METADATA, + ACLK_TOPICID_COMMAND, + ACLK_TOPICID_UNKNOWN }; -enum aclk_topics compulsory_topics[] = { +enum aclk_topics compulsory_topics_new_cloud_arch[] = { +// TODO remove old topics once not needed anymore ACLK_TOPICID_CHART, ACLK_TOPICID_ALARMS, ACLK_TOPICID_METADATA, ACLK_TOPICID_COMMAND, + ACLK_TOPICID_AGENT_CONN, + ACLK_TOPICID_CMD_NG_V1, + ACLK_TOPICID_CREATE_NODE, + ACLK_TOPICID_NODE_CONN, + ACLK_TOPICID_CHART_DIMS, + ACLK_TOPICID_CHART_CONFIGS_UPDATED, + ACLK_TOPICID_CHART_RESET, + ACLK_TOPICID_RETENTION_UPDATED, + ACLK_TOPICID_NODE_INFO, + ACLK_TOPICID_ALARM_LOG, + ACLK_TOPICID_ALARM_HEALTH, + ACLK_TOPICID_ALARM_CONFIG, + ACLK_TOPICID_ALARM_SNAPSHOT, ACLK_TOPICID_UNKNOWN }; @@ -188,7 +231,7 @@ static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *to } topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it))); if (topic->topic_id == ACLK_TOPICID_UNKNOWN) { - info("topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + debug(D_ACLK, "topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); } json_object_iter_next(&it); continue; @@ -246,6 +289,8 @@ int aclk_generate_topic_cache(struct json_object *json) } } + enum aclk_topics *compulsory_topics = aclk_use_new_cloud_arch ? 
compulsory_topics_new_cloud_arch : compulsory_topics_legacy; + for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) { if (!aclk_get_topic(compulsory_topics[i])) { error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); @@ -315,189 +360,6 @@ unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, un return delay; } -#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://" -#define ACLK_PROXY_ENV "env" -#define ACLK_PROXY_CONFIG_VAR "proxy" - -struct { - ACLK_PROXY_TYPE type; - const char *url_str; -} supported_proxy_types[] = { - { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, - { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, - { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, - { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL }, -}; - -const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type) -{ - switch (*type) { - case PROXY_DISABLED: - return "disabled"; - case PROXY_TYPE_HTTP: - return "HTTP"; - case PROXY_TYPE_SOCKS5: - return "SOCKS"; - default: - return "Unknown"; - } -} - -static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string) -{ - int i = 0; - while (supported_proxy_types[i].url_str) { - if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str))) - return supported_proxy_types[i].type; - i++; - } - return PROXY_TYPE_UNKNOWN; -} - -ACLK_PROXY_TYPE aclk_verify_proxy(const char *string) -{ - if (!string) - return PROXY_TYPE_UNKNOWN; - - while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove) - string++; - - if (!*string) - return PROXY_TYPE_UNKNOWN; - - return aclk_find_proxy(string); -} - -// helper function to censor user&password -// for logging purposes -void safe_log_proxy_censor(char *proxy) -{ - size_t length = strlen(proxy); - char *auth = proxy + length - 1; - char *cur; - - while ((auth >= proxy) && 
(*auth != '@'))
-        auth--;
-
-    //if not found or @ is first char do nothing
-    if (auth <= proxy)
-        return;
-
-    cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
-    if (!cur)
-        cur = proxy;
-    else
-        cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
-
-    while (cur < auth) {
-        *cur = 'X';
-        cur++;
-    }
-}
-
-static inline void safe_log_proxy_error(char *str, const char *proxy)
-{
-    char *log = strdupz(proxy);
-    safe_log_proxy_censor(log);
-    error("%s Provided Value:\"%s\"", str, log);
-    freez(log);
-}
-
-static inline int check_socks_enviroment(const char **proxy)
-{
-    char *tmp = getenv("socks_proxy");
-
-    if (!tmp)
-        return 1;
-
-    if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
-        *proxy = tmp;
-        return 0;
-    }
-
-    safe_log_proxy_error(
-        "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
-        tmp);
-    return 1;
-}
-
-static inline int check_http_enviroment(const char **proxy)
-{
-    char *tmp = getenv("http_proxy");
-
-    if (!tmp)
-        return 1;
-
-    if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
-        *proxy = tmp;
-        return 0;
-    }
-
-    safe_log_proxy_error(
-        "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
-        tmp);
-    return 1;
-}
-
-const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
-{
-    const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
-    *type = PROXY_DISABLED;
-
-    if (strcmp(proxy, "none") == 0)
-        return proxy;
-
-    if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
-        if (check_socks_enviroment(&proxy) == 0) {
-#ifdef LWS_WITH_SOCKS5
-            *type = PROXY_TYPE_SOCKS5;
-            return proxy;
-#else
-            safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
-                "but Libwebsockets used doesn't have SOCKS5 support built in. "
-                "Ignoring and checking for other options.",
-                proxy);
-#endif
-        }
-        if (check_http_enviroment(&proxy) == 0)
-            *type = PROXY_TYPE_HTTP;
-        return proxy;
-    }
-
-    *type = aclk_verify_proxy(proxy);
-#ifndef LWS_WITH_SOCKS5
-    if (*type == PROXY_TYPE_SOCKS5) {
-        safe_log_proxy_error(
-            "Config var \"" ACLK_PROXY_CONFIG_VAR
-            "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
-            proxy);
-    }
-#endif
-    if (*type == PROXY_TYPE_UNKNOWN) {
-        *type = PROXY_DISABLED;
-        safe_log_proxy_error(
-            "Config var \"" ACLK_PROXY_CONFIG_VAR
-            "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
-            proxy);
-    }
-
-    return proxy;
-}
-
-// helper function to read settings only once (static)
-// as claiming, challenge/response and ACLK
-// read the same thing, no need to parse again
-const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
-{
-    static const char *proxy = NULL;
-    static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
-
-    if (proxy_type == PROXY_NOT_SET)
-        proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
-
-    *type = proxy_type;
-    return proxy;
-}
 #define HTTP_PROXY_PREFIX "http://"
 void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type)
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 03b22e40c..07de5c58a 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -8,6 +8,11 @@
 // Helper stuff which should not have any further inside ACLK dependency
 // and are supposed not to be needed outside of ACLK
+extern int aclk_use_new_cloud_arch;
+extern usec_t aclk_session_newarch;
+
+extern int chart_batch_id;
+
 typedef enum {
     ACLK_ENC_UNKNOWN = 0,
     ACLK_ENC_JSON,
@@ -44,18 +49,34 @@ typedef struct {
     aclk_backoff_t backoff;
 } aclk_env_t;
+extern aclk_env_t *aclk_env;
+
 aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str);
 aclk_transport_type_t aclk_transport_type_t_from_str(const char *str);
 void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc);
 void aclk_env_t_destroy(aclk_env_t *env);
+int aclk_env_has_capa(const char *capa);
 enum aclk_topics {
-    ACLK_TOPICID_UNKNOWN = 0,
-    ACLK_TOPICID_CHART = 1,
-    ACLK_TOPICID_ALARMS = 2,
-    ACLK_TOPICID_METADATA = 3,
-    ACLK_TOPICID_COMMAND = 4
+    ACLK_TOPICID_UNKNOWN = 0,
+    ACLK_TOPICID_CHART = 1,
+    ACLK_TOPICID_ALARMS = 2,
+    ACLK_TOPICID_METADATA = 3,
+    ACLK_TOPICID_COMMAND = 4,
+    ACLK_TOPICID_AGENT_CONN = 5,
+    ACLK_TOPICID_CMD_NG_V1 = 6,
+    ACLK_TOPICID_CREATE_NODE = 7,
+    ACLK_TOPICID_NODE_CONN = 8,
+    ACLK_TOPICID_CHART_DIMS = 9,
+    ACLK_TOPICID_CHART_CONFIGS_UPDATED = 10,
+    ACLK_TOPICID_CHART_RESET = 11,
+    ACLK_TOPICID_RETENTION_UPDATED = 12,
+    ACLK_TOPICID_NODE_INFO = 13,
+    ACLK_TOPICID_ALARM_LOG = 14,
+    ACLK_TOPICID_ALARM_HEALTH = 15,
+    ACLK_TOPICID_ALARM_CONFIG = 16,
+    ACLK_TOPICID_ALARM_SNAPSHOT = 17
 };
 const char *aclk_get_topic(enum aclk_topics topic);
@@ -78,20 +99,6 @@ int aclk_get_conv_log_next();
 unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max);
 #define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0)
-typedef enum aclk_proxy_type {
-    PROXY_TYPE_UNKNOWN = 0,
-    PROXY_TYPE_SOCKS5,
-    PROXY_TYPE_HTTP,
-    PROXY_DISABLED,
-    PROXY_NOT_SET,
-} ACLK_PROXY_TYPE;
-
-const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
-ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
-const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
-void safe_log_proxy_censor(char *proxy);
-const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
-
 void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type);
 #endif /* ACLK_UTIL_H */
diff --git a/aclk/https_client.c b/aclk/https_client.c
index 907f512ba..470c3fdf3 100644
--- a/aclk/https_client.c
+++ b/aclk/https_client.c
@@ -4,7 +4,7 @@
 #include "https_client.h"
-#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h"
+#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
 enum http_parse_state {
     HTTP_PARSE_INITIAL = 0,
@@ -47,7 +47,7 @@ static inline void http_parse_ctx_clear(http_parse_ctx *ctx) {
 #define RESP_PROTO "HTTP/1.1 "
 #define HTTP_KEYVAL_SEPARATOR ": "
 #define HTTP_HDR_BUFFER_SIZE 256
-#define PORT_STR_MAX_BYTES 7
+#define PORT_STR_MAX_BYTES 12
 static void process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const char *val)
 {
@@ -303,7 +303,8 @@ static int read_parse_response(https_req_ctx_t *ctx) {
                 error("Poll timed out");
                 return 2;
             }
-            continue;
+            if (!ctx->ssl_ctx)
+                continue;
         }
         ctx->poll_fd.events = 0;
@@ -421,6 +422,35 @@ err_exit:
     return rc;
 }
+static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
+{
+    X509 *err_cert;
+    int err, depth;
+    char *err_str;
+
+    if (!preverify_ok) {
+        err = X509_STORE_CTX_get_error(ctx);
+        depth = X509_STORE_CTX_get_error_depth(ctx);
+        err_cert = X509_STORE_CTX_get_current_cert(ctx);
+        err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0);
+
+        error("Cert Chain verify error:num=%d:%s:depth=%d:%s", err,
+            X509_verify_cert_error_string(err), depth, err_str);
+
+        free(err_str);
+    }
+
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+    if (!preverify_ok && err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT)
+    {
+        preverify_ok = 1;
+        error("Self Signed Certificate Accepted as the agent was built with ACLK_SSL_ALLOW_SELF_SIGNED");
+    }
+#endif
+
+    return preverify_ok;
+}
+
 int https_request(https_req_t *request, https_req_response_t *response) {
     int rc = 1, ret;
     char connect_port_str[PORT_STR_MAX_BYTES];
@@ -438,7 +468,7 @@ int https_request(https_req_t *request, https_req_response_t *response) {
         goto exit_req_ctx;
     }
-    snprintf(connect_port_str, PORT_STR_MAX_BYTES, "%d", connect_port);
+    snprintfz(connect_port_str, PORT_STR_MAX_BYTES, "%d", connect_port);
     ctx->sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, connect_host, 0, connect_port_str, &timeout);
     if (ctx->sock < 0) {
@@ -480,6 +510,12 @@ int https_request(https_req_t *request, https_req_response_t *response) {
         goto exit_sock;
     }
+    if (!SSL_CTX_set_default_verify_paths(ctx->ssl_ctx)) {
+
error("Error setting default verify paths");
+        goto exit_CTX;
+    }
+    SSL_CTX_set_verify(ctx->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback);
+
     ctx->ssl = SSL_new(ctx->ssl_ctx);
     if (ctx->ssl==NULL) {
         error("Cannot allocate SSL");
@@ -570,7 +606,7 @@ static int parse_host_port(url_t *url) {
             error(URL_PARSER_LOG_PREFIX ": specified but no port number");
             return 1;
         }
-        if (port_len > 5 /* MAX port lenght is 5digit long in decimal */) {
+        if (port_len > 5 /* MAX port length is 5digit long in decimal */) {
             error(URL_PARSER_LOG_PREFIX "port # is too long");
             return 1;
         }
diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c
index 96f955451..7f8368e44 100644
--- a/aclk/legacy/aclk_common.c
+++ b/aclk/legacy/aclk_common.c
@@ -1,201 +1,18 @@
 #include "aclk_common.h"
-#include "../../daemon/common.h"
+#include "daemon/common.h"
 #ifdef ENABLE_ACLK
 #include <libwebsockets.h>
 #endif
-netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
+netdata_mutex_t legacy_aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
-int aclk_disable_runtime = 0;
-int aclk_kill_link = 0;
-
-struct aclk_shared_state aclk_shared_state = {
+struct legacy_aclk_shared_state legacy_aclk_shared_state = {
     .version_neg = 0,
     .version_neg_wait_till = 0
 };
-struct {
-    ACLK_PROXY_TYPE type;
-    const char *url_str;
-} supported_proxy_types[] = {
-    { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
-    { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
-    { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
-    { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
-};
-
-const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
-{
-    switch (*type) {
-        case PROXY_DISABLED:
-            return "disabled";
-        case PROXY_TYPE_HTTP:
-            return "HTTP";
-        case PROXY_TYPE_SOCKS5:
-            return "SOCKS";
-        default:
-            return "Unknown";
-    }
-}
-
-static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
-{
-    int i = 0;
-    while (supported_proxy_types[i].url_str) {
-        if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
-            return supported_proxy_types[i].type;
-        i++;
-    }
-    return PROXY_TYPE_UNKNOWN;
-}
-
-ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
-{
-    if (!string)
-        return PROXY_TYPE_UNKNOWN;
-
-    while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove)
-        string++;
-
-    if (!*string)
-        return PROXY_TYPE_UNKNOWN;
-
-    return aclk_find_proxy(string);
-}
-
-// helper function to censor user&password
-// for logging purposes
-void safe_log_proxy_censor(char *proxy)
-{
-    size_t length = strlen(proxy);
-    char *auth = proxy + length - 1;
-    char *cur;
-
-    while ((auth >= proxy) && (*auth != '@'))
-        auth--;
-
-    //if not found or @ is first char do nothing
-    if (auth <= proxy)
-        return;
-
-    cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
-    if (!cur)
-        cur = proxy;
-    else
-        cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
-
-    while (cur < auth) {
-        *cur = 'X';
-        cur++;
-    }
-}
-
-static inline void safe_log_proxy_error(char *str, const char *proxy)
-{
-    char *log = strdupz(proxy);
-    safe_log_proxy_censor(log);
-    error("%s Provided Value:\"%s\"", str, log);
-    freez(log);
-}
-
-static inline int check_socks_environment(const char **proxy)
-{
-    char *tmp = getenv("socks_proxy");
-
-    if (!tmp)
-        return 1;
-
-    if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
-        *proxy = tmp;
-        return 0;
-    }
-
-    safe_log_proxy_error(
-        "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
-        tmp);
-    return 1;
-}
-
-static inline int check_http_environment(const char **proxy)
-{
-    char *tmp = getenv("http_proxy");
-
-    if (!tmp)
-        return 1;
-
-    if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
-        *proxy = tmp;
-        return 0;
-    }
-
-    safe_log_proxy_error(
-        "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
-        tmp);
-    return 1;
-}
-
-const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
-{
-    const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
-    *type = PROXY_DISABLED;
-
-    if (strcmp(proxy, "none") == 0)
-        return proxy;
-
-    if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
-        if (check_socks_environment(&proxy) == 0) {
-#ifdef LWS_WITH_SOCKS5
-            *type = PROXY_TYPE_SOCKS5;
-            return proxy;
-#else
-            safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
-                "but Libwebsockets used doesn't have SOCKS5 support built in. "
-                "Ignoring and checking for other options.",
-                proxy);
-#endif
-        }
-        if (check_http_environment(&proxy) == 0)
-            *type = PROXY_TYPE_HTTP;
-        return proxy;
-    }
-
-    *type = aclk_verify_proxy(proxy);
-#ifndef LWS_WITH_SOCKS5
-    if (*type == PROXY_TYPE_SOCKS5) {
-        safe_log_proxy_error(
-            "Config var \"" ACLK_PROXY_CONFIG_VAR
-            "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
-            proxy);
-    }
-#endif
-    if (*type == PROXY_TYPE_UNKNOWN) {
-        *type = PROXY_DISABLED;
-        safe_log_proxy_error(
-            "Config var \"" ACLK_PROXY_CONFIG_VAR
-            "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
-            proxy);
-    }
-
-    return proxy;
-}
-
-// helper function to read settings only once (static)
-// as claiming, challenge/response and ACLK
-// read the same thing, no need to parse again
-const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
-{
-    static const char *proxy = NULL;
-    static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
-
-    if (proxy_type == PROXY_NOT_SET)
-        proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
-
-    *type = proxy_type;
-    return proxy;
-}
-
 int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
 {
     int pos = 0;
@@ -234,27 +51,3 @@ int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
     info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
     return 0;
 }
-
-struct label *add_aclk_host_labels(struct label *label) {
-#ifdef ENABLE_ACLK
-    ACLK_PROXY_TYPE aclk_proxy;
-    char *proxy_str;
-    aclk_get_proxy(&aclk_proxy);
-
-    switch(aclk_proxy) {
-        case PROXY_TYPE_SOCKS5:
-            proxy_str = "SOCKS5";
-            break;
-        case PROXY_TYPE_HTTP:
-            proxy_str = "HTTP";
-            break;
-        default:
-            proxy_str = "none";
-            break;
-    }
-    label = add_label_to_list(label, "_aclk_impl", "Legacy", LABEL_SOURCE_AUTO);
-    return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
-#else
-    return label;
-#endif
-}
diff --git a/aclk/legacy/aclk_common.h b/aclk/legacy/aclk_common.h
index eedb5b51c..080680ff1 100644
--- a/aclk/legacy/aclk_common.h
+++ b/aclk/legacy/aclk_common.h
@@ -1,12 +1,12 @@
 #ifndef ACLK_COMMON_H
 #define ACLK_COMMON_H
-#include "aclk_rrdhost_state.h"
-#include "../../daemon/common.h"
+#include "../aclk_rrdhost_state.h"
+#include "daemon/common.h"
-extern netdata_mutex_t aclk_shared_state_mutex;
-#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
-#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
+extern netdata_mutex_t legacy_aclk_shared_state_mutex;
+#define
legacy_aclk_shared_state_LOCK netdata_mutex_lock(&legacy_aclk_shared_state_mutex)
+#define legacy_aclk_shared_state_UNLOCK netdata_mutex_unlock(&legacy_aclk_shared_state_mutex)
 // minimum and maximum supported version of ACLK
 // in this version of agent
@@ -33,8 +33,8 @@ extern netdata_mutex_t aclk_shared_state_mutex;
 #define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING)
 #define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update)
-extern struct aclk_shared_state {
-    // optimization to avoid looping trough hosts
+extern struct legacy_aclk_shared_state {
+    // optimization to avoid looping through hosts
     // every time Query Thread wakes up
     RRDHOST *next_popcorn_host;
@@ -42,31 +42,10 @@ extern struct aclk_shared_state {
     // protect by lock otherwise
     int version_neg;
     usec_t version_neg_wait_till;
-} aclk_shared_state;
-
-typedef enum aclk_proxy_type {
-    PROXY_TYPE_UNKNOWN = 0,
-    PROXY_TYPE_SOCKS5,
-    PROXY_TYPE_HTTP,
-    PROXY_DISABLED,
-    PROXY_NOT_SET,
-} ACLK_PROXY_TYPE;
-
-extern int aclk_kill_link; // Tells the agent to tear down the link
-extern int aclk_disable_runtime;
+} legacy_aclk_shared_state;
 const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
-#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
-#define ACLK_PROXY_ENV "env"
-#define ACLK_PROXY_CONFIG_VAR "proxy"
-
-ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
-const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
-void safe_log_proxy_censor(char *proxy);
 int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
-const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
-
-struct label *add_aclk_host_labels(struct label *label);
 #endif //ACLK_COMMON_H
diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c
index f41a230db..8a490c6f4 100644
--- a/aclk/legacy/aclk_lws_https_client.c
+++ b/aclk/legacy/aclk_lws_https_client.c
@@ -2,13 +2,7 @@
 #define ACLK_LWS_HTTPS_CLIENT_INTERNAL
 #include "aclk_lws_https_client.h"
-
-#ifndef ACLK_NG
 #include "aclk_common.h"
-#else
-#include "../aclk.h"
-#endif
-
 #include "aclk_lws_wss_client.h"
 #define SMALL_BUFFER 16
diff --git a/aclk/legacy/aclk_lws_https_client.h b/aclk/legacy/aclk_lws_https_client.h
index 811809dd1..5f30a37fd 100644
--- a/aclk/legacy/aclk_lws_https_client.h
+++ b/aclk/legacy/aclk_lws_https_client.h
@@ -3,7 +3,7 @@
 #ifndef NETDATA_LWS_HTTPS_CLIENT_H
 #define NETDATA_LWS_HTTPS_CLIENT_H
-#include "../../daemon/common.h"
+#include "daemon/common.h"
 #include "libnetdata/libnetdata.h"
 #define DATAMAXLEN 1024*16
diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c
index f73902b30..012f2a8cc 100644
--- a/aclk/legacy/aclk_lws_wss_client.c
+++ b/aclk/legacy/aclk_lws_wss_client.c
@@ -3,9 +3,10 @@
 #include "aclk_lws_wss_client.h"
 #include "libnetdata/libnetdata.h"
-#include "../../daemon/common.h"
+#include "daemon/common.h"
 #include "aclk_common.h"
 #include "aclk_stats.h"
+#include "../aclk_proxy.h"
 extern int aclk_shutting_down;
@@ -450,9 +451,9 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
                 if (n>=0) {
                     data->written += n;
                     if (aclk_stats_enabled) {
-                        ACLK_STATS_LOCK;
-                        aclk_metrics_per_sample.write_q_consumed += n;
-                        ACLK_STATS_UNLOCK;
+                        LEGACY_ACLK_STATS_LOCK;
+                        legacy_aclk_metrics_per_sample.write_q_consumed += n;
+                        LEGACY_ACLK_STATS_UNLOCK;
                     }
                 }
                 //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc);
@@ -473,9 +474,9 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
                 retval = 1;
             aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
             if (aclk_stats_enabled) {
-                ACLK_STATS_LOCK;
-                aclk_metrics_per_sample.read_q_added += len;
-                ACLK_STATS_UNLOCK;
+                LEGACY_ACLK_STATS_LOCK;
+                legacy_aclk_metrics_per_sample.read_q_added += len;
+                LEGACY_ACLK_STATS_UNLOCK;
             }
             // to future myself -> do not call this while read lock is active as it will eventually
@@ -553,9 +554,9 @@ int aclk_lws_wss_client_write(void *buf, size_t count)
         aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
         if (aclk_stats_enabled) {
-            ACLK_STATS_LOCK;
-            aclk_metrics_per_sample.write_q_added += count;
-            ACLK_STATS_UNLOCK;
+            LEGACY_ACLK_STATS_LOCK;
+            legacy_aclk_metrics_per_sample.write_q_added += count;
+            LEGACY_ACLK_STATS_UNLOCK;
         }
         lws_callback_on_writable(engine_instance->lws_wsi);
@@ -584,9 +585,9 @@ int aclk_lws_wss_client_read(void *buf, size_t count)
         engine_instance->data_to_read = 0;
     if (aclk_stats_enabled) {
-        ACLK_STATS_LOCK;
-        aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
-        ACLK_STATS_UNLOCK;
+        LEGACY_ACLK_STATS_LOCK;
+        legacy_aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
+        LEGACY_ACLK_STATS_UNLOCK;
     }
 abort:
diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h
index eb99ee024..c68649cf3 100644
--- a/aclk/legacy/aclk_lws_wss_client.h
+++ b/aclk/legacy/aclk_lws_wss_client.h
@@ -58,7 +58,7 @@ struct aclk_lws_wss_engine_instance {
     struct lws_wss_packet_buffer *write_buffer_head;
     struct lws_ring *read_ringbuffer;
-    //flags to be readed by engine user
+    //flags to be read by engine user
     int websocket_connection_up;
     // currently this is by default disabled
diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c
index 040068e87..21eae11fd 100644
--- a/aclk/legacy/aclk_query.c
+++ b/aclk/legacy/aclk_query.c
@@ -2,15 +2,16 @@
 #include "aclk_query.h"
 #include "aclk_stats.h"
 #include "aclk_rx_msgs.h"
+#include "agent_cloud_link.h"
 #define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
-pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
-pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
-#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
-#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
-volatile int aclk_connected = 0;
+pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait)
+#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait)
 #ifndef __GNUC__
 #pragma region ACLK_QUEUE
@@ -188,7 +189,7 @@ aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd
  * Add a query to execute, the result will be send to the specified topic
  */
-int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
+int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
 {
     struct aclk_query *new_query, *tmp_query;
@@ -205,7 +206,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
     if (unlikely(tmp_query)) {
         if (tmp_query->run_after == run_after) {
             ACLK_QUEUE_UNLOCK;
-            QUERY_THREAD_WAKEUP;
+            LEGACY_QUERY_THREAD_WAKEUP;
             return 0;
         }
@@ -220,9 +221,9 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
     }
     if (aclk_stats_enabled) {
-        ACLK_STATS_LOCK;
-        aclk_metrics_per_sample.queries_queued++;
-        ACLK_STATS_UNLOCK;
+        LEGACY_ACLK_STATS_LOCK;
+        legacy_aclk_metrics_per_sample.queries_queued++;
+        LEGACY_ACLK_STATS_UNLOCK;
     }
     new_query = callocz(1, sizeof(struct aclk_query));
@@ -255,7 +256,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
         aclk_queue.aclk_query_tail = new_query;
         aclk_queue.count++;
         ACLK_QUEUE_UNLOCK;
-        QUERY_THREAD_WAKEUP;
+        LEGACY_QUERY_THREAD_WAKEUP;
         return 0;
     }
@@ -264,7 +265,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
     aclk_queue.count++;
     ACLK_QUEUE_UNLOCK;
-    QUERY_THREAD_WAKEUP;
+    LEGACY_QUERY_THREAD_WAKEUP;
     return 0;
 }
@@ -332,12 +333,12 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
 static usec_t aclk_web_api_request_v1(RRDHOST *host,
struct web_client *w, char *url, usec_t q_created)
 {
     usec_t t = now_boottime_usec();
-    aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
+    legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
     w->response.code = web_client_api_request_v1(host, w, url);
     t = now_boottime_usec() - t;
-    aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t);
+    legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t);
     return t;
 }
@@ -375,7 +376,7 @@ static int aclk_execute_query(struct aclk_query *this_query)
         buffer_flush(local_buffer);
         local_buffer->contenttype = CT_APPLICATION_JSON;
-        aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+        aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
         buffer_strcat(local_buffer, ",\n\t\"payload\": ");
         char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
         char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
@@ -510,7 +511,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
     local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
     local_buffer->contenttype = CT_APPLICATION_JSON;
-    aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
+    aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
     buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code);
     buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A");
     buffer_strcat(local_buffer, w->response.header_output->buffer);
@@ -607,7 +608,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
         case ACLK_CMD_ONCONNECT:
             ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
 #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
-            if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
+            if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
                 error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
                 break;
             }
@@ -638,7 +639,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
             debug(D_ACLK, "EXECUTING a chart delete command");
             //TODO: This send the info metadata for now
-            aclk_send_info_metadata(ACLK_METADATA_SENT, host);
+            legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host);
             break;
         case ACLK_CMD_ALARM:
@@ -673,10 +674,10 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
     debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
     if (aclk_stats_enabled) {
-        ACLK_STATS_LOCK;
-        aclk_metrics_per_sample.queries_dispatched++;
-        aclk_queries_per_thread[t_info->idx]++;
-        ACLK_STATS_UNLOCK;
+        LEGACY_ACLK_STATS_LOCK;
+        legacy_aclk_metrics_per_sample.queries_dispatched++;
+        legacy_aclk_queries_per_thread[t_info->idx]++;
+        LEGACY_ACLK_STATS_UNLOCK;
         if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) {
             getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]);
@@ -690,7 +691,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
     return 1;
 }
-void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
+void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
 {
     if (query_threads && query_threads->thread_list) {
         for (int i = 0; i < query_threads->count; i++) {
@@ -707,8 +708,8 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
     } while (this_query);
 }
-#define TASK_LEN_MAX 16
-void aclk_query_threads_start(struct aclk_query_threads *query_threads)
+#define TASK_LEN_MAX 22
+void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads)
 {
     info("Starting %d query threads.", query_threads->count);
@@ -717,10 +718,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
     for (int i = 0; i < query_threads->count; i++) {
         query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
-        if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0))
+        if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
             error("snprintf encoding error");
         netdata_thread_create(
-            &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
+            &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread,
             &query_threads->thread_list[i]);
     }
 }
@@ -730,10 +731,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
  * returns actual/updated popcorning state
  */
-ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
+ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host)
 {
     rrdhost_aclk_state_lock(host);
-    ACLK_POPCORNING_STATE ret = host->aclk_state.state;
+    ACLK_AGENT_STATE ret = host->aclk_state.state;
     if (host->aclk_state.state != ACLK_HOST_INITIALIZING){
         rrdhost_aclk_state_unlock(host);
         return ret;
@@ -766,7 +767,7 @@ ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
  * of no new collectors coming in in order to mark the agent
  * as stable (set agent_state = AGENT_STABLE)
  */
-void *aclk_query_main_thread(void *ptr)
+void *legacy_aclk_query_main_thread(void *ptr)
 {
     struct aclk_query_thread *info = ptr;
@@ -785,25 +786,24 @@ void *aclk_query_main_thread(void *ptr)
             sleep(1);
             continue;
         }
-        ACLK_SHARED_STATE_LOCK;
-        if (unlikely(!aclk_shared_state.version_neg)) {
-            if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
-                ACLK_SHARED_STATE_UNLOCK;
+        legacy_aclk_shared_state_LOCK;
+        if (unlikely(!legacy_aclk_shared_state.version_neg)) {
+            if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
+                legacy_aclk_shared_state_UNLOCK;
                 info("Waiting for ACLK Version Negotiation message from Cloud");
                 sleep(1);
                 continue;
             }
-            errno = 0;
-            error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
+            info("ACLK version negotiation failed (This is expected). No reply to \"hello\" with \"version\" from cloud in time of %ds."
                 " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
-            aclk_shared_state.version_neg = ACLK_VERSION_MIN;
-            aclk_set_rx_handlers(aclk_shared_state.version_neg);
+            legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN;
+            aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg);
         }
-        ACLK_SHARED_STATE_UNLOCK;
+        legacy_aclk_shared_state_UNLOCK;
         rrdhost_aclk_state_lock(localhost);
         if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
-            if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+            if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
                 rrdhost_aclk_state_unlock(localhost);
                 errno = 0;
                 error("ACLK failed to queue on_connect command");
@@ -814,25 +814,25 @@ void *aclk_query_main_thread(void *ptr)
         }
         rrdhost_aclk_state_unlock(localhost);
-        ACLK_SHARED_STATE_LOCK;
-        if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
-            aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
-            aclk_shared_state.next_popcorn_host = NULL;
+        legacy_aclk_shared_state_LOCK;
+        if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
+            legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
+            legacy_aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn(); } - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; while (aclk_process_query(info)) { // Process all commands }; - QUERY_THREAD_LOCK; + LEGACY_QUERY_THREAD_LOCK; // TODO: Need to check if there are queries awaiting already - if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait))) sleep_usec(USEC_PER_SEC * 1); - QUERY_THREAD_UNLOCK; + LEGACY_QUERY_THREAD_UNLOCK; } return NULL; diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h index 026985c8d..622b66e2c 100644 --- a/aclk/legacy/aclk_query.h +++ b/aclk/legacy/aclk_query.h @@ -10,14 +10,11 @@ #define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread. -extern pthread_cond_t query_cond_wait; -extern pthread_mutex_t query_lock_wait; +extern pthread_cond_t legacy_query_cond_wait; +extern pthread_mutex_t legacy_query_lock_wait; extern uint8_t *getrusage_called_this_tick; -#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) -#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait) - -extern volatile int aclk_connected; - +#define LEGACY_QUERY_THREAD_WAKEUP pthread_cond_signal(&legacy_query_cond_wait) +#define LEGACY_QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&legacy_query_cond_wait) struct aclk_query_thread { netdata_thread_t thread; int idx; @@ -34,11 +31,11 @@ struct aclk_cloud_req_v2 { char *query_endpoint; }; -void *aclk_query_main_thread(void *ptr); -int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); +void *legacy_aclk_query_main_thread(void *ptr); +int legacy_aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); -void aclk_query_threads_start(struct aclk_query_threads *query_threads); -void aclk_query_threads_cleanup(struct aclk_query_threads 
*query_threads); +void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads); +void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); unsigned int aclk_query_size(); #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c index 68dad81e0..d4778bbcf 100644 --- a/aclk/legacy/aclk_rx_msgs.c +++ b/aclk/legacy/aclk_rx_msgs.c @@ -4,6 +4,7 @@ #include "aclk_common.h" #include "aclk_stats.h" #include "aclk_query.h" +#include "agent_cloud_link.h" #ifndef UUID_STR_LEN #define UUID_STR_LEN 37 @@ -107,7 +108,7 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha error( "Received \"http\" message from Cloud with version %d, but ACLK version %d is used", cloud_to_agent->version, - aclk_shared_state.version_neg); + legacy_aclk_shared_state.version_neg); return 1; } @@ -126,14 +127,14 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha return 1; } - if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) + if (unlikely(legacy_aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_v1++; - aclk_metrics_per_sample.cloud_req_ok++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_v1++; + legacy_aclk_metrics_per_sample.cloud_req_ok++; + LEGACY_ACLK_STATS_UNLOCK; } return 0; @@ -181,11 +182,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha } // we do this here due to cloud_req being taken over by query thread - // which if crazy quick can free it after aclk_queue_query + // which if crazy quick can free it after legacy_aclk_queue_query stat_idx = 
aclk_cloud_req_type_to_idx(cloud_req->query_endpoint); - // aclk_queue_query takes ownership of data pointer - if (unlikely(aclk_queue_query( + // legacy_aclk_queue_query takes ownership of data pointer + if (unlikely(legacy_aclk_queue_query( cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD_QUERY_2))) { error("ACLK failed to queue incoming \"http\" v2 message"); @@ -193,11 +194,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha } if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_v2++; - aclk_metrics_per_sample.cloud_req_ok++; - aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_v2++; + legacy_aclk_metrics_per_sample.cloud_req_ok++; + legacy_aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++; + LEGACY_ACLK_STATS_UNLOCK; } return 0; @@ -258,19 +259,19 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); - ACLK_SHARED_STATE_LOCK; - if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { + legacy_aclk_shared_state_LOCK; + if (unlikely(now_monotonic_usec() > legacy_aclk_shared_state.version_neg_wait_till)) { errno = 0; error("The \"version\" message came too late ignoring."); goto err_cleanup; } - if (unlikely(aclk_shared_state.version_neg)) { + if (unlikely(legacy_aclk_shared_state.version_neg)) { errno = 0; - error("Version has already been set to %d", aclk_shared_state.version_neg); + error("Version has already been set to %d", legacy_aclk_shared_state.version_neg); goto err_cleanup; } - aclk_shared_state.version_neg = version; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state.version_neg = version; + legacy_aclk_shared_state_UNLOCK; info("Choosing version %d of ACLK", version); @@ -279,7 +280,7 @@ static int 
aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha return 0; err_cleanup: - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return 1; } @@ -288,31 +289,31 @@ typedef struct aclk_incoming_msg_type{ int(*fnc)(struct aclk_request *, char *); }aclk_incoming_msg_type; -aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = { +aclk_incoming_msg_type legacy_aclk_incoming_msg_types_v1[] = { { .name = "http", .fnc = aclk_handle_cloud_request_v1 }, { .name = "version", .fnc = aclk_handle_version_response }, { .name = NULL, .fnc = NULL } }; -aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { +aclk_incoming_msg_type legacy_aclk_incoming_msg_types_compression[] = { { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, { .name = "version", .fnc = aclk_handle_version_response }, { .name = NULL, .fnc = NULL } }; -struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1; +struct aclk_incoming_msg_type *legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1; void aclk_set_rx_handlers(int version) { if(version >= ACLK_V_COMPRESSION) { - aclk_incoming_msg_types = aclk_incoming_msg_types_compression; + legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_compression; return; } - aclk_incoming_msg_types = aclk_incoming_msg_types_v1; + legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1; } -int aclk_handle_cloud_message(char *payload) +int legacy_aclk_handle_cloud_message(char *payload) { struct aclk_request cloud_to_agent; memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); @@ -325,7 +326,7 @@ int aclk_handle_cloud_message(char *payload) debug(D_ACLK, "ACLK incoming message (%s)", payload); - int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + int rc = json_parse(payload, &cloud_to_agent, legacy_cloud_to_agent_parse); if (unlikely(rc != JSON_OK)) { errno = 0; @@ -339,22 +340,22 @@ int aclk_handle_cloud_message(char *payload) goto err_cleanup; } - if 
(!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { + if (!legacy_aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); goto err_cleanup; } - for (int i = 0; aclk_incoming_msg_types[i].name; i++) { - if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { - if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { + for (int i = 0; legacy_aclk_incoming_msg_types[i].name; i++) { + if (strcmp(cloud_to_agent.type_id, legacy_aclk_incoming_msg_types[i].name) == 0) { + if (likely(!legacy_aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { // in case of success handler is supposed to clean up after itself // or as in the case of aclk_handle_cloud_request take // ownership of the pointers (done to avoid copying) - // see what `aclk_queue_query` parameter `internal` does + // see what `legacy_aclk_queue_query` parameter `internal` does // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! 
// msg handlers (namely aclk_handle_version_response) - // can freely change what aclk_incoming_msg_types points to + // can freely change what legacy_aclk_incoming_msg_types points to // so either exit or restart this for loop freez(cloud_to_agent.type_id); return 0; @@ -378,9 +379,9 @@ err_cleanup: err_cleanup_nojson: if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_err++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics_per_sample.cloud_req_err++; + LEGACY_ACLK_STATS_UNLOCK; } return 1; diff --git a/aclk/legacy/aclk_rx_msgs.h b/aclk/legacy/aclk_rx_msgs.h index 3095e41a7..f1f99114f 100644 --- a/aclk/legacy/aclk_rx_msgs.h +++ b/aclk/legacy/aclk_rx_msgs.h @@ -3,10 +3,10 @@ #ifndef NETDATA_ACLK_RX_MSGS_H #define NETDATA_ACLK_RX_MSGS_H -#include "../../daemon/common.h" +#include "daemon/common.h" #include "libnetdata/libnetdata.h" -int aclk_handle_cloud_message(char *payload); +int legacy_aclk_handle_cloud_message(char *payload); void aclk_set_rx_handlers(int version); diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c index 88679cb3c..fbbb322a1 100644 --- a/aclk/legacy/aclk_stats.c +++ b/aclk/legacy/aclk_stats.c @@ -1,33 +1,31 @@ #include "aclk_stats.h" -netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; +netdata_mutex_t legacy_aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; -int aclk_stats_enabled; - -int query_thread_count; +int legacy_query_thread_count; // data ACLK stats need per query thread -struct aclk_qt_data { +struct legacy_aclk_qt_data { RRDDIM *dim; -} *aclk_qt_data = NULL; +} *legacy_aclk_qt_data = NULL; // ACLK per query thread cpu stats -struct aclk_cpu_data { +struct legacy_aclk_cpu_data { RRDDIM *user; RRDDIM *system; RRDSET *st; -} *aclk_cpu_data = NULL; +} *legacy_aclk_cpu_data = NULL; -uint32_t *aclk_queries_per_thread = NULL; -uint32_t *aclk_queries_per_thread_sample = NULL; +uint32_t *legacy_aclk_queries_per_thread = NULL; +uint32_t 
*legacy_aclk_queries_per_thread_sample = NULL; struct rusage *rusage_per_thread; uint8_t *getrusage_called_this_tick = NULL; -struct aclk_metrics aclk_metrics = { +static struct legacy_aclk_metrics legacy_aclk_metrics = { .online = 0, }; -struct aclk_metrics_per_sample aclk_metrics_per_sample; +struct legacy_aclk_metrics_per_sample legacy_aclk_metrics_per_sample; struct aclk_mat_metrics aclk_mat_metrics = { #ifdef NETDATA_INTERNAL_CHECKS @@ -61,20 +59,20 @@ struct aclk_mat_metrics aclk_mat_metrics = { "by query thread (just before passing to the database)." } }; -void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement) +void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement) { if (aclk_stats_enabled) { - ACLK_STATS_LOCK; + LEGACY_ACLK_STATS_LOCK; if (metric->max < measurement) metric->max = measurement; metric->total += measurement; metric->count++; - ACLK_STATS_UNLOCK; + LEGACY_ACLK_STATS_UNLOCK; } } -static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent) +static void aclk_stats_collect(struct legacy_aclk_metrics_per_sample *per_sample, struct legacy_aclk_metrics *permanent) { static RRDSET *st_aclkstats = NULL; static RRDDIM *rd_online_status = NULL; @@ -93,7 +91,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc rrdset_done(st_aclkstats); } -static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_query_queue(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st_query_thread = NULL; static RRDDIM *rd_queued = NULL; @@ -115,7 +113,7 @@ static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) rrdset_done(st_query_thread); } -static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_write_q(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; static RRDDIM *rd_wq_add 
= NULL; @@ -137,7 +135,7 @@ static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_read_q(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; static RRDDIM *rd_rq_add = NULL; @@ -159,7 +157,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_cloud_req(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; static RRDDIM *rd_rq_ok = NULL; @@ -181,7 +179,7 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -static void aclk_stats_cloud_req_version(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_cloud_req_version(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; static RRDDIM *rd_rq_v1 = NULL; @@ -223,7 +221,7 @@ int aclk_cloud_req_type_to_idx(const char *name) return 0; } -static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_cloud_req_cmd(struct legacy_aclk_metrics_per_sample *per_sample) { static RRDSET *st; static int initialized = 0; @@ -246,7 +244,7 @@ static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } -#define MAX_DIM_NAME 16 +#define MAX_DIM_NAME 22 static void aclk_stats_query_threads(uint32_t *queries_per_thread) { static RRDSET *st = NULL; @@ -258,16 +256,16 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - for (int i = 0; i < query_thread_count; i++) { - if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) + for (int i = 0; i < 
legacy_query_thread_count; i++) { + if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) error("snprintf encoding error"); - aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + legacy_aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); } } else rrdset_next(st); - for (int i = 0; i < query_thread_count; i++) { - rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); + for (int i = 0; i < legacy_query_thread_count; i++) { + rrddim_set_by_pointer(st, legacy_aclk_qt_data[i].dim, queries_per_thread[i]); } rrdset_done(st); @@ -301,59 +299,59 @@ static void aclk_stats_cpu_threads(void) char id[100 + 1]; char title[100 + 1]; - for (int i = 0; i < query_thread_count; i++) { - if (unlikely(!aclk_cpu_data[i].st)) { + for (int i = 0; i < legacy_query_thread_count; i++) { + if (unlikely(!legacy_aclk_cpu_data[i].st)) { snprintfz(id, 100, "aclk_thread%d_cpu", i); snprintfz(title, 100, "Cpu Usage For Thread No %d", i); - aclk_cpu_data[i].st = rrdset_create_localhost( + legacy_aclk_cpu_data[i].st = rrdset_create_localhost( "netdata", id, NULL, "aclk", NULL, title, "milliseconds/s", "netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - aclk_cpu_data[i].user = rrddim_add(aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - aclk_cpu_data[i].system = rrddim_add(aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + legacy_aclk_cpu_data[i].user = rrddim_add(legacy_aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + legacy_aclk_cpu_data[i].system = rrddim_add(legacy_aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); } else - rrdset_next(aclk_cpu_data[i].st); + rrdset_next(legacy_aclk_cpu_data[i].st); } - for (int i = 0; i < query_thread_count; i++) { - rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].user, 
rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec); - rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec); - rrdset_done(aclk_cpu_data[i].st); + for (int i = 0; i < legacy_query_thread_count; i++) { + rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec); + rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec); + rrdset_done(legacy_aclk_cpu_data[i].st); } } -void aclk_stats_thread_cleanup() +void legacy_aclk_stats_thread_cleanup() { - freez(aclk_qt_data); - freez(aclk_queries_per_thread); - freez(aclk_queries_per_thread_sample); - freez(aclk_cpu_data); + freez(legacy_aclk_qt_data); + freez(legacy_aclk_queries_per_thread); + freez(legacy_aclk_queries_per_thread_sample); + freez(legacy_aclk_cpu_data); freez(rusage_per_thread); } -void *aclk_stats_main_thread(void *ptr) +void *legacy_aclk_stats_main_thread(void *ptr) { struct aclk_stats_thread *args = ptr; - query_thread_count = args->query_thread_count; - aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); - aclk_cpu_data = callocz(query_thread_count, sizeof(struct aclk_cpu_data)); - aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); - aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); - rusage_per_thread = callocz(query_thread_count, sizeof(struct rusage)); - getrusage_called_this_tick = callocz(query_thread_count, sizeof(uint8_t)); + legacy_query_thread_count = args->query_thread_count; + legacy_aclk_qt_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_qt_data)); + legacy_aclk_cpu_data = callocz(legacy_query_thread_count, sizeof(struct 
legacy_aclk_cpu_data)); + legacy_aclk_queries_per_thread = callocz(legacy_query_thread_count, sizeof(uint32_t)); + legacy_aclk_queries_per_thread_sample = callocz(legacy_query_thread_count, sizeof(uint32_t)); + rusage_per_thread = callocz(legacy_query_thread_count, sizeof(struct rusage)); + getrusage_called_this_tick = callocz(legacy_query_thread_count, sizeof(uint8_t)); heartbeat_t hb; heartbeat_init(&hb); usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; - memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample)); - struct aclk_metrics_per_sample per_sample; - struct aclk_metrics permanent; + struct legacy_aclk_metrics_per_sample per_sample; + struct legacy_aclk_metrics permanent; while (!netdata_exit) { netdata_thread_testcancel(); @@ -363,17 +361,17 @@ void *aclk_stats_main_thread(void *ptr) heartbeat_next(&hb, step_ut); if (netdata_exit) break; - ACLK_STATS_LOCK; + LEGACY_ACLK_STATS_LOCK; // to not hold lock longer than necessary, especially not to hold it // during database rrd* operations - memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); - memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); - memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + memcpy(&per_sample, &legacy_aclk_metrics_per_sample, sizeof(struct legacy_aclk_metrics_per_sample)); + memcpy(&permanent, &legacy_aclk_metrics, sizeof(struct legacy_aclk_metrics)); + memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample)); - memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count); - memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count); - memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * query_thread_count); - ACLK_STATS_UNLOCK; + memcpy(legacy_aclk_queries_per_thread_sample, 
legacy_aclk_queries_per_thread, sizeof(uint32_t) * legacy_query_thread_count); + memset(legacy_aclk_queries_per_thread, 0, sizeof(uint32_t) * legacy_query_thread_count); + memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * legacy_query_thread_count); + LEGACY_ACLK_STATS_UNLOCK; aclk_stats_collect(&per_sample, &permanent); aclk_stats_query_queue(&per_sample); @@ -386,7 +384,7 @@ void *aclk_stats_main_thread(void *ptr) aclk_stats_cloud_req_cmd(&per_sample); - aclk_stats_query_threads(aclk_queries_per_thread_sample); + aclk_stats_query_threads(legacy_aclk_queries_per_thread_sample); aclk_stats_cpu_threads(); @@ -400,14 +398,14 @@ void *aclk_stats_main_thread(void *ptr) return 0; } -void aclk_stats_upd_online(int online) { +void legacy_aclk_stats_upd_online(int online) { if(!aclk_stats_enabled) return; - ACLK_STATS_LOCK; - aclk_metrics.online = online; + LEGACY_ACLK_STATS_LOCK; + legacy_aclk_metrics.online = online; if(!online) - aclk_metrics_per_sample.offline_during_sample = 1; - ACLK_STATS_UNLOCK; + legacy_aclk_metrics_per_sample.offline_during_sample = 1; + LEGACY_ACLK_STATS_UNLOCK; } diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h index 5e50a2272..560de3b5e 100644 --- a/aclk/legacy/aclk_stats.h +++ b/aclk/legacy/aclk_stats.h @@ -3,18 +3,16 @@ #ifndef NETDATA_ACLK_STATS_H #define NETDATA_ACLK_STATS_H -#include "../../daemon/common.h" +#include "daemon/common.h" #include "libnetdata/libnetdata.h" #include "aclk_common.h" #define ACLK_STATS_THREAD_NAME "ACLK_Stats" -extern netdata_mutex_t aclk_stats_mutex; +extern netdata_mutex_t legacy_aclk_stats_mutex; -#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex) -#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex) - -extern int aclk_stats_enabled; +#define LEGACY_ACLK_STATS_LOCK netdata_mutex_lock(&legacy_aclk_stats_mutex) +#define LEGACY_ACLK_STATS_UNLOCK netdata_mutex_unlock(&legacy_aclk_stats_mutex) struct aclk_stats_thread { netdata_thread_t *thread; @@ -22,7 +20,7 @@ 
struct aclk_stats_thread {
 };
 
 // preserve between samples
-struct aclk_metrics {
+struct legacy_aclk_metrics {
     volatile uint8_t online;
 };
 
@@ -53,7 +51,7 @@ extern struct aclk_mat_metrics {
     struct aclk_metric_mat cloud_q_recvd_to_processed;
 } aclk_mat_metrics;
 
-void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
+void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
 
 #define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
 // if you change update cloud_req_type_names
@@ -61,7 +59,7 @@ void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurem
 int aclk_cloud_req_type_to_idx(const char *name);
 
 // reset to 0 on every sample
-extern struct aclk_metrics_per_sample {
+extern struct legacy_aclk_metrics_per_sample {
     /* in the unlikely event of ACLK disconnecting
        and reconnecting under 1 sampling rate
        we want to make sure we record the disconnection
@@ -90,13 +88,13 @@ extern struct aclk_metrics_per_sample {
 #endif
     struct aclk_metric_mat_data cloud_q_db_query_time;
     struct aclk_metric_mat_data cloud_q_recvd_to_processed;
-} aclk_metrics_per_sample;
+} legacy_aclk_metrics_per_sample;
 
-extern uint32_t *aclk_queries_per_thread;
+extern uint32_t *legacy_aclk_queries_per_thread;
 extern struct rusage *rusage_per_thread;
 
-void *aclk_stats_main_thread(void *ptr);
-void aclk_stats_thread_cleanup();
-void aclk_stats_upd_online(int online);
+void *legacy_aclk_stats_main_thread(void *ptr);
+void legacy_aclk_stats_thread_cleanup();
+void legacy_aclk_stats_upd_online(int online);
 
 #endif /* NETDATA_ACLK_STATS_H */
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c
index 5ed7e66af..80ca23971 100644
--- a/aclk/legacy/agent_cloud_link.c
+++ b/aclk/legacy/agent_cloud_link.c
@@ -6,6 +6,7 @@
 #include "aclk_query.h"
 #include "aclk_common.h"
 #include "aclk_stats.h"
+#include "../aclk_collector_list.h"
 
 #ifdef ENABLE_ACLK
 #include <libwebsockets.h>
@@ -15,46 +16,20 @@ int aclk_shutting_down = 0;
 
 // Other global state
 static int aclk_subscribed = 0;
-static int aclk_disable_single_updates = 0;
 static char *aclk_username = NULL;
 static char *aclk_password = NULL;
 static char *global_base_topic = NULL;
 static int aclk_connecting = 0;
 int aclk_force_reconnect = 0; // Indication from lower layers
-usec_t aclk_session_us = 0; // Used by the mqtt layer
-time_t aclk_session_sec = 0; // Used by the mqtt layer
 static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
-static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
 #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
 #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
-#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
-#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
-
 void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
 void aclk_lws_wss_destroy_context();
-/*
- * Maintain a list of collectors and chart count
- * If all the charts of a collector are deleted
- * then a new metadata dataset must be send to the cloud
- *
- */
-struct _collector {
-    time_t created;
-    uint32_t count; //chart count
-    uint32_t hostname_hash;
-    uint32_t plugin_hash;
-    uint32_t module_hash;
-    char *hostname;
-    char *plugin_name;
-    char *module_name;
-    struct _collector *next;
-};
-
-struct _collector *collector_list = NULL;
 
 char *create_uuid()
 {
@@ -67,7 +42,7 @@ char *create_uuid()
     return uuid_str;
 }
 
-int cloud_to_agent_parse(JSON_ENTRY *e)
+int legacy_cloud_to_agent_parse(JSON_ENTRY *e)
 {
     struct aclk_request *data = e->callback_data;
 
@@ -247,202 +222,10 @@ char *get_topic(char *sub_topic, char *final_topic, int max_size)
     return final_topic;
 }
 
-#ifndef __GNUC__
-#pragma region ACLK Internal Collector Tracking
-#endif
-
-/*
- * Free a collector structure
- */
-
-static void _free_collector(struct _collector *collector)
-{
-    if (likely(collector->plugin_name))
-        freez(collector->plugin_name);
-
-    if (likely(collector->module_name))
-        freez(collector->module_name);
-
-    if (likely(collector->hostname))
-        freez(collector->hostname);
-
-    freez(collector);
-}
-
-/*
- * This will report the collector list
- *
- */
-#ifdef ACLK_DEBUG
-static void _dump_collector_list()
-{
-    struct _collector *tmp_collector;
-
-    COLLECTOR_LOCK;
-
-    info("DUMPING ALL COLLECTORS");
-
-    if (unlikely(!collector_list || !collector_list->next)) {
-        COLLECTOR_UNLOCK;
-        info("DUMPING ALL COLLECTORS -- nothing found");
-        return;
-    }
-
-    // Note that the first entry is "dummy"
-    tmp_collector = collector_list->next;
-
-    while (tmp_collector) {
-        info(
-            "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
-            tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
-            tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
-
-        tmp_collector = tmp_collector->next;
-    }
-    info("DUMPING ALL COLLECTORS DONE");
-    COLLECTOR_UNLOCK;
-}
-#endif
-
-/*
- * This will cleanup the collector list
- *
- */
-static void _reset_collector_list()
-{
-    struct _collector *tmp_collector, *next_collector;
-
-    COLLECTOR_LOCK;
-
-    if (unlikely(!collector_list || !collector_list->next)) {
-        COLLECTOR_UNLOCK;
-        return;
-    }
-
-    // Note that the first entry is "dummy"
-    tmp_collector = collector_list->next;
-    collector_list->count = 0;
-    collector_list->next = NULL;
-
-    // We broke the link; we can unlock
-    COLLECTOR_UNLOCK;
-
-    while (tmp_collector) {
-        next_collector = tmp_collector->next;
-        _free_collector(tmp_collector);
-        tmp_collector = next_collector;
-    }
-}
-
-/*
- * Find a collector (if it exists)
- * Must lock before calling this
- * If last_collector is not null, it will return the previous collector in the linked
- * list (used in collector delete)
- */
-static struct _collector *_find_collector(
-    const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
-{
-    struct _collector *tmp_collector, *prev_collector;
-    uint32_t plugin_hash;
-    uint32_t module_hash;
-    uint32_t hostname_hash;
-
-    if (unlikely(!collector_list)) {
-        collector_list = callocz(1, sizeof(struct _collector));
-        return NULL;
-    }
-
-    if (unlikely(!collector_list->next))
-        return NULL;
-
-    plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
-    module_hash = module_name ? simple_hash(module_name) : 1;
-    hostname_hash = simple_hash(hostname);
-
-    // Note that the first entry is "dummy"
-    tmp_collector = collector_list->next;
-    prev_collector = collector_list;
-    while (tmp_collector) {
-        if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
-            hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
-            (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
-            (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
-            if (unlikely(last_collector))
-                *last_collector = prev_collector;
-
-            return tmp_collector;
-        }
-
-        prev_collector = tmp_collector;
-        tmp_collector = tmp_collector->next;
-    }
-
-    return tmp_collector;
-}
-
-/*
- * Called to delete a collector
- * It will reduce the count (chart_count) and will remove it
- * from the linked list if the count reaches zero
- * The structure will be returned to the caller to free
- * the resources
- *
- */
-static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
-{
-    struct _collector *tmp_collector, *prev_collector = NULL;
-
-    tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
-
-    if (likely(tmp_collector)) {
-        --tmp_collector->count;
-        if (unlikely(!tmp_collector->count))
-            prev_collector->next = tmp_collector->next;
-    }
-    return tmp_collector;
-}
-
-/*
- * Add a new collector (plugin / module) to the list
- * If it already exists just update the chart count
- *
- * Lock before calling
- */
-static struct _collector *_add_collector(const char *hostname,
const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); - - if (unlikely(!tmp_collector)) { - tmp_collector = callocz(1, sizeof(struct _collector)); - tmp_collector->hostname_hash = simple_hash(hostname); - tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; - - tmp_collector->hostname = strdupz(hostname); - tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; - tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; - - tmp_collector->next = collector_list->next; - collector_list->next = tmp_collector; - } - tmp_collector->count++; - debug( - D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", - module_name ? module_name : "*", tmp_collector->count); - return tmp_collector; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - -/* Avoids the need to scan trough all RRDHOSTS +/* Avoids the need to scan through all RRDHOSTS * every time any Query Thread Wakes Up * (every time we need to check child popcorn expiry) - * call with ACLK_SHARED_STATE_LOCK held + * call with legacy_aclk_shared_state_LOCK held */ void aclk_update_next_child_to_popcorn(void) { @@ -462,19 +245,19 @@ void aclk_update_next_child_to_popcorn(void) any = 1; - if (unlikely(!aclk_shared_state.next_popcorn_host)) { - aclk_shared_state.next_popcorn_host = host; + if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) { + legacy_aclk_shared_state.next_popcorn_host = host; rrdhost_aclk_state_unlock(host); continue; } - if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) - aclk_shared_state.next_popcorn_host = host; + if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) + 
legacy_aclk_shared_state.next_popcorn_host = host; rrdhost_aclk_state_unlock(host); } if(!any) - aclk_shared_state.next_popcorn_host = NULL; + legacy_aclk_shared_state.next_popcorn_host = NULL; rrd_unlock(); } @@ -487,7 +270,7 @@ static int aclk_popcorn_check_bump(RRDHOST *host) { time_t now = now_monotonic_sec(); int updated = 0, ret; - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); ret = ACLK_IS_HOST_INITIALIZING(host); @@ -502,12 +285,12 @@ static int aclk_popcorn_check_bump(RRDHOST *host) if (host != localhost && updated) aclk_update_next_child_to_popcorn(); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return ret; } rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return ret; } @@ -523,13 +306,13 @@ static void aclk_start_host_popcorning(RRDHOST *host) { usec_t now = now_monotonic_sec(); info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) { errno = 0; error("Localhost is allowed to do popcorning only once after startup!"); rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return; } @@ -539,16 +322,16 @@ static void aclk_start_host_popcorning(RRDHOST *host) rrdhost_aclk_state_unlock(host); if (host != localhost) aclk_update_next_child_to_popcorn(); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; } static void aclk_stop_host_popcorning(RRDHOST *host) { - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); if (!ACLK_IS_HOST_POPCORNING(host)) { rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return; } @@ -557,18 +340,18 @@ static void aclk_stop_host_popcorning(RRDHOST *host) host->aclk_state.metadata = 
ACLK_METADATA_REQUIRED; rrdhost_aclk_state_unlock(host); - if(host == aclk_shared_state.next_popcorn_host) { - aclk_shared_state.next_popcorn_host = NULL; + if(host == legacy_aclk_shared_state.next_popcorn_host) { + legacy_aclk_shared_state.next_popcorn_host = NULL; aclk_update_next_child_to_popcorn(); } - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; } /* * Add a new collector to the list * If it exists, update the chart count */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; if (unlikely(!netdata_ready)) { @@ -589,7 +372,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu if(aclk_popcorn_check_bump(host)) return; - if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); } @@ -601,7 +384,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu * This function will release the memory used and schedule * a cloud update */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; if (unlikely(!netdata_ready)) { @@ -628,7 +411,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu if (aclk_popcorn_check_bump(host)) return; - if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); } @@ -639,7 +422,7 @@ static void 
aclk_graceful_disconnect() // Send a graceful disconnect message BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}"); aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); buffer_free(b); @@ -963,10 +746,10 @@ static void aclk_try_to_connect(char *hostname, int port) aclk_connecting = 1; create_publish_base_topic(); - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = 0; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_LOCK; + legacy_aclk_shared_state.version_neg = 0; + legacy_aclk_shared_state.version_neg_wait_till = 0; + legacy_aclk_shared_state_UNLOCK; rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password); if (unlikely(rc)) { @@ -981,10 +764,10 @@ static inline void aclk_hello_msg() char *msg_id = create_uuid(); - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_LOCK; + legacy_aclk_shared_state.version_neg = 0; + legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; + legacy_aclk_shared_state_UNLOCK; //Hello message is versioned separately from the rest of the protocol aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); @@ -1004,7 +787,7 @@ static inline void aclk_hello_msg() * * @return It always returns NULL */ -void *aclk_main(void *ptr) +void *legacy_aclk_main(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; struct aclk_query_threads query_threads; @@ -1065,7 +848,7 @@ void *aclk_main(void *ptr) stats_thread->thread = mallocz(sizeof(netdata_thread_t)); 
stats_thread->query_thread_count = query_threads.count; netdata_thread_create( - stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread, stats_thread); } @@ -1165,20 +948,20 @@ void *aclk_main(void *ptr) } if (unlikely(!query_threads.thread_list)) { - aclk_query_threads_start(&query_threads); + legacy_aclk_query_threads_start(&query_threads); } time_t now = now_monotonic_sec(); if(aclk_connected && last_periodic_query_wakeup < now) { - // to make `aclk_queue_query()` param `run_after` work + // to make `legacy_aclk_queue_query()` param `run_after` work // also makes per child popcorning work last_periodic_query_wakeup = now; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; } } // forever exited: // Wakeup query thread to cleanup - QUERY_THREAD_WAKEUP_ALL; + LEGACY_QUERY_THREAD_WAKEUP_ALL; freez(aclk_username); freez(aclk_password); @@ -1192,18 +975,18 @@ exited: if (agent_id && aclk_connected) { freez(agent_id); // Wakeup thread to cleanup - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; aclk_graceful_disconnect(); } - aclk_query_threads_cleanup(&query_threads); + legacy_aclk_query_threads_cleanup(&query_threads); _reset_collector_list(); freez(collector_list); if(aclk_stats_enabled) { netdata_thread_join(*stats_thread->thread, NULL); - aclk_stats_thread_cleanup(); + legacy_aclk_stats_thread_cleanup(); freez(stats_thread->thread); freez(stats_thread); } @@ -1306,12 +1089,12 @@ void aclk_connect() { info("Connection detected (%u queued queries)", aclk_query_size()); - aclk_stats_upd_online(1); + legacy_aclk_stats_upd_online(1); aclk_connected = 1; aclk_reconnect_delay(0); - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return; } @@ -1321,7 +1104,7 @@ void aclk_disconnect() if (likely(aclk_connected)) info("Disconnect detected (%u queued queries)", aclk_query_size()); - aclk_stats_upd_online(0); + 
legacy_aclk_stats_upd_online(0); aclk_subscribed = 0; rrdhost_aclk_state_lock(localhost); @@ -1372,7 +1155,7 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts */ void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); -void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) +void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1388,9 +1171,9 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) // session. if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); else - aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); @@ -1418,7 +1201,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) * /api/v1/info * charts */ -int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) +int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1433,9 +1216,9 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *hos // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. 
if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); else - aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"info\" : "); @@ -1459,14 +1242,14 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); local_buffer->contenttype = CT_APPLICATION_JSON; - if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) - fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg); + if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg); debug(D_ACLK, "Sending Child Disconnect"); char *msg_id = create_uuid(); - aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? 
"child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\"payload\":"); @@ -1486,10 +1269,10 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) return 0; } -void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) +void legacy_aclk_host_state_update(RRDHOST *host, int connect) { #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE - if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) return; #else #warning "This check became unnecessary. Remove" @@ -1498,19 +1281,14 @@ void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) if (unlikely(aclk_host_initializing(localhost))) return; - switch (cmd) { - case ACLK_CMD_CHILD_CONNECT: - debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); - aclk_start_host_popcorning(host); - aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); - break; - case ACLK_CMD_CHILD_DISCONNECT: - debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); - aclk_stop_host_popcorning(host); - aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); - break; - default: - error("Unknown command for aclk_host_state_update %d.", (int)cmd); + if (connect) { + debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); + aclk_start_host_popcorning(host); + legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); + } else { + debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); + aclk_stop_host_popcorning(host); + legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); } } @@ -1537,31 +1315,21 @@ void aclk_send_stress_test(size_t size) // or on request int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host) { - aclk_send_info_metadata(state, host); + legacy_aclk_send_info_metadata(state, host); if(host == 
localhost) - aclk_send_alarm_metadata(state); + legacy_aclk_send_alarm_metadata(state); return 0; } -void aclk_single_update_disable() -{ - aclk_disable_single_updates = 1; -} - -void aclk_single_update_enable() -{ - aclk_disable_single_updates = 0; -} - // Triggered by a health reload, sends the alarm metadata -void aclk_alarm_reload() +void legacy_aclk_alarm_reload() { if (unlikely(aclk_host_initializing(localhost))) return; - if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue on_connect command on alarm reload"); @@ -1585,7 +1353,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); rrdset2json(st, local_buffer, NULL, NULL, 1); @@ -1598,7 +1366,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart) return 0; } -int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) +int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create) { #ifndef ENABLE_ACLK UNUSED(host); @@ -1611,7 +1379,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (!netdata_cloud_setting) return 0; - if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) + if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) return 0; if (aclk_host_initializing(localhost)) @@ -1623,7 +1391,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (aclk_popcorn_check_bump(host)) return 0; - if (unlikely(aclk_queue_query("_chart", host, 
NULL, chart_name, 0, 1, aclk_cmd))) { + if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue chart_update command"); @@ -1634,7 +1402,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) #endif } -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) { BUFFER *local_buffer = NULL; @@ -1661,7 +1429,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) char *msg_id = create_uuid(); buffer_flush(local_buffer); - aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); @@ -1670,7 +1438,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) buffer_sprintf(local_buffer, "\n}"); - if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { + if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue alarm_command on alarm_update"); @@ -1682,3 +1450,53 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) return 0; } + +char *legacy_aclk_state(void) +{ + BUFFER *wb = buffer_create(1024); + char *ret; + + buffer_strcat(wb, + "ACLK Available: Yes\n" + "ACLK Implementation: Legacy\n" + "Claimed: " + ); + + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + buffer_strcat(wb, "No\n"); + else { + buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); + freez(agent_id); + } + + buffer_sprintf(wb, "Online: %s", aclk_connected ? 
"Yes" : "No"); + + ret = strdupz(buffer_tostring(wb)); + buffer_free(wb); + return ret; +} + +char *legacy_aclk_state_json(void) +{ + BUFFER *wb = buffer_create(1024); + char *agent_id = is_agent_claimed(); + + buffer_sprintf(wb, + "{\"aclk-available\":true," + "\"aclk-implementation\":\"Legacy\"," + "\"agent-claimed\":%s," + "\"claimed-id\":", + agent_id ? "true" : "false" + ); + + if (agent_id) { + buffer_sprintf(wb, "\"%s\"", agent_id); + freez(agent_id); + } else + buffer_strcat(wb, "null"); + + buffer_sprintf(wb, ",\"online\":%s}", aclk_connected ? "true" : "false"); + + return strdupz(buffer_tostring(wb)); +} diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h index bfcfef8e9..8954a337a 100644 --- a/aclk/legacy/agent_cloud_link.h +++ b/aclk/legacy/agent_cloud_link.h @@ -3,11 +3,10 @@ #ifndef NETDATA_AGENT_CLOUD_LINK_H #define NETDATA_AGENT_CLOUD_LINK_H -#include "../../daemon/common.h" +#include "daemon/common.h" #include "mqtt.h" #include "aclk_common.h" -#define ACLK_THREAD_NAME "ACLK_Query" #define ACLK_CHART_TOPIC "outbound/meta" #define ACLK_ALARMS_TOPIC "outbound/alarms" #define ACLK_METADATA_TOPIC "outbound/meta" @@ -18,7 +17,6 @@ #define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg) #define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds -#define ACLK_QOS 1 #define ACLK_PING_INTERVAL 60 #define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop @@ -42,16 +40,7 @@ struct aclk_request { typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION; -void *aclk_main(void *ptr); - -#define NETDATA_ACLK_HOOK \ - { .name = "ACLK_Main", \ - .config_section = NULL, \ - .config_name = NULL, \ - .enabled = 1, \ - .thread = NULL, \ - .init_routine = NULL, \ - .start_routine = aclk_main }, +void *legacy_aclk_main(void *ptr); extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); extern int 
aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id);
@@ -62,32 +51,35 @@ char *create_uuid();
 // callbacks for agent cloud link
 int aclk_subscribe(char *topic, int qos);
-int cloud_to_agent_parse(JSON_ENTRY *e);
+int legacy_cloud_to_agent_parse(JSON_ENTRY *e);
 void aclk_disconnect();
 void aclk_connect();
+#ifdef ENABLE_ACLK
 int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host);
-int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
-void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
+int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
+void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
 int aclk_wait_for_initialization();
 char *create_publish_base_topic();
 int aclk_send_single_chart(RRDHOST *host, char *chart);
-int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
+int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
+int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
 void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version);
-int aclk_handle_cloud_message(char *payload);
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
-void aclk_alarm_reload();
+int legacy_aclk_handle_cloud_message(char *payload);
+void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
+void legacy_aclk_alarm_reload(void);
 unsigned long int aclk_reconnect_delay(int mode);
 extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
-void aclk_single_update_enable();
-void
aclk_single_update_disable();
-void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd);
+void legacy_aclk_host_state_update(RRDHOST *host, int connect);
 int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd);
 void aclk_update_next_child_to_popcorn(void);
+char *legacy_aclk_state(void);
+char *legacy_aclk_state_json(void);
+#endif
+
 #endif //NETDATA_AGENT_CLOUD_LINK_H
diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c
index 74f774555..0e4bb2ec9 100644
--- a/aclk/legacy/mqtt.c
+++ b/aclk/legacy/mqtt.c
@@ -1,12 +1,16 @@
 // SPDX-License-Identifier: GPL-3.0-or-later
 #include <libnetdata/json/json.h>
-#include "../../daemon/common.h"
+#include "daemon/common.h"
 #include "mqtt.h"
 #include "aclk_lws_wss_client.h"
 #include "aclk_stats.h"
 #include "aclk_rx_msgs.h"
+#include "agent_cloud_link.h"
+
+#define ACLK_QOS 1
+
 extern usec_t aclk_session_us;
 extern time_t aclk_session_sec;
@@ -27,7 +31,7 @@ void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosqu
     UNUSED(mosq);
     UNUSED(obj);
-    aclk_handle_cloud_message(msg->payload);
+    legacy_aclk_handle_cloud_message(msg->payload);
 }
 void publish_callback(struct mosquitto *mosq, void *obj, int rc)
@@ -44,7 +48,7 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc)
     info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff);
-    aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff);
+    legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.latency, diff);
 #endif
     return;
 }
diff --git a/aclk/legacy/mqtt.h b/aclk/legacy/mqtt.h
index cc4765d62..98d599f51 100644
--- a/aclk/legacy/mqtt.h
+++ b/aclk/legacy/mqtt.h
@@ -19,7 +19,7 @@ const char *_link_strerror(int rc);
 int _link_set_lwt(char *topic, int qos);
-int aclk_handle_cloud_message(char *);
+int legacy_aclk_handle_cloud_message(char *);
 extern char *get_topic(char *sub_topic, char *final_topic, int max_size);
 #endif //NETDATA_MQTT_H
diff --git a/aclk/schema-wrappers/alarm_config.cc b/aclk/schema-wrappers/alarm_config.cc
new file mode 100644 index 000000000..56d7e6f39 --- /dev/null +++ b/aclk/schema-wrappers/alarm_config.cc @@ -0,0 +1,147 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "alarm_config.h" + +#include "proto/alarm/v1/config.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +using namespace alarms::v1; + +void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg) +{ + freez(cfg->alarm); + freez(cfg->tmpl); + freez(cfg->on_chart); + + freez(cfg->classification); + freez(cfg->type); + freez(cfg->component); + + freez(cfg->os); + freez(cfg->hosts); + freez(cfg->plugin); + freez(cfg->module); + freez(cfg->charts); + freez(cfg->families); + freez(cfg->lookup); + freez(cfg->every); + freez(cfg->units); + + freez(cfg->green); + freez(cfg->red); + + freez(cfg->calculation_expr); + freez(cfg->warning_expr); + freez(cfg->critical_expr); + + freez(cfg->recipient); + freez(cfg->exec); + freez(cfg->delay); + freez(cfg->repeat); + freez(cfg->info); + freez(cfg->options); + freez(cfg->host_labels); + + freez(cfg->p_db_lookup_dimensions); + freez(cfg->p_db_lookup_method); + freez(cfg->p_db_lookup_options); +} + +char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data) +{ + ProvideAlarmConfiguration msg; + AlarmConfiguration *cfg = msg.mutable_config(); + + msg.set_config_hash(data->cfg_hash); + + if (data->cfg.alarm) + cfg->set_alarm(data->cfg.alarm); + if (data->cfg.tmpl) + cfg->set_template_(data->cfg.tmpl); + if(data->cfg.on_chart) + cfg->set_on_chart(data->cfg.on_chart); + + if (data->cfg.classification) + cfg->set_classification(data->cfg.classification); + if (data->cfg.type) + cfg->set_type(data->cfg.type); + if (data->cfg.component) + cfg->set_component(data->cfg.component); + + if (data->cfg.os) + cfg->set_os(data->cfg.os); + if (data->cfg.hosts) + cfg->set_hosts(data->cfg.hosts); + if (data->cfg.plugin) + cfg->set_plugin(data->cfg.plugin); + if(data->cfg.module) + 
cfg->set_module(data->cfg.module); + if(data->cfg.charts) + cfg->set_charts(data->cfg.charts); + if(data->cfg.families) + cfg->set_families(data->cfg.families); + if(data->cfg.lookup) + cfg->set_lookup(data->cfg.lookup); + if(data->cfg.every) + cfg->set_every(data->cfg.every); + if(data->cfg.units) + cfg->set_units(data->cfg.units); + + if (data->cfg.green) + cfg->set_green(data->cfg.green); + if (data->cfg.red) + cfg->set_red(data->cfg.red); + + if (data->cfg.calculation_expr) + cfg->set_calculation_expr(data->cfg.calculation_expr); + if (data->cfg.warning_expr) + cfg->set_warning_expr(data->cfg.warning_expr); + if (data->cfg.critical_expr) + cfg->set_critical_expr(data->cfg.critical_expr); + + if (data->cfg.recipient) + cfg->set_recipient(data->cfg.recipient); + if (data->cfg.exec) + cfg->set_exec(data->cfg.exec); + if (data->cfg.delay) + cfg->set_delay(data->cfg.delay); + if (data->cfg.repeat) + cfg->set_repeat(data->cfg.repeat); + if (data->cfg.info) + cfg->set_info(data->cfg.info); + if (data->cfg.options) + cfg->set_options(data->cfg.options); + if (data->cfg.host_labels) + cfg->set_host_labels(data->cfg.host_labels); + + cfg->set_p_db_lookup_after(data->cfg.p_db_lookup_after); + cfg->set_p_db_lookup_before(data->cfg.p_db_lookup_before); + if (data->cfg.p_db_lookup_dimensions) + cfg->set_p_db_lookup_dimensions(data->cfg.p_db_lookup_dimensions); + if (data->cfg.p_db_lookup_method) + cfg->set_p_db_lookup_method(data->cfg.p_db_lookup_method); + if (data->cfg.p_db_lookup_options) + cfg->set_p_db_lookup_options(data->cfg.p_db_lookup_options); + cfg->set_p_update_every(data->cfg.p_update_every); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (!msg.SerializeToArray(bin, *len)) + return NULL; + + return bin; +} + +char *parse_send_alarm_configuration(const char *data, size_t len) +{ + SendAlarmConfiguration msg; + if (!msg.ParseFromArray(data, len)) + return NULL; + if (!msg.config_hash().c_str()) + return NULL; + return 
strdupz(msg.config_hash().c_str());
+}
+
diff --git a/aclk/schema-wrappers/alarm_config.h b/aclk/schema-wrappers/alarm_config.h
new file mode 100644
index 000000000..157fbc60f
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_config.h
@@ -0,0 +1,69 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H
+#define ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H
+
+#include <stdlib.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_alarm_configuration {
+    char *alarm;
+    char *tmpl;
+    char *on_chart;
+
+    char *classification;
+    char *type;
+    char *component;
+
+    char *os;
+    char *hosts;
+    char *plugin;
+    char *module;
+    char *charts;
+    char *families;
+    char *lookup;
+    char *every;
+    char *units;
+
+    char *green;
+    char *red;
+
+    char *calculation_expr;
+    char *warning_expr;
+    char *critical_expr;
+
+    char *recipient;
+    char *exec;
+    char *delay;
+    char *repeat;
+    char *info;
+    char *options;
+    char *host_labels;
+
+    int32_t p_db_lookup_after;
+    int32_t p_db_lookup_before;
+    char *p_db_lookup_dimensions;
+    char *p_db_lookup_method;
+    char *p_db_lookup_options;
+    int32_t p_update_every;
+};
+
+void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg);
+
+struct provide_alarm_configuration {
+    char *cfg_hash;
+    struct aclk_alarm_configuration cfg;
+};
+
+char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data);
+char *parse_send_alarm_configuration(const char *data, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H */
diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc
new file mode 100644
index 000000000..5868e5d67
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_stream.cc
@@ -0,0 +1,248 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "alarm_stream.h"
+
+#include "proto/alarm/v1/stream.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace alarms::v1; + +struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len) +{ + struct start_alarm_streaming ret; + memset(&ret, 0, sizeof(ret)); + + StartAlarmStreaming msg; + + if (!msg.ParseFromArray(data, len)) + return ret; + + ret.node_id = strdupz(msg.node_id().c_str()); + ret.batch_id = msg.batch_id(); + ret.start_seq_id = msg.start_sequnce_id(); + + return ret; +} + +char *parse_send_alarm_log_health(const char *data, size_t len) +{ + SendAlarmLogHealth msg; + if (!msg.ParseFromArray(data, len)) + return NULL; + return strdupz(msg.node_id().c_str()); +} + +char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data) +{ + AlarmLogHealth msg; + LogEntries *entries; + + msg.set_claim_id(data->claim_id); + msg.set_node_id(data->node_id); + msg.set_enabled(data->enabled); + + switch (data->status) { + case alarm_log_status_aclk::ALARM_LOG_STATUS_IDLE: + msg.set_status(alarms::v1::ALARM_LOG_STATUS_IDLE); + break; + case alarm_log_status_aclk::ALARM_LOG_STATUS_RUNNING: + msg.set_status(alarms::v1::ALARM_LOG_STATUS_RUNNING); + break; + case alarm_log_status_aclk::ALARM_LOG_STATUS_UNSPECIFIED: + msg.set_status(alarms::v1::ALARM_LOG_STATUS_UNSPECIFIED); + break; + default: + error("Unknown status of AlarmLogHealth LogEntry"); + return NULL; + } + + entries = msg.mutable_log_entries(); + entries->set_first_sequence_id(data->log_entries.first_seq_id); + entries->set_last_sequence_id(data->log_entries.last_seq_id); + + set_google_timestamp_from_timeval(data->log_entries.first_when, entries->mutable_first_when()); + set_google_timestamp_from_timeval(data->log_entries.last_when, entries->mutable_last_when()); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (!msg.SerializeToArray(bin, *len)) + return NULL; + + return bin; +} + +static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status) +{ + switch (status) { + case aclk_alarm_status::ALARM_STATUS_NULL: 
+ return alarms::v1::ALARM_STATUS_NULL; + case aclk_alarm_status::ALARM_STATUS_UNKNOWN: + return alarms::v1::ALARM_STATUS_UNKNOWN; + case aclk_alarm_status::ALARM_STATUS_REMOVED: + return alarms::v1::ALARM_STATUS_REMOVED; + case aclk_alarm_status::ALARM_STATUS_NOT_A_NUMBER: + return alarms::v1::ALARM_STATUS_NOT_A_NUMBER; + case aclk_alarm_status::ALARM_STATUS_CLEAR: + return alarms::v1::ALARM_STATUS_CLEAR; + case aclk_alarm_status::ALARM_STATUS_WARNING: + return alarms::v1::ALARM_STATUS_WARNING; + case aclk_alarm_status::ALARM_STATUS_CRITICAL: + return alarms::v1::ALARM_STATUS_CRITICAL; + default: + error("Unknown alarm status"); + return alarms::v1::ALARM_STATUS_UNKNOWN; + } +} + +void destroy_alarm_log_entry(struct alarm_log_entry *entry) +{ + //freez(entry->node_id); + //freez(entry->claim_id); + + freez(entry->chart); + freez(entry->name); + freez(entry->family); + + freez(entry->config_hash); + + freez(entry->timezone); + + freez(entry->exec_path); + freez(entry->conf_source); + freez(entry->command); + + freez(entry->value_string); + freez(entry->old_value_string); + + freez(entry->rendered_info); +} + +static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *proto) +{ + proto->set_node_id(data->node_id); + proto->set_claim_id(data->claim_id); + + proto->set_chart(data->chart); + proto->set_name(data->name); + if (data->family) + proto->set_family(data->family); + + proto->set_batch_id(data->batch_id); + proto->set_sequence_id(data->sequence_id); + proto->set_when(data->when); + + proto->set_config_hash(data->config_hash); + + proto->set_utc_offset(data->utc_offset); + proto->set_timezone(data->timezone); + + proto->set_exec_path(data->exec_path); + proto->set_conf_source(data->conf_source); + proto->set_command(data->command); + + proto->set_duration(data->duration); + proto->set_non_clear_duration(data->non_clear_duration); + + + proto->set_status(aclk_alarm_status_to_proto(data->status)); + 
proto->set_old_status(aclk_alarm_status_to_proto(data->old_status)); + proto->set_delay(data->delay); + proto->set_delay_up_to_timestamp(data->delay_up_to_timestamp); + + proto->set_last_repeat(data->last_repeat); + proto->set_silenced(data->silenced); + + if (data->value_string) + proto->set_value_string(data->value_string); + if (data->old_value_string) + proto->set_old_value_string(data->old_value_string); + + proto->set_value(data->value); + proto->set_old_value(data->old_value); + + proto->set_updated(data->updated); + + proto->set_rendered_info(data->rendered_info); +} + +char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data) +{ + AlarmLogEntry le; + + fill_alarm_log_entry(data, &le); + + *len = PROTO_COMPAT_MSG_SIZE(le); + char *bin = (char*)mallocz(*len); + if (!le.SerializeToArray(bin, *len)) + return NULL; + + return bin; +} + +struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len) +{ + SendAlarmSnapshot msg; + if (!msg.ParseFromArray(data, len)) + return NULL; + + struct send_alarm_snapshot *ret = (struct send_alarm_snapshot*)callocz(1, sizeof(struct send_alarm_snapshot)); + if (msg.claim_id().c_str()) + ret->claim_id = strdupz(msg.claim_id().c_str()); + if (msg.node_id().c_str()) + ret->node_id = strdupz(msg.node_id().c_str()); + ret->snapshot_id = msg.snapshot_id(); + ret->sequence_id = msg.sequence_id(); + + return ret; +} + +void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr) +{ + freez(ptr->claim_id); + freez(ptr->node_id); + freez(ptr); +} + +alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data) +{ + AlarmSnapshot *msg = new AlarmSnapshot; + if (unlikely(!msg)) fatal("Cannot allocate memory for AlarmSnapshot"); + + msg->set_node_id(data->node_id); + msg->set_claim_id(data->claim_id); + msg->set_snapshot_id(data->snapshot_id); + msg->set_chunks(data->chunks); + msg->set_chunk(data->chunk); + + // this is handled automatically by add_alarm_log_entry2snapshot 
function + msg->set_chunk_size(0); + + return msg; +} + +void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data) +{ + AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot; + AlarmLogEntry *alarm_log_entry = alarm_snapshot->add_alarms(); + + fill_alarm_log_entry(data, alarm_log_entry); + + alarm_snapshot->set_chunk_size(alarm_snapshot->chunk_size() + 1); +} + +char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot) +{ + AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot; + *len = PROTO_COMPAT_MSG_SIZE_PTR(alarm_snapshot); + char *bin = (char*)mallocz(*len); + if (!alarm_snapshot->SerializeToArray(bin, *len)) { + delete alarm_snapshot; + return NULL; + } + + delete alarm_snapshot; + return bin; +} diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h new file mode 100644 index 000000000..2932bb192 --- /dev/null +++ b/aclk/schema-wrappers/alarm_stream.h @@ -0,0 +1,134 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H +#define ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H + +#include <stdlib.h> + +#include "database/rrd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +enum alarm_log_status_aclk { + ALARM_LOG_STATUS_UNSPECIFIED = 0, + ALARM_LOG_STATUS_RUNNING = 1, + ALARM_LOG_STATUS_IDLE = 2 +}; + +struct alarm_log_entries { + int64_t first_seq_id; + struct timeval first_when; + + int64_t last_seq_id; + struct timeval last_when; +}; + +struct alarm_log_health { + char *claim_id; + char *node_id; + int enabled; + enum alarm_log_status_aclk status; + struct alarm_log_entries log_entries; +}; + +struct start_alarm_streaming { + char *node_id; + uint64_t batch_id; + uint64_t start_seq_id; +}; + +struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len); +char *parse_send_alarm_log_health(const char *data, size_t len); + +char *generate_alarm_log_health(size_t *len, struct alarm_log_health 
*data); + +enum aclk_alarm_status { + ALARM_STATUS_NULL = 0, + ALARM_STATUS_UNKNOWN = 1, + ALARM_STATUS_REMOVED = 2, + ALARM_STATUS_NOT_A_NUMBER = 3, + ALARM_STATUS_CLEAR = 4, + ALARM_STATUS_WARNING = 5, + ALARM_STATUS_CRITICAL = 6 +}; + +struct alarm_log_entry { + char *node_id; + char *claim_id; + + char *chart; + char *name; + char *family; + + uint64_t batch_id; + uint64_t sequence_id; + uint64_t when; + + char *config_hash; + + int32_t utc_offset; + char *timezone; + + char *exec_path; + char *conf_source; + char *command; + + uint32_t duration; + uint32_t non_clear_duration; + + enum aclk_alarm_status status; + enum aclk_alarm_status old_status; + uint64_t delay; + uint64_t delay_up_to_timestamp; + + uint64_t last_repeat; + int silenced; + + char *value_string; + char *old_value_string; + + double value; + double old_value; + + // updated alarm entry, when the status of the alarm has been updated by a later entry + int updated; + + // rendered_info + char *rendered_info; +}; + +struct send_alarm_snapshot { + char *node_id; + char *claim_id; + uint64_t snapshot_id; + uint64_t sequence_id; +}; + +struct alarm_snapshot { + char *node_id; + char *claim_id; + uint64_t snapshot_id; + uint32_t chunks; + uint32_t chunk; +}; + +typedef void* alarm_snapshot_proto_ptr_t; + +void destroy_alarm_log_entry(struct alarm_log_entry *entry); + +char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data); + +struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len); +void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr); + +alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data); +void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data); +char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H */ diff --git a/aclk/schema-wrappers/chart_config.cc 
b/aclk/schema-wrappers/chart_config.cc new file mode 100644 index 000000000..87e34e0df --- /dev/null +++ b/aclk/schema-wrappers/chart_config.cc @@ -0,0 +1,105 @@ +#include "chart_config.h" + +#include "proto/chart/v1/config.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +void destroy_update_chart_config(struct update_chart_config *cfg) +{ + freez(cfg->claim_id); + freez(cfg->node_id); + freez(cfg->hashes); +} + +void destroy_chart_config_updated(struct chart_config_updated *cfg) +{ + freez(cfg->type); + freez(cfg->family); + freez(cfg->context); + freez(cfg->title); + freez(cfg->plugin); + freez(cfg->module); + freez(cfg->units); + freez(cfg->config_hash); +} + +struct update_chart_config parse_update_chart_config(const char *data, size_t len) +{ + chart::v1::UpdateChartConfigs cfgs; + update_chart_config res; + memset(&res, 0, sizeof(res)); + + if (!cfgs.ParseFromArray(data, len)) + return res; + + res.claim_id = strdupz(cfgs.claim_id().c_str()); + res.node_id = strdupz(cfgs.node_id().c_str()); + + // to not do bazillion tiny allocations for individual strings + // we calculate how much memory we will need for all of them + // and allocate at once + int hash_count = cfgs.config_hashes_size(); + size_t total_strlen = 0; + for (int i = 0; i < hash_count; i++) + total_strlen += cfgs.config_hashes(i).length(); + total_strlen += hash_count; //null bytes + + res.hashes = (char**)callocz( 1, + (hash_count+1) * sizeof(char*) + //char * array incl. terminating NULL at the end + total_strlen //strings themselves incl. 
1 null byte each + ); + + char* dest = ((char*)res.hashes) + (hash_count + 1 /* NULL ptr */) * sizeof(char *); + // now copy them strings + // null bytes handled by callocz + for (int i = 0; i < hash_count; i++) { + strcpy(dest, cfgs.config_hashes(i).c_str()); + res.hashes[i] = dest; + dest += strlen(dest) + 1 /* end string null */; + } + + return res; +} + +char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size) +{ + chart::v1::ChartConfigsUpdated configs; + for (int i = 0; i < list_size; i++) { + chart::v1::ChartConfigUpdated *config = configs.add_configs(); + config->set_type(config_list[i].type); + if (config_list[i].family) + config->set_family(config_list[i].family); + config->set_context(config_list[i].context); + config->set_title(config_list[i].title); + config->set_priority(config_list[i].priority); + config->set_plugin(config_list[i].plugin); + + if (config_list[i].module) + config->set_module(config_list[i].module); + + switch (config_list[i].chart_type) { + case RRDSET_TYPE_LINE: + config->set_chart_type(chart::v1::LINE); + break; + case RRDSET_TYPE_AREA: + config->set_chart_type(chart::v1::AREA); + break; + case RRDSET_TYPE_STACKED: + config->set_chart_type(chart::v1::STACKED); + break; + default: + return NULL; + } + + config->set_units(config_list[i].units); + config->set_config_hash(config_list[i].config_hash); + } + + *len = PROTO_COMPAT_MSG_SIZE(configs); + char *bin = (char*)mallocz(*len); + configs.SerializeToArray(bin, *len); + + return bin; +} diff --git a/aclk/schema-wrappers/chart_config.h b/aclk/schema-wrappers/chart_config.h new file mode 100644 index 000000000..f08f76b61 --- /dev/null +++ b/aclk/schema-wrappers/chart_config.h @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H +#define ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H + +#include <stdlib.h> + +#include "database/rrd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct 
update_chart_config { + char *claim_id; + char *node_id; + char **hashes; +}; + +enum chart_config_chart_type { + LINE, + AREA, + STACKED +}; + +struct chart_config_updated { + char *type; + char *family; + char *context; + char *title; + uint64_t priority; + char *plugin; + char *module; + RRDSET_TYPE chart_type; + char *units; + char *config_hash; +}; + +void destroy_update_chart_config(struct update_chart_config *cfg); +void destroy_chart_config_updated(struct chart_config_updated *cfg); + +struct update_chart_config parse_update_chart_config(const char *data, size_t len); + +char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H */ diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc new file mode 100644 index 000000000..7d820e533 --- /dev/null +++ b/aclk/schema-wrappers/chart_stream.cc @@ -0,0 +1,342 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk/aclk_util.h" + +#include "proto/chart/v1/stream.pb.h" +#include "chart_stream.h" + +#include "schema_wrapper_utils.h" + +#include <sys/time.h> +#include <stdlib.h> + +stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len) +{ + chart::v1::StreamChartsAndDimensions msg; + stream_charts_and_dims_t res; + memset(&res, 0, sizeof(res)); + + if (!msg.ParseFromArray(data, len)) + return res; + + res.node_id = strdup(msg.node_id().c_str()); + res.claim_id = strdup(msg.claim_id().c_str()); + res.seq_id = msg.sequence_id(); + res.batch_id = msg.batch_id(); + set_timeval_from_google_timestamp(msg.seq_id_created_at(), &res.seq_id_created_at); + + return res; +} + +chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len) +{ + chart::v1::ChartsAndDimensionsAck msg; + chart_and_dim_ack_t res = { .claim_id = NULL, .node_id = NULL, .last_seq_id = 0 }; + + if (!msg.ParseFromArray(data, len)) 
+ return res; + + res.node_id = strdup(msg.node_id().c_str()); + res.claim_id = strdup(msg.claim_id().c_str()); + res.last_seq_id = msg.last_sequence_id(); + + return res; +} + +char *generate_reset_chart_messages(size_t *len, chart_reset_t reset) +{ + chart::v1::ResetChartMessages msg; + + msg.set_claim_id(reset.claim_id); + msg.set_node_id(reset.node_id); + switch (reset.reason) { + case DB_EMPTY: + msg.set_reason(chart::v1::ResetReason::DB_EMPTY); + break; + case SEQ_ID_NOT_EXISTS: + msg.set_reason(chart::v1::ResetReason::SEQ_ID_NOT_EXISTS); + break; + case TIMESTAMP_MISMATCH: + msg.set_reason(chart::v1::ResetReason::TIMESTAMP_MISMATCH); + break; + default: + return NULL; + } + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)malloc(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} + +void chart_instance_updated_destroy(struct chart_instance_updated *instance) +{ + freez((char*)instance->id); + freez((char*)instance->claim_id); + + free_label_list(instance->label_head); + + freez((char*)instance->config_hash); +} + +static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, const struct chart_instance_updated *update) +{ + google::protobuf::Map<std::string, std::string> *map; + aclk_lib::v1::ACLKMessagePosition *pos; + struct label *label; + + chart->set_id(update->id); + chart->set_claim_id(update->claim_id); + chart->set_node_id(update->node_id); + chart->set_name(update->name); + + map = chart->mutable_chart_labels(); + label = update->label_head; + while (label) { + map->insert({label->key, label->value}); + label = label->next; + } + + switch (update->memory_mode) { + case RRD_MEMORY_MODE_NONE: + chart->set_memory_mode(chart::v1::NONE); + break; + case RRD_MEMORY_MODE_RAM: + chart->set_memory_mode(chart::v1::RAM); + break; + case RRD_MEMORY_MODE_MAP: + chart->set_memory_mode(chart::v1::MAP); + break; + case RRD_MEMORY_MODE_SAVE: + chart->set_memory_mode(chart::v1::SAVE); + break; + case 
RRD_MEMORY_MODE_ALLOC: + chart->set_memory_mode(chart::v1::ALLOC); + break; + case RRD_MEMORY_MODE_DBENGINE: + chart->set_memory_mode(chart::v1::DB_ENGINE); + break; + default: + return 1; + break; + } + + chart->set_update_every_interval(update->update_every); + chart->set_config_hash(update->config_hash); + + pos = chart->mutable_position(); + pos->set_sequence_id(update->position.sequence_id); + pos->set_previous_sequence_id(update->position.previous_sequence_id); + set_google_timestamp_from_timeval(update->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); + + return 0; +} + +static int set_chart_dim_updated(chart::v1::ChartDimensionUpdated *dim, const struct chart_dimension_updated *c_dim) +{ + aclk_lib::v1::ACLKMessagePosition *pos; + + dim->set_id(c_dim->id); + dim->set_chart_id(c_dim->chart_id); + dim->set_node_id(c_dim->node_id); + dim->set_claim_id(c_dim->claim_id); + dim->set_name(c_dim->name); + + set_google_timestamp_from_timeval(c_dim->created_at, dim->mutable_created_at()); + set_google_timestamp_from_timeval(c_dim->last_timestamp, dim->mutable_last_timestamp()); + + pos = dim->mutable_position(); + pos->set_sequence_id(c_dim->position.sequence_id); + pos->set_previous_sequence_id(c_dim->position.previous_sequence_id); + set_google_timestamp_from_timeval(c_dim->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); + + return 0; +} + +char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) +{ + chart::v1::ChartsAndDimensionsUpdated msg; + chart::v1::ChartInstanceUpdated db_chart; + chart::v1::ChartDimensionUpdated db_dim; + aclk_lib::v1::ACLKMessagePosition *pos; + + msg.set_batch_id(batch_id); + + for (int i = 0; payloads[i]; i++) { + if (is_dim[i]) { + if (!db_dim.ParseFromArray(payloads[i], payload_sizes[i])) { + error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); + return NULL; + } + + 
pos = db_dim.mutable_position(); + pos->set_sequence_id(new_positions[i].sequence_id); + pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); + set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); + + chart::v1::ChartDimensionUpdated *dim = msg.add_dimensions(); + *dim = db_dim; + } else { + if (!db_chart.ParseFromArray(payloads[i], payload_sizes[i])) { + error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated"); + return NULL; + } + + pos = db_chart.mutable_position(); + pos->set_sequence_id(new_positions[i].sequence_id); + pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); + set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); + + chart::v1::ChartInstanceUpdated *chart = msg.add_charts(); + *chart = db_chart; + } + } + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + msg.SerializeToArray(bin, *len); + + return bin; +} + +char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) +{ + chart::v1::ChartsAndDimensionsUpdated msg; + + msg.set_batch_id(chart_batch_id); + + for (int i = 0; payloads[i]; i++) { + chart::v1::ChartInstanceUpdated db_msg; + chart::v1::ChartInstanceUpdated *chart; + aclk_lib::v1::ACLKMessagePosition *pos; + + if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) { + error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated"); + return NULL; + } + + pos = db_msg.mutable_position(); + pos->set_sequence_id(new_positions[i].sequence_id); + pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); + set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); + + chart = msg.add_charts(); + *chart = db_msg; + } + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + msg.SerializeToArray(bin, *len); + + return bin; +} 
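Reviewer note on the calling convention used by `generate_charts_and_dimensions_updated()` and `generate_charts_updated()` above: both loop with `for (int i = 0; payloads[i]; i++)`, so the `payloads` array has to end with a NULL entry, and `payload_sizes` and `new_positions` are read at the same index `i`. The sketch below shows one way a caller might keep those arrays in step. It is only an illustration under stated assumptions: `sketch_position`, `sketch_count_payloads` and `sketch_batch` are hypothetical names that do not appear in the patch, the position struct is reduced to two fields, and no protobuf code is involved.

```cpp
// Illustrative sketch only: every name below is hypothetical and not part
// of the patch. It mirrors the loop shape of the wrappers, nothing more.
#include <cstddef>
#include <cstdint>
#include <vector>

// Reduced stand-in for struct aclk_message_position (timeval omitted).
struct sketch_position {
    uint64_t sequence_id;
    uint64_t previous_sequence_id;
};

// Counts entries the same way the wrappers iterate: stop at the first NULL.
static int sketch_count_payloads(char **payloads)
{
    int count = 0;
    for (int i = 0; payloads[i]; i++)
        count++;
    return count;
}

// Caller-side container that keeps the parallel arrays the same length and
// always leaves a NULL sentinel at the end of the payload list.
struct sketch_batch {
    std::vector<char *> payloads;
    std::vector<size_t> payload_sizes;
    std::vector<sketch_position> positions;

    sketch_batch() { payloads.push_back(nullptr); } // empty list is just the sentinel

    void add(char *payload, size_t size, sketch_position pos)
    {
        payloads.back() = payload;   // overwrite the old sentinel
        payloads.push_back(nullptr); // append a new sentinel
        payload_sizes.push_back(size);
        positions.push_back(pos);
    }
};
```

With a container like this, `payloads.data()` and `payload_sizes.data()` could be handed to a function that expects the `char **` and `size_t *` arguments shown in the patch. The real wrappers take `struct aclk_message_position *`, so an actual caller would store that struct instead of the reduced stand-in.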
+ +char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) +{ + chart::v1::ChartsAndDimensionsUpdated msg; + + msg.set_batch_id(chart_batch_id); + + for (int i = 0; payloads[i]; i++) { + chart::v1::ChartDimensionUpdated db_msg; + chart::v1::ChartDimensionUpdated *dim; + aclk_lib::v1::ACLKMessagePosition *pos; + + if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) { + error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); + return NULL; + } + + pos = db_msg.mutable_position(); + pos->set_sequence_id(new_positions[i].sequence_id); + pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); + set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); + + dim = msg.add_dimensions(); + *dim = db_msg; + } + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + msg.SerializeToArray(bin, *len); + + return bin; +} + +char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update) +{ + chart::v1::ChartInstanceUpdated *chart = new chart::v1::ChartInstanceUpdated(); + + if (set_chart_instance_updated(chart, update)) + return NULL; + + *len = PROTO_COMPAT_MSG_SIZE_PTR(chart); + char *bin = (char*)mallocz(*len); + chart->SerializeToArray(bin, *len); + + delete chart; + return bin; +} + +char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim) +{ + chart::v1::ChartDimensionUpdated *proto_dim = new chart::v1::ChartDimensionUpdated(); + + if (set_chart_dim_updated(proto_dim, dim)) + return NULL; + + *len = PROTO_COMPAT_MSG_SIZE_PTR(proto_dim); + char *bin = (char*)mallocz(*len); + proto_dim->SerializeToArray(bin, *len); + + delete proto_dim; + return bin; +} + +using namespace google::protobuf; + +char *generate_retention_updated(size_t *len, struct retention_updated *data) +{ + chart::v1::RetentionUpdated msg; + + 
msg.set_claim_id(data->claim_id); + msg.set_node_id(data->node_id); + + switch (data->memory_mode) { + case RRD_MEMORY_MODE_NONE: + msg.set_memory_mode(chart::v1::NONE); + break; + case RRD_MEMORY_MODE_RAM: + msg.set_memory_mode(chart::v1::RAM); + break; + case RRD_MEMORY_MODE_MAP: + msg.set_memory_mode(chart::v1::MAP); + break; + case RRD_MEMORY_MODE_SAVE: + msg.set_memory_mode(chart::v1::SAVE); + break; + case RRD_MEMORY_MODE_ALLOC: + msg.set_memory_mode(chart::v1::ALLOC); + break; + case RRD_MEMORY_MODE_DBENGINE: + msg.set_memory_mode(chart::v1::DB_ENGINE); + break; + default: + return NULL; + } + + for (int i = 0; i < data->interval_duration_count; i++) { + Map<uint32, uint32> *map = msg.mutable_interval_durations(); + map->insert({data->interval_durations[i].update_every, data->interval_durations[i].retention}); + } + + set_google_timestamp_from_timeval(data->rotation_timestamp, msg.mutable_rotation_timestamp()); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + msg.SerializeToArray(bin, *len); + + return bin; +} diff --git a/aclk/schema-wrappers/chart_stream.h b/aclk/schema-wrappers/chart_stream.h new file mode 100644 index 000000000..7a46ecd8e --- /dev/null +++ b/aclk/schema-wrappers/chart_stream.h @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CHART_STREAM_H +#define ACLK_SCHEMA_WRAPPER_CHART_STREAM_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "database/rrd.h" + +typedef struct { + char* claim_id; + char* node_id; + + uint64_t seq_id; + uint64_t batch_id; + + struct timeval seq_id_created_at; +} stream_charts_and_dims_t; + +stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len); + +typedef struct { + char* claim_id; + char* node_id; + + uint64_t last_seq_id; +} chart_and_dim_ack_t; + +chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len); + +enum chart_reset_reason { + DB_EMPTY, + SEQ_ID_NOT_EXISTS, + 
TIMESTAMP_MISMATCH +}; + +typedef struct { + char *claim_id; + char *node_id; + + enum chart_reset_reason reason; +} chart_reset_t; + +char *generate_reset_chart_messages(size_t *len, const chart_reset_t reset); + +struct aclk_message_position { + uint64_t sequence_id; + struct timeval seq_id_creation_time; + uint64_t previous_sequence_id; +}; + +struct chart_instance_updated { + const char *id; + const char *claim_id; + const char *node_id; + const char *name; + + struct label *label_head; + + RRD_MEMORY_MODE memory_mode; + + uint32_t update_every; + const char * config_hash; + + struct aclk_message_position position; +}; + +void chart_instance_updated_destroy(struct chart_instance_updated *instance); + +struct chart_dimension_updated { + const char *id; + const char *chart_id; + const char *node_id; + const char *claim_id; + const char *name; + struct timeval created_at; + struct timeval last_timestamp; + struct aclk_message_position position; +}; + +typedef struct { + struct chart_instance_updated *charts; + uint16_t chart_count; + + struct chart_dimension_updated *dims; + uint16_t dim_count; + + uint64_t batch_id; +} charts_and_dims_updated_t; + +struct interval_duration { + uint32_t update_every; + uint32_t retention; +}; + +struct retention_updated { + char *claim_id; + char *node_id; + + RRD_MEMORY_MODE memory_mode; + + struct interval_duration *interval_durations; + int interval_duration_count; + + struct timeval rotation_timestamp; +}; + +char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id); +char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions); +char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update); +char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position 
*new_positions); +char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim); +char *generate_retention_updated(size_t *len, struct retention_updated *data); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CHART_STREAM_H */ diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc new file mode 100644 index 000000000..e3bbfe31f --- /dev/null +++ b/aclk/schema-wrappers/connection.cc @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/agent/v1/connection.pb.h" +#include "proto/agent/v1/disconnect.pb.h" +#include "connection.h" + +#include "schema_wrapper_utils.h" + +#include <sys/time.h> +#include <stdlib.h> + +using namespace agent::v1; + +char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data) +{ + UpdateAgentConnection connupd; + + connupd.set_claim_id(data->claim_id); + connupd.set_reachable(data->reachable); + connupd.set_session_id(data->session_id); + + connupd.set_update_source((data->lwt) ? 
CONNECTION_UPDATE_SOURCE_LWT : CONNECTION_UPDATE_SOURCE_AGENT); + + struct timeval tv; + gettimeofday(&tv, NULL); + + google::protobuf::Timestamp *timestamp = connupd.mutable_updated_at(); + timestamp->set_seconds(tv.tv_sec); + timestamp->set_nanos(tv.tv_usec * 1000); + + *len = PROTO_COMPAT_MSG_SIZE(connupd); + char *msg = (char*)malloc(*len); + if (msg) + connupd.SerializeToArray(msg, *len); + + return msg; +} + +struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) { + DisconnectReq req; + struct disconnect_cmd *res; + + if (!req.ParseFromArray(data, len)) + return NULL; + + res = (struct disconnect_cmd *)calloc(1, sizeof(struct disconnect_cmd)); + + if (!res) + return NULL; + + res->reconnect_after_s = req.reconnect_after_seconds(); + res->permaban = req.permaban(); + res->error_code = req.error_code(); + if (req.error_description().c_str()) { + res->error_description = strdup(req.error_description().c_str()); + if (!res->error_description) { + free(res); + return NULL; + } + } + + return res; +} diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h new file mode 100644 index 000000000..8c223869a --- /dev/null +++ b/aclk/schema-wrappers/connection.h @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONNECTION_H +#define ACLK_SCHEMA_WRAPPER_CONNECTION_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char *claim_id; + unsigned int reachable:1; + + int64_t session_id; + + unsigned int lwt:1; + +// TODO in future optional fields +// > 15 optional fields: +// How long the system was running until connection (only applicable when reachable=true) +// google.protobuf.Duration system_uptime = 15; +// How long the netdata agent was running until connection (only applicable when reachable=true) +// google.protobuf.Duration agent_uptime = 16; + + +} update_agent_connection_t; + +char *generate_update_agent_connection(size_t *len, const 
update_agent_connection_t *data);
+
+struct disconnect_cmd {
+    uint64_t reconnect_after_s;
+    int permaban;
+    uint32_t error_code;
+    char *error_description;
+};
+
+struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONNECTION_H */
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
new file mode 100644
index 000000000..0a4c8ece1
--- /dev/null
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -0,0 +1,37 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/nodeinstance/connection/v1/connection.pb.h"
+#include "node_connection.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data) {
+    nodeinstance::v1::UpdateNodeInstanceConnection msg;
+
+    if(data->claim_id)
+        msg.set_claim_id(data->claim_id);
+    msg.set_node_id(data->node_id);
+
+    msg.set_liveness(data->live);
+    msg.set_queryable(data->queryable);
+
+    msg.set_session_id(data->session_id);
+    msg.set_hops(data->hops);
+
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+
+    google::protobuf::Timestamp *timestamp = msg.mutable_updated_at();
+    timestamp->set_seconds(tv.tv_sec);
+    timestamp->set_nanos(tv.tv_usec * 1000);
+
+    *len = PROTO_COMPAT_MSG_SIZE(msg);
+    char *bin = (char*)malloc(*len);
+    if (bin)
+        msg.SerializeToArray(bin, *len);
+
+    return bin;
+}
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
new file mode 100644
index 000000000..3fd207213
--- /dev/null
+++ b/aclk/schema-wrappers/node_connection.h
@@ -0,0 +1,29 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+    const char* claim_id;
+    const char* node_id;
+
+    unsigned int live:1;
+    unsigned int queryable:1;
+
+    int64_t session_id;
+
+    int32_t hops;
+} node_instance_connection_t;
+
+char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H */
diff --git a/aclk/schema-wrappers/node_creation.cc b/aclk/schema-wrappers/node_creation.cc
new file mode 100644
index 000000000..c696bb27b
--- /dev/null
+++ b/aclk/schema-wrappers/node_creation.cc
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/nodeinstance/create/v1/creation.pb.h"
+#include "node_creation.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <stdlib.h>
+
+char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data)
+{
+    nodeinstance::create::v1::CreateNodeInstance msg;
+
+    if (data->claim_id)
+        msg.set_claim_id(data->claim_id);
+    msg.set_machine_guid(data->machine_guid);
+    msg.set_hostname(data->hostname);
+    msg.set_hops(data->hops);
+
+    *len = PROTO_COMPAT_MSG_SIZE(msg);
+    char *bin = (char*)malloc(*len);
+    if (bin)
+        msg.SerializeToArray(bin, *len);
+
+    return bin;
+}
+
+node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len)
+{
+    nodeinstance::create::v1::CreateNodeInstanceResult msg;
+    node_instance_creation_result_t res = { .node_id = NULL, .machine_guid = NULL };
+
+    if (!msg.ParseFromArray(data, len))
+        return res;
+
+    res.node_id = strdup(msg.node_id().c_str());
+    res.machine_guid = strdup(msg.machine_guid().c_str());
+    return res;
+}
diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h
new file mode 100644
index 000000000..71e45ef55
--- /dev/null
+++ b/aclk/schema-wrappers/node_creation.h
@@ -0,0 +1,31 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_CREATION_H
+#define ACLK_SCHEMA_WRAPPER_NODE_CREATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+    const char* claim_id;
+    const char* machine_guid;
+    const char* hostname;
+
+    int32_t hops;
+} node_instance_creation_t;
+
+typedef struct {
+    char *node_id;
+    char *machine_guid;
+} node_instance_creation_result_t;
+
+char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data);
+node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_CREATION_H */
diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc
new file mode 100644
index 000000000..f6638aa5f
--- /dev/null
+++ b/aclk/schema-wrappers/node_info.cc
@@ -0,0 +1,95 @@
+#include "node_info.h"
+
+#include "proto/nodeinstance/info/v1/info.pb.h"
+
+#include "schema_wrapper_utils.h"
+
+static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct aclk_node_info *data)
+{
+    struct label *label;
+    google::protobuf::Map<std::string, std::string> *map;
+
+    if (data->name)
+        info->set_name(data->name);
+
+    if (data->os)
+        info->set_os(data->os);
+    if (data->os_name)
+        info->set_os_name(data->os_name);
+    if (data->os_version)
+        info->set_os_version(data->os_version);
+
+    if (data->kernel_name)
+        info->set_kernel_name(data->kernel_name);
+    if (data->kernel_version)
+        info->set_kernel_version(data->kernel_version);
+
+    if (data->architecture)
+        info->set_architecture(data->architecture);
+
+    info->set_cpus(data->cpus);
+
+    if (data->cpu_frequency)
+        info->set_cpu_frequency(data->cpu_frequency);
+
+    if (data->memory)
+        info->set_memory(data->memory);
+
+    if (data->disk_space)
+        info->set_disk_space(data->disk_space);
+
+    if (data->version)
+        info->set_version(data->version);
+
+    if (data->release_channel)
+        info->set_release_channel(data->release_channel);
+
+    if (data->timezone)
+        info->set_timezone(data->timezone);
+
+    if (data->virtualization_type)
+        info->set_virtualization_type(data->virtualization_type);
+
+    if (data->container_type)
+        info->set_container_type(data->container_type);
+
+    if (data->custom_info)
+        info->set_custom_info(data->custom_info);
+
+    for (size_t i = 0; i < data->service_count; i++)
+        info->add_services(data->services[i]);
+
+    if (data->machine_guid)
+        info->set_machine_guid(data->machine_guid);
+
+    map = info->mutable_host_labels();
+    label = data->host_labels_head;
+    while (label) {
+        map->insert({label->key, label->value});
+        label = label->next;
+    }
+
+    return 0;
+}
+
+char *generate_update_node_info_message(size_t *len, struct update_node_info *info)
+{
+    nodeinstance::info::v1::UpdateNodeInfo msg;
+
+    msg.set_node_id(info->node_id);
+    msg.set_claim_id(info->claim_id);
+
+    if (generate_node_info(msg.mutable_data(), &info->data))
+        return NULL;
+
+    set_google_timestamp_from_timeval(info->updated_at, msg.mutable_updated_at());
+    msg.set_machine_guid(info->machine_guid);
+    msg.set_child(info->child);
+
+    *len = PROTO_COMPAT_MSG_SIZE(msg);
+    char *bin = (char*)malloc(*len);
+    if (bin)
+        msg.SerializeToArray(bin, *len);
+
+    return bin;
+}
diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h
new file mode 100644
index 000000000..4acb671a5
--- /dev/null
+++ b/aclk/schema-wrappers/node_info.h
@@ -0,0 +1,69 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_INFO_H
+#define ACLK_SCHEMA_WRAPPER_NODE_INFO_H
+
+#include <stdlib.h>
+
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_node_info {
+    char *name;
+
+    char *os;
+    char *os_name;
+    char *os_version;
+
+    char *kernel_name;
+    char *kernel_version;
+
+    char *architecture;
+
+    uint32_t cpus;
+
+    char *cpu_frequency;
+
+    char *memory;
+
+    char *disk_space;
+
+    char *version;
+
+    char *release_channel;
+
+    char *timezone;
+
+    char *virtualization_type;
+
+    char *container_type;
+
+    char *custom_info;
+
+    char **services;
+    size_t service_count;
+
+    char *machine_guid;
+
+    struct label *host_labels_head;
+};
+
+struct update_node_info {
+    char *node_id;
+    char *claim_id;
+    struct aclk_node_info data;
+    struct timeval updated_at;
+    char *machine_guid;
+    int child;
+};
+
+char *generate_update_node_info_message(size_t *len, struct update_node_info *info);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_INFO_H */
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.cc b/aclk/schema-wrappers/schema_wrapper_utils.cc
new file mode 100644
index 000000000..b100e20c3
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrapper_utils.cc
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "schema_wrapper_utils.h"
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts)
+{
+    ts->set_nanos(tv.tv_usec*1000);
+    ts->set_seconds(tv.tv_sec);
+}
+
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv)
+{
+    tv->tv_sec = ts.seconds();
+    tv->tv_usec = ts.nanos()/1000;
+}
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.h b/aclk/schema-wrappers/schema_wrapper_utils.h
new file mode 100644
index 000000000..494855f82
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrapper_utils.h
@@ -0,0 +1,20 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef SCHEMA_WRAPPER_UTILS_H
+#define SCHEMA_WRAPPER_UTILS_H
+
+#include <sys/time.h>
+#include <google/protobuf/timestamp.pb.h>
+
+#if GOOGLE_PROTOBUF_VERSION < 3001000
+#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize();
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) (size_t)msg->ByteSize();
+#else
+#define PROTO_COMPAT_MSG_SIZE(msg) msg.ByteSizeLong();
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) msg->ByteSizeLong();
+#endif
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts);
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv);
+
+#endif /* SCHEMA_WRAPPER_UTILS_H */
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
new file mode 100644
index 000000000..a3975fca3
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+// utility header to include all the message wrappers at once
+
+#ifndef SCHEMA_WRAPPERS_H
+#define SCHEMA_WRAPPERS_H
+
+#include "connection.h"
+#include "node_connection.h"
+#include "node_creation.h"
+#include "chart_config.h"
+#include "chart_stream.h"
+#include "alarm_config.h"
+#include "alarm_stream.h"
+#include "node_info.h"
+
+#endif /* SCHEMA_WRAPPERS_H */ |
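
Note on the timestamp helpers added in schema_wrapper_utils.cc: they convert between a `struct timeval` (seconds plus microseconds) and a protobuf Timestamp (seconds plus nanoseconds). The following is a minimal sketch of that arithmetic in plain C, without protobuf. The `ts_parts` struct and the `ts_from_timeval` / `timeval_from_ts` names are hypothetical stand-ins for illustration and are not part of this commit.

```c
#include <stdint.h>
#include <sys/time.h>

/* Hypothetical stand-in for google::protobuf::Timestamp:
 * whole seconds plus a nanosecond remainder. */
struct ts_parts {
    int64_t seconds;
    int32_t nanos;
};

/* timeval -> timestamp: microseconds are scaled up to nanoseconds. */
static struct ts_parts ts_from_timeval(struct timeval tv)
{
    struct ts_parts ts;
    ts.seconds = (int64_t)tv.tv_sec;
    ts.nanos = (int32_t)(tv.tv_usec * 1000);
    return ts;
}

/* timestamp -> timeval: nanoseconds are truncated down to microseconds,
 * so any sub-microsecond precision is dropped. */
static struct timeval timeval_from_ts(struct ts_parts ts)
{
    struct timeval tv;
    tv.tv_sec = (time_t)ts.seconds;
    tv.tv_usec = ts.nanos / 1000;
    return tv;
}
```

A value that starts as a `struct timeval` should round-trip unchanged, since multiplying by 1000 and then dividing by 1000 loses nothing. The reverse direction is lossy only below one microsecond.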