diff options
Diffstat (limited to '')
53 files changed, 7509 insertions, 0 deletions
diff --git a/aclk/README.md b/aclk/README.md new file mode 100644 index 0000000..af0f5fd --- /dev/null +++ b/aclk/README.md @@ -0,0 +1,147 @@ +<!-- +title: "Agent-Cloud link (ACLK)" +description: "The Agent-Cloud link (ACLK) is the mechanism responsible for connecting a Netdata agent to Netdata Cloud." +date: 2020-05-11 +custom_edit_url: https://github.com/netdata/netdata/edit/master/aclk/README.md +--> + +# Agent-cloud link (ACLK) + +The Agent-Cloud link (ACLK) is the mechanism responsible for securely connecting a Netdata Agent to your web browser +through Netdata Cloud. The ACLK establishes an outgoing secure WebSocket (WSS) connection to Netdata Cloud on port +`443`. The ACLK is encrypted, safe, and _is only established if you connect your node_. + +The Cloud App lives at app.netdata.cloud which currently resolves to the following list of IPs: + +- 54.198.178.11 +- 44.207.131.212 +- 44.196.50.41 + +:::caution + +This list of IPs can change without notice, we strongly advise you to whitelist following domains `api.netdata.cloud`, `mqtt.netdata.cloud`, if +this is not an option in your case always verify the current domain resolution (e.g via the `host` command). + +::: + +For a guide to connecting a node using the ACLK, plus additional troubleshooting and reference information, read our [get +started with Cloud](https://learn.netdata.cloud/docs/cloud/get-started) guide or the full [connect to Cloud +documentation](/claim/README.md). + +## Data privacy +[Data privacy](https://netdata.cloud/privacy/) is very important to us. We firmly believe that your data belongs to +you. This is why **we don't store any metric data in Netdata Cloud**. + +All the data that you see in the web browser when using Netdata Cloud, is actually streamed directly from the Netdata Agent to the Netdata Cloud dashboard. +The data passes through our systems, but it isn't stored. + +However, to be able to offer the stunning visualizations and advanced functionality of Netdata Cloud, it does store a limited number of _metadata_. + +Read more about [Data privacy in the Netdata Cloud](https://learn.netdata.cloud/docs/cloud/data-privacy) in the documentation. + + +## Enable and configure the ACLK + +The ACLK is enabled by default, with its settings automatically configured and stored in the Agent's memory. No file is +created at `/var/lib/netdata/cloud.d/cloud.conf` until you either connect a node or create it yourself. The default +configuration uses two settings: + +```conf +[global] + enabled = yes + cloud base url = https://api.netdata.cloud +``` + +If your Agent needs to use a proxy to access the internet, you must [set up a proxy for +connecting to cloud](/claim/README.md#connect-through-a-proxy). + +You can configure following keys in the `netdata.conf` section `[cloud]`: +``` +[cloud] + statistics = yes + query thread count = 2 +``` + +- `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent. +- `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries). + +## Disable the ACLK + +You have two options if you prefer to disable the ACLK and not use Netdata Cloud. + +### Disable at installation + +You can pass the `--disable-cloud` parameter to the Agent installation when using a kickstart script +([kickstart.sh](/packaging/installer/methods/kickstart.md), or a [manual installation from +Git](/packaging/installer/methods/manual.md). + +When you pass this parameter, the installer does not download or compile any extra libraries. Once running, the Agent +kills the thread responsible for the ACLK and connecting behavior, and behaves as though the ACLK, and thus Netdata Cloud, +does not exist. + +### Disable at runtime + +You can change a runtime setting in your `cloud.conf` file to disable the ACLK. This setting only stops the Agent from +attempting any connection via the ACLK, but does not prevent the installer from downloading and compiling the ACLK's +dependencies. + +The file typically exists at `/var/lib/netdata/cloud.d/cloud.conf`, but can change if you set a prefix during +installation. To disable the ACLK, open that file and change the `enabled` setting to `no`: + +```conf +[global] + enabled = no +``` + +If the file at `/var/lib/netdata/cloud.d/cloud.conf` doesn't exist, you need to create it. + +Copy and paste the first two lines from below, which will change your prompt to `cat`. + +```bash +cd /var/lib/netdata/cloud.d +cat > cloud.conf << EOF +``` + +Copy and paste in lines 3-6, and after the final `EOF`, hit **Enter**. The final line must contain only `EOF`. Hit **Enter** again to return to your normal prompt with the newly-created file. + +To get your normal prompt back, the final line +must contain only `EOF`. + +```bash +[global] + enabled = no + cloud base url = https://api.netdata.cloud +EOF +``` + +You also need to change the file's permissions. Use `grep "run as user" /etc/netdata/netdata.conf` to figure out which +user your Agent runs as (typically `netdata`), and replace `netdata:netdata` as shown below if necessary: + +```bash +sudo chmod 0770 cloud.conf +sudo chown netdata:netdata cloud.conf +``` + +Restart your Agent to disable the ACLK. + +### Re-enable the ACLK + +If you first disable the ACLK and any Cloud functionality and then decide you would like to use Cloud, you must either +[reinstall Netdata](/packaging/installer/REINSTALL.md) with Cloud enabled or change the runtime setting in your +`cloud.conf` file. + +If you passed `--disable-cloud` to `netdata-installer.sh` during installation, you must +[reinstall](/packaging/installer/REINSTALL.md) your Agent. Use the same method as before, but pass `--require-cloud` to +the installer. When installation finishes you can [connect your node](/claim/README.md#how-to-connect-a-node). + +If you changed the runtime setting in your `var/lib/netdata/cloud.d/cloud.conf` file, edit the file again and change +`enabled` to `yes`: + +```conf +[global] + enabled = yes +``` + +Restart your Agent and [connect your node](/claim/README.md#how-to-connect-a-node). + + diff --git a/aclk/aclk.c b/aclk/aclk.c new file mode 100644 index 0000000..3b035b8 --- /dev/null +++ b/aclk/aclk.c @@ -0,0 +1,1173 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk.h" + +#ifdef ENABLE_ACLK +#include "aclk_stats.h" +#include "mqtt_wss_client.h" +#include "aclk_otp.h" +#include "aclk_tx_msgs.h" +#include "aclk_query.h" +#include "aclk_query_queue.h" +#include "aclk_util.h" +#include "aclk_rx_msgs.h" +#include "https_client.h" +#include "schema-wrappers/schema_wrappers.h" +#include "aclk_capas.h" + +#include "aclk_proxy.h" + +#ifdef ACLK_LOG_CONVERSATION_DIR +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#endif + +#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable + +#endif /* ENABLE_ACLK */ + +int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +int aclk_rcvd_cloud_msgs = 0; +int aclk_connection_counter = 0; +int disconnect_req = 0; + +int aclk_connected = 0; +int aclk_ctx_based = 0; +int aclk_disable_runtime = 0; +int aclk_stats_enabled; +int aclk_kill_link = 0; + +usec_t aclk_session_us = 0; +time_t aclk_session_sec = 0; + +time_t last_conn_time_mqtt = 0; +time_t last_conn_time_appl = 0; +time_t last_disconnect_time = 0; +time_t next_connection_attempt = 0; +float last_backoff_value = 0; + +time_t aclk_block_until = 0; + +#ifdef ENABLE_ACLK +mqtt_wss_client mqttwss_client; + +netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) + +struct aclk_shared_state aclk_shared_state = { + .mqtt_shutdown_msg_id = -1, + .mqtt_shutdown_msg_rcvd = 0 +}; + +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +OSSL_DECODER_CTX *aclk_dctx = NULL; +EVP_PKEY *aclk_private_key = NULL; +#else +static RSA *aclk_private_key = NULL; +#endif +static int load_private_key() +{ + if (aclk_private_key != NULL) { +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 + EVP_PKEY_free(aclk_private_key); + if (aclk_dctx) + OSSL_DECODER_CTX_free(aclk_dctx); + + aclk_dctx = NULL; +#else + RSA_free(aclk_private_key); +#endif + } + aclk_private_key = NULL; + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); + + long bytes_read; + char *private_key = read_by_filename(filename, &bytes_read); + if (!private_key) { + error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); + return 1; + } + debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); + + BIO *key_bio = BIO_new_mem_buf(private_key, -1); + if (key_bio==NULL) { + error("Claimed agent cannot establish ACLK - failed to create BIO for key"); + goto biofailed; + } + +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 + aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL, + "RSA", + OSSL_KEYMGMT_SELECT_PRIVATE_KEY, + NULL, NULL); + + if (!aclk_dctx) { + error("Loading private key (from claiming) failed - no OpenSSL Decoders found"); + goto biofailed; + } + + // this is necesseary to avoid RSA key with wrong size + if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) { + error("Decoding private key (from claiming) failed - invalid format."); + goto biofailed; + } +#else + aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); +#endif + BIO_free(key_bio); + if (aclk_private_key!=NULL) + { + freez(private_key); + return 0; + } + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); + +biofailed: + freez(private_key); + return 1; +} + +static int wait_till_cloud_enabled() +{ + info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_setting) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + return 1; + } + return 0; +} + +/** + * Will block until agent is claimed. Returns only if agent claimed + * or if agent needs to shutdown. + * + * @return `0` if agent has been claimed, + * `1` if interrupted due to agent shutting down + */ +static int wait_till_agent_claimed(void) +{ + //TODO prevent malloc and freez + char *agent_id = get_agent_claimid(); + while (likely(!agent_id)) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + return 1; + agent_id = get_agent_claimid(); + } + freez(agent_id); + return 0; +} + +/** + * Checks everything is ready for connection + * agent claimed, cloud url set and private key available + * + * @param aclk_hostname points to location where string pointer to hostname will be set + * @param aclk_port port to int where port will be saved + * + * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated + */ +static int wait_till_agent_claim_ready() +{ + url_t url; + while (!netdata_exit) { + if (wait_till_agent_claimed()) + return 1; + + // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. + // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error("Do not move the cloud base url out of post_conf_load!!"); + return 1; + } + + // We just check configuration is valid here + // TODO make it without malloc/free + memset(&url, 0, sizeof(url_t)); + if (url_parse(cloud_base_url, &url)) { + error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix"); + url_t_destroy(&url); + sleep(5); + continue; + } + url_t_destroy(&url); + + if (!load_private_key()) + return 0; + + sleep(5); + } + + return 1; +} + +void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) +{ + switch(log_type) { + case MQTT_WSS_LOG_ERROR: + case MQTT_WSS_LOG_FATAL: + case MQTT_WSS_LOG_WARN: + error_report("%s", str); + return; + case MQTT_WSS_LOG_INFO: + info("%s", str); + return; + case MQTT_WSS_LOG_DEBUG: + debug(D_ACLK, "%s", str); + return; + default: + error("Unknown log type from mqtt_wss"); + } +} + +//TODO prevent big buffer on stack +#define RX_MSGLEN_MAX 4096 +static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) +{ + UNUSED(qos); + aclk_rcvd_cloud_msgs++; + if (msglen > RX_MSGLEN_MAX) + error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); + + debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); + + if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { + error("Link is shutting down. Ignoring incoming message."); + return; + } + + const char *msgtype = strrchr(topic, '/'); + if (unlikely(!msgtype)) { + error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic); + return; + } + msgtype++; + if (unlikely(!*msgtype)) { + error_report("Message type empty. Ignoring message from topic \"%s\"", topic); + return; + } + +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 512 + char filename[FN_MAX_LEN]; + int logfd; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype); + logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); + if(logfd < 0) + error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + write(logfd, msg, msglen); + close(logfd); +#endif + + aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic); +} + +static void puback_callback(uint16_t packet_id) +{ + if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) { + last_conn_time_appl = now_realtime_sec(); + aclk_tbeb_reset(); + } + +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_puback(packet_id); +#endif + + if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { + info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); + aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; + } +} + +static int read_query_thread_count() +{ + int threads = MIN(processors/2, 6); + threads = MAX(threads, 2); + threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + if(threads < 1) { + error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); + threads = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + } + return threads; +} + +void aclk_graceful_disconnect(mqtt_wss_client client); + +/* Keeps connection alive and handles all network communications. + * Returns on error or when netdata is shutting down. + * @param client instance of mqtt_wss_client + * @returns 0 - Netdata Exits + * >0 - Error happened. Reconnect and start over. + */ +static int handle_connection(mqtt_wss_client client) +{ + time_t last_periodic_query_wakeup = now_monotonic_sec(); + while (!netdata_exit) { + // timeout 1000 to check at least once a second + // for netdata_exit + if (mqtt_wss_service(client, 1000) < 0){ + error_report("Connection Error or Dropped"); + return 1; + } + + if (disconnect_req || aclk_kill_link) { + info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", + disconnect_req ? "true" : "false", + aclk_kill_link ? "true" : "false"); + disconnect_req = 0; + aclk_kill_link = 0; + aclk_graceful_disconnect(client); + aclk_queue_unlock(); + aclk_shared_state.mqtt_shutdown_msg_id = -1; + aclk_shared_state.mqtt_shutdown_msg_rcvd = 0; + return 1; + } + + // mqtt_wss_service will return faster than in one second + // if there is enough work to do + time_t now = now_monotonic_sec(); + if (last_periodic_query_wakeup < now) { + // wake up at least one Query Thread at least + // once per second + last_periodic_query_wakeup = now; + QUERY_THREAD_WAKEUP; + } + } + return 0; +} + +static inline void mqtt_connected_actions(mqtt_wss_client client) +{ + char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND); + + if (!topic) + error("Unable to fetch topic for COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); + + topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1); + if (!topic) + error("Unable to fetch topic for protobuf COMMAND (to subscribe)"); + else + mqtt_wss_subscribe(client, topic, 1); + + aclk_stats_upd_online(1); + aclk_connected = 1; + aclk_pubacks_per_conn = 0; + aclk_rcvd_cloud_msgs = 0; + aclk_connection_counter++; + + aclk_send_agent_connection_update(client, 1); +} + +void aclk_graceful_disconnect(mqtt_wss_client client) +{ + info("Preparing to gracefully shutdown ACLK connection"); + aclk_queue_lock(); + aclk_queue_flush(); + + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0); + + time_t t = now_monotonic_sec(); + while (!mqtt_wss_service(client, 100)) { + if (now_monotonic_sec() - t >= 2) { + error("Wasn't able to gracefully shutdown ACLK in time!"); + break; + } + if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { + info("MQTT App Layer `disconnect` message sent successfully"); + break; + } + } + info("ACLK link is down"); + log_access("ACLK DISCONNECTED"); + aclk_stats_upd_online(0); + last_disconnect_time = now_realtime_sec(); + aclk_connected = 0; + + info("Attempting to gracefully shutdown the MQTT/WSS connection"); + mqtt_wss_disconnect(client, 1000); +} + +static unsigned long aclk_reconnect_delay() { + unsigned long recon_delay; + time_t now; + + if (aclk_disable_runtime) { + aclk_tbeb_reset(); + return 60 * MSEC_PER_SEC; + } + + now = now_monotonic_sec(); + if (aclk_block_until) { + if (now < aclk_block_until) { + recon_delay = aclk_block_until - now; + recon_delay *= MSEC_PER_SEC; + aclk_block_until = 0; + aclk_tbeb_reset(); + return recon_delay; + } + aclk_block_until = 0; + } + + if (!aclk_env || !aclk_env->backoff.base) + return aclk_tbeb_delay(0, 2, 0, 1024); + + return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s); +} + +/* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled + * @return 0 - Go ahead and connect (delay expired) + * 1 - netdata_exit + */ +#define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4) +static int aclk_block_till_recon_allowed() { + unsigned long recon_delay = aclk_reconnect_delay(); + + next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC); + last_backoff_value = (float)recon_delay / MSEC_PER_SEC; + + info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC); + // we want to wake up from time to time to check netdata_exit + while (recon_delay) + { + if (netdata_exit) + return 1; + if (recon_delay > NETDATA_EXIT_POLL_MS) { + sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS); + recon_delay -= NETDATA_EXIT_POLL_MS; + continue; + } + sleep_usec(recon_delay * USEC_PER_MS); + recon_delay = 0; + } + return netdata_exit; +} + +#ifndef ACLK_DISABLE_CHALLENGE +/* Cloud returns transport list ordered with highest + * priority first. This function selects highest prio + * transport that we can actually use (support) + */ +static int aclk_get_transport_idx(aclk_env_t *env) { + for (size_t i = 0; i < env->transport_count; i++) { + // currently we support only MQTT 5 + // therefore select first transport that matches + if (env->transports[i]->type == ACLK_TRP_MQTT_5) { + return i; + } + } + return -1; +} +#endif + +/* Attempts to make a connection to MQTT broker over WSS + * @param client instance of mqtt_wss_client + * @return 0 - Successful Connection, + * <0 - Irrecoverable Error -> Kill ACLK, + * >0 - netdata_exit + */ +#define CLOUD_BASE_URL_READ_RETRY 30 +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED +#define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED +#else +#define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL +#endif +static int aclk_attempt_to_connect(mqtt_wss_client client) +{ + int ret; + + url_t base_url; + +#ifndef ACLK_DISABLE_CHALLENGE + url_t auth_url; + url_t mqtt_url; +#endif + + while (!netdata_exit) { + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error_report("Do not move the cloud base url out of post_conf_load!!"); + return -1; + } + + if (aclk_block_till_recon_allowed()) + return 1; + + info("Attempting connection now"); + memset(&base_url, 0, sizeof(url_t)); + if (url_parse(cloud_base_url, &base_url)) { + error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); + sleep(CLOUD_BASE_URL_READ_RETRY); + url_t_destroy(&base_url); + continue; + } + + struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT }; + aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); + + struct mqtt_connect_params mqtt_conn_params = { + .clientid = "anon", + .username = "anon", + .password = "anon", + .will_topic = "lwt", + .will_msg = NULL, + .will_flags = MQTT_WSS_PUB_QOS2, + .keep_alive = 60, + .drop_on_publish_fail = 1 + }; + +#ifndef ACLK_DISABLE_CHALLENGE + if (aclk_env) { + aclk_env_t_destroy(aclk_env); + freez(aclk_env); + } + aclk_env = callocz(1, sizeof(aclk_env_t)); + + ret = aclk_get_env(aclk_env, base_url.host, base_url.port); + url_t_destroy(&base_url); + if (ret) { + error_report("Failed to Get ACLK environment"); + // delay handled by aclk_block_till_recon_allowed + continue; + } + + if (netdata_exit) + return 1; + + if (aclk_env->encoding != ACLK_ENC_PROTO) { + error_report("This agent can only use the new cloud protocol but cloud requested old one."); + continue; + } + + if (!aclk_env_has_capa("proto")) { + error_report("Can't use encoding=proto without at least \"proto\" capability."); + continue; + } + info("New ACLK protobuf protocol negotiated successfully (/env response)."); + + memset(&auth_url, 0, sizeof(url_t)); + if (url_parse(aclk_env->auth_endpoint, &auth_url)) { + error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint); + url_t_destroy(&auth_url); + continue; + } + + ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url); + url_t_destroy(&auth_url); + if (ret) { + error_report("Error passing Challenge/Response to get OTP"); + continue; + } + + // aclk_get_topic moved here as during OTP we + // generate the topic cache + mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN); + + if (!mqtt_conn_params.will_topic) { + error_report("Couldn't get LWT topic. Will not send LWT."); + continue; + } + + // Do the MQTT connection + ret = aclk_get_transport_idx(aclk_env); + if (ret < 0) { + error_report("Cloud /env endpoint didn't return any transport usable by this Agent."); + continue; + } + + memset(&mqtt_url, 0, sizeof(url_t)); + if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){ + error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint); + url_t_destroy(&mqtt_url); + continue; + } +#endif + + aclk_session_newarch = now_realtime_usec(); + aclk_session_sec = aclk_session_newarch / USEC_PER_SEC; + aclk_session_us = aclk_session_newarch % USEC_PER_SEC; + + mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len); + +#ifdef ACLK_DISABLE_CHALLENGE + ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); + url_t_destroy(&base_url); +#else + ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf); + url_t_destroy(&mqtt_url); + + freez((char*)mqtt_conn_params.clientid); + freez((char*)mqtt_conn_params.password); + freez((char*)mqtt_conn_params.username); +#endif + + freez((char *)mqtt_conn_params.will_msg); + + if (!ret) { + last_conn_time_mqtt = now_realtime_sec(); + info("ACLK connection successfully established"); + log_access("ACLK CONNECTED"); + mqtt_connected_actions(client); + return 0; + } + + error_report("Connect failed"); + } + + return 1; +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + + struct aclk_stats_thread *stats_thread = NULL; + + struct aclk_query_threads query_threads; + query_threads.thread_list = NULL; + + ACLK_PROXY_TYPE proxy_type; + aclk_get_proxy(&proxy_type); + if (proxy_type == PROXY_TYPE_SOCKS5) { + error("SOCKS5 proxy is not supported by ACLK-NG yet."); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } + + unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers(); + + // This thread is unusual in that it cannot be cancelled by cancel_main_threads() + // as it must notify the far end that it shutdown gracefully and avoid the LWT. + netdata_thread_disable_cancelability(); + +#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK ) + info("Killing ACLK thread -> cloud functionality has been disabled"); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +#endif + query_threads.count = read_query_thread_count(); + + if (wait_till_cloud_enabled()) + goto exit; + + if (wait_till_agent_claim_ready()) + goto exit; + + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) { + error("Couldn't initialize MQTT_WSS network library"); + goto exit; + } + + // Enable MQTT buffer growth if necessary + // e.g. old cloud architecture clients with huge nodes + // that send JSON payloads of 10 MB as single messages + mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024); + + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled); + if (aclk_stats_enabled) { + stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); + stats_thread->thread = mallocz(sizeof(netdata_thread_t)); + stats_thread->query_thread_count = query_threads.count; + stats_thread->client = mqttwss_client; + aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt); + netdata_thread_create( + stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread); + } + + // Keep reconnecting and talking until our time has come + // and the Grim Reaper (netdata_exit) calls + do { + if (aclk_attempt_to_connect(mqttwss_client)) + goto exit_full; + + if (unlikely(!query_threads.thread_list)) + aclk_query_threads_start(&query_threads, mqttwss_client); + + if (handle_connection(mqttwss_client)) { + aclk_stats_upd_online(0); + last_disconnect_time = now_realtime_sec(); + aclk_connected = 0; + log_access("ACLK DISCONNECTED"); + } + } while (!netdata_exit); + + aclk_graceful_disconnect(mqttwss_client); + +exit_full: +// Tear Down + QUERY_THREAD_WAKEUP_ALL; + + aclk_query_threads_cleanup(&query_threads); + + if (aclk_stats_enabled) { + netdata_thread_join(*stats_thread->thread, NULL); + aclk_stats_thread_cleanup(); + freez(stats_thread->thread); + freez(stats_thread); + } + free_topic_cache(); + mqtt_wss_destroy(mqttwss_client); +exit: + if (aclk_env) { + aclk_env_t_destroy(aclk_env); + freez(aclk_env); + } + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +} + +void aclk_host_state_update(RRDHOST *host, int cmd) +{ + uuid_t node_id; + int ret; + + if (!aclk_connected) + return; + + ret = get_node_id(&host->host_uuid, &node_id); + if (ret > 0) { + // this means we were not able to check if node_id already present + error("Unable to check for node_id. Ignoring the host state update."); + return; + } + if (ret < 0) { + // node_id not found + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + rrdhost_aclk_state_lock(localhost); + node_instance_creation_t node_instance_creation = { + .claim_id = localhost->aclk_state.claimed_id, + .hops = host->system_info->hops, + .hostname = rrdhost_hostname(host), + .machine_guid = host->machine_guid + }; + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); + rrdhost_aclk_state_unlock(localhost); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; + info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops); + aclk_queue_query(create_query); + return; + } + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + node_instance_connection_t node_state_update = { + .hops = host->system_info->hops, + .live = cmd, + .queryable = 1, + .session_id = aclk_session_newarch + }; + node_state_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(node_id, (char*)node_state_update.node_id); + + node_state_update.capabilities = aclk_get_agent_capas(); + + rrdhost_aclk_state_lock(localhost); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); + rrdhost_aclk_state_unlock(localhost); + + info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, + host->system_info->hops); + freez((void*)node_state_update.node_id); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); +} + +void aclk_send_node_instances() +{ + struct node_instance_list *list_head = get_node_list(); + struct node_instance_list *list = list_head; + if (unlikely(!list)) { + error_report("Failure to get_node_list from DB!"); + return; + } + while (!uuid_is_null(list->host_id)) { + if (!uuid_is_null(list->node_id)) { + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + node_instance_connection_t node_state_update = { + .live = list->live, + .hops = list->hops, + .queryable = 1, + .session_id = aclk_session_newarch + }; + node_state_update.node_id = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id); + + char host_id[UUID_STR_LEN]; + uuid_unparse_lower(list->host_id, host_id); + + RRDHOST *host = rrdhost_find_by_guid(host_id); + node_state_update.capabilities = aclk_get_node_instance_capas(host); + + rrdhost_aclk_state_lock(localhost); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); + rrdhost_aclk_state_unlock(localhost); + info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, + list->live, + list->hops); + + freez((void*)node_state_update.capabilities); + freez((void*)node_state_update.node_id); + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + aclk_queue_query(query); + } else { + aclk_query_t create_query; + create_query = aclk_query_new(REGISTER_NODE); + node_instance_creation_t node_instance_creation = { + .hops = list->hops, + .hostname = list->hostname, + }; + node_instance_creation.machine_guid = mallocz(UUID_STR_LEN); + uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid); + create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; + create_query->data.bin_payload.msg_name = "CreateNodeInstance"; + rrdhost_aclk_state_lock(localhost); + node_instance_creation.claim_id = localhost->aclk_state.claimed_id, + create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); + rrdhost_aclk_state_unlock(localhost); + info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, + list->hops); + freez((void *)node_instance_creation.machine_guid); + aclk_queue_query(create_query); + } + freez(list->hostname); + + list++; + } + freez(list_head); +} + +void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) +{ + aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname); +} + +static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) +{ + struct proto_alert_status status; + memset(&status, 0, sizeof(status)); + if (get_proto_alert_status(host, &status)) { + buffer_strcat(wb, "\nFailed to get alert streaming status for this host"); + return; + } + buffer_sprintf(wb, + "\n\t\tUpdates: %d" + "\n\t\tBatch ID: %"PRIu64 + "\n\t\tLast Acked Seq ID: %"PRIu64 + "\n\t\tPending Min Seq ID: %"PRIu64 + "\n\t\tPending Max Seq ID: %"PRIu64 + "\n\t\tLast Submitted Seq ID: %"PRIu64, + status.alert_updates, + status.alerts_batch_id, + status.last_acked_sequence_id, + status.pending_min_sequence_id, + status.pending_max_sequence_id, + status.last_submitted_sequence_id + ); +} +#endif /* ENABLE_ACLK */ + +char *aclk_state(void) +{ +#ifndef ENABLE_ACLK + return strdupz("ACLK Available: No"); +#else + BUFFER *wb = buffer_create(1024); + struct tm *tmptr, tmbuf; + char *ret; + + buffer_strcat(wb, + "ACLK Available: Yes\n" + "ACLK Version: 2\n" + "Protocols Supported: Protobuf\n" + ); + buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5); + + char *agent_id = get_agent_claimid(); + if (agent_id == NULL) + buffer_strcat(wb, "No\n"); + else { + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null"); + freez(agent_id); + } + + buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No"); + if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf); + } + if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf); + } + if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf); + } + if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value); + } + + if (aclk_connected) { + buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn); + + RRDHOST *host; + rrd_rdlock(); + rrdhost_foreach_read(host) { + buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host)); + + buffer_strcat(wb, "\tClaimed ID: "); + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) + buffer_strcat(wb, host->aclk_state.claimed_id); + else + buffer_strcat(wb, "null"); + rrdhost_aclk_state_unlock(host); + + + if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + buffer_strcat(wb, "\n\tNode ID: null\n"); + } else { + char node_id[GUID_LEN + 1]; + uuid_unparse_lower(*host->node_id, node_id); + buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id); + } + + buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child"); + + if (host != localhost) + buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false"); + + buffer_strcat(wb, "\n\tAlert Streaming Status:"); + fill_alert_status_for_host(wb, host); + } + rrd_unlock(); + } + + ret = strdupz(buffer_tostring(wb)); + buffer_free(wb); + return ret; +#endif /* ENABLE_ACLK */ +} + +#ifdef ENABLE_ACLK +static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) +{ + struct proto_alert_status status; + memset(&status, 0, sizeof(status)); + if (get_proto_alert_status(host, &status)) + return; + + json_object *tmp = json_object_new_int(status.alert_updates); + json_object_object_add(obj, "updates", tmp); + + tmp = json_object_new_int(status.alerts_batch_id); + json_object_object_add(obj, "batch-id", tmp); + + tmp = json_object_new_int(status.last_acked_sequence_id); + json_object_object_add(obj, "last-acked-seq-id", tmp); + + tmp = json_object_new_int(status.pending_min_sequence_id); + json_object_object_add(obj, "pending-min-seq-id", tmp); + + tmp = json_object_new_int(status.pending_max_sequence_id); + json_object_object_add(obj, "pending-max-seq-id", tmp); + + tmp = json_object_new_int(status.last_submitted_sequence_id); + json_object_object_add(obj, "last-submitted-seq-id", tmp); +} + +static json_object *timestamp_to_json(const time_t *t) +{ + struct tm *tmptr, tmbuf; + if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) { + char timebuf[26]; + strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); + return json_object_new_string(timebuf); + } + return NULL; +} +#endif /* ENABLE_ACLK */ + +char *aclk_state_json(void) +{ +#ifndef ENABLE_ACLK + return strdupz("{\"aclk-available\":false}"); +#else + json_object *tmp, *grp, *msg = json_object_new_object(); + + tmp = json_object_new_boolean(1); + json_object_object_add(msg, "aclk-available", tmp); + + tmp = json_object_new_int(2); + json_object_object_add(msg, "aclk-version", tmp); + + grp = json_object_new_array(); + tmp = json_object_new_string("Protobuf"); + json_object_array_add(grp, tmp); + json_object_object_add(msg, "protocols-supported", grp); + + char *agent_id = get_agent_claimid(); + tmp = json_object_new_boolean(agent_id != NULL); + json_object_object_add(msg, "agent-claimed", tmp); + + if (agent_id) { + tmp = json_object_new_string(agent_id); + freez(agent_id); + } else + tmp = NULL; + json_object_object_add(msg, "claimed-id", tmp); + + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL; + json_object_object_add(msg, "cloud-url", tmp); + + tmp = json_object_new_boolean(aclk_connected); + json_object_object_add(msg, "online", tmp); + + tmp = json_object_new_string("Protobuf"); + json_object_object_add(msg, "used-cloud-protocol", tmp); + + tmp = json_object_new_int(5); + json_object_object_add(msg, "mqtt-version", tmp); + + tmp = json_object_new_int(aclk_rcvd_cloud_msgs); + json_object_object_add(msg, "received-app-layer-msgs", tmp); + + tmp = json_object_new_int(aclk_pubacks_per_conn); + json_object_object_add(msg, "received-mqtt-pubacks", tmp); + + tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0); + json_object_object_add(msg, "reconnect-count", tmp); + + json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt)); + json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl)); + json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time)); + json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL); + tmp = NULL; + if (!aclk_connected && last_backoff_value) + tmp = json_object_new_double(last_backoff_value); + json_object_object_add(msg, "last-backoff-value", tmp); + + tmp = json_object_new_boolean(aclk_disable_runtime); + json_object_object_add(msg, "banned-by-cloud", tmp); + + grp = json_object_new_array(); + + RRDHOST *host; + rrd_rdlock(); + rrdhost_foreach_read(host) { + json_object *nodeinstance = json_object_new_object(); + + tmp = json_object_new_string(rrdhost_hostname(host)); + json_object_object_add(nodeinstance, "hostname", tmp); + + tmp = json_object_new_string(host->machine_guid); + json_object_object_add(nodeinstance, "mguid", tmp); + + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) { + tmp = json_object_new_string(host->aclk_state.claimed_id); + json_object_object_add(nodeinstance, "claimed_id", tmp); + } else + json_object_object_add(nodeinstance, "claimed_id", NULL); + rrdhost_aclk_state_unlock(host); + + if (host->node_id == NULL || uuid_is_null(*host->node_id)) { + json_object_object_add(nodeinstance, "node-id", NULL); + } else { + char node_id[GUID_LEN + 1]; + uuid_unparse_lower(*host->node_id, node_id); + tmp = json_object_new_string(node_id); + json_object_object_add(nodeinstance, "node-id", tmp); + } + + tmp = json_object_new_int(host->system_info->hops); + json_object_object_add(nodeinstance, "streaming-hops", tmp); + + tmp = json_object_new_string(host == localhost ? "self" : "child"); + json_object_object_add(nodeinstance, "relationship", tmp); + + tmp = json_object_new_boolean((host->receiver || host == localhost)); + json_object_object_add(nodeinstance, "streaming-online", tmp); + + tmp = json_object_new_object(); + fill_alert_status_for_host_json(tmp, host); + json_object_object_add(nodeinstance, "alert-sync-status", tmp); + + json_object_array_add(grp, nodeinstance); + } + rrd_unlock(); + json_object_object_add(msg, "node-instances", grp); + + char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN)); + json_object_put(msg); + return str; +#endif /* ENABLE_ACLK */ +} + +void add_aclk_host_labels(void) { + DICTIONARY *labels = localhost->rrdlabels; + +#ifdef ENABLE_ACLK + rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); + ACLK_PROXY_TYPE aclk_proxy; + char *proxy_str; + aclk_get_proxy(&aclk_proxy); + + switch(aclk_proxy) { + case PROXY_TYPE_SOCKS5: + proxy_str = "SOCKS5"; + break; + case PROXY_TYPE_HTTP: + proxy_str = "HTTP"; + break; + default: + proxy_str = "none"; + break; + } + + rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO); + rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); +#else + rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK); +#endif +} + +void aclk_queue_node_info(RRDHOST *host) { + struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker; + if (likely(wc)) { + wc->node_info_send = 1; + } +} diff --git a/aclk/aclk.h b/aclk/aclk.h new file mode 100644 index 0000000..6aed548 --- /dev/null +++ b/aclk/aclk.h @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_H +#define ACLK_H + +#include "daemon/common.h" + +#ifdef ENABLE_ACLK +#include "aclk_util.h" +#include "aclk_rrdhost_state.h" + +// How many MQTT PUBACKs we need to get to consider connection +// stable for the purposes of TBEB (truncated binary exponential backoff) +#define ACLK_PUBACKS_CONN_STABLE 3 +#endif /* ENABLE_ACLK */ + +extern int aclk_connected; +extern int aclk_ctx_based; +extern int aclk_disable_runtime; +extern int aclk_stats_enabled; +extern int aclk_kill_link; + +extern usec_t aclk_session_us; +extern time_t aclk_session_sec; + +extern time_t aclk_block_until; + +extern int disconnect_req; + +#ifdef ENABLE_ACLK +void *aclk_main(void *ptr); + +extern netdata_mutex_t aclk_shared_state_mutex; +#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) + +extern struct aclk_shared_state { + // To wait for `disconnect` message PUBACK + // when shutting down + // at the same time if > 0 we know link is + // shutting down + int mqtt_shutdown_msg_id; + int mqtt_shutdown_msg_rcvd; +} aclk_shared_state; + +void aclk_host_state_update(RRDHOST *host, int cmd); +void aclk_send_node_instances(void); + +void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); + +#endif /* ENABLE_ACLK */ + +char *aclk_state(void); +char *aclk_state_json(void); +void add_aclk_host_labels(void); +void aclk_queue_node_info(RRDHOST *host); + +#endif /* ACLK_H */ diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c new file mode 100644 index 0000000..7df51a7 --- /dev/null +++ b/aclk/aclk_alarm_api.c @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_alarm_api.h" + +#include "aclk_query_queue.h" + +#include "aclk_util.h" + +#include "aclk.h" + +void aclk_send_alarm_log_health(struct alarm_log_health *log_health) +{ + aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH); + query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health); + query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH; + query->data.bin_payload.msg_name = "AlarmLogHealth"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry) +{ + size_t payload_size; + char *payload = generate_alarm_log_entry(&payload_size, log_entry); + + aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry"); +} + +void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg) +{ + aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CFG); + query->data.bin_payload.payload = generate_provide_alarm_configuration(&query->data.bin_payload.size, cfg); + query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CONFIG; + query->data.bin_payload.msg_name = "ProvideAlarmConfiguration"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot) +{ + aclk_query_t query = aclk_query_new(ALARM_SNAPSHOT); + query->data.bin_payload.payload = generate_alarm_snapshot_bin(&query->data.bin_payload.size, snapshot); + query->data.bin_payload.topic = ACLK_TOPICID_ALARM_SNAPSHOT; + query->data.bin_payload.msg_name = "AlarmSnapshot"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} diff --git a/aclk/aclk_alarm_api.h b/aclk/aclk_alarm_api.h new file mode 100644 index 0000000..e3fa92b --- /dev/null +++ b/aclk/aclk_alarm_api.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_ALARM_API_H +#define ACLK_ALARM_API_H + +#include "../daemon/common.h" +#include "schema-wrappers/schema_wrappers.h" + +void aclk_send_alarm_log_health(struct alarm_log_health *log_health); +void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry); +void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg); +void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot); + +#endif /* ACLK_ALARM_API_H */ diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c new file mode 100644 index 0000000..df9d18f --- /dev/null +++ b/aclk/aclk_capas.c @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_capas.h" + +#include "ml/ml.h" + +const struct capability *aclk_get_agent_capas() +{ + static struct capability agent_capabilities[] = { + { .name = "json", .version = 2, .enabled = 0 }, + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = 0, .enabled = 0 }, + { .name = "mc", .version = 0, .enabled = 0 }, + { .name = "ctx", .version = 1, .enabled = 1 }, + { .name = "funcs", .version = 1, .enabled = 1 }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + agent_capabilities[2].version = ml_capable() ? 1 : 0; + agent_capabilities[2].enabled = ml_enabled(localhost); + + agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0; + agent_capabilities[3].enabled = enable_metric_correlations; + + return agent_capabilities; +} + +struct capability *aclk_get_node_instance_capas(RRDHOST *host) +{ + struct capability ni_caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(), .enabled = ml_enabled(host) }, + { .name = "mc", + .version = enable_metric_correlations ? metric_correlations_version : 0, + .enabled = enable_metric_correlations }, + { .name = "ctx", .version = 1, .enabled = 1 }, + { .name = "funcs", .version = 0, .enabled = 0 }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) { + ni_caps[4].version = 1; + ni_caps[4].enabled = 1; + } + + struct capability *ret = mallocz(sizeof(ni_caps)); + memcpy(ret, ni_caps, sizeof(ni_caps)); + return ret; +} diff --git a/aclk/aclk_capas.h b/aclk/aclk_capas.h new file mode 100644 index 0000000..c39a197 --- /dev/null +++ b/aclk/aclk_capas.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_CAPAS_H +#define ACLK_CAPAS_H + +#include "daemon/common.h" +#include "libnetdata/libnetdata.h" + +#include "schema-wrappers/capability.h" + +const struct capability *aclk_get_agent_capas(); +struct capability *aclk_get_node_instance_capas(RRDHOST *host); + +#endif /* ACLK_CAPAS_H */ diff --git a/aclk/aclk_contexts_api.c b/aclk/aclk_contexts_api.c new file mode 100644 index 0000000..f334493 --- /dev/null +++ b/aclk/aclk_contexts_api.c @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_query_queue.h" + +#include "aclk_contexts_api.h" + +void aclk_send_contexts_snapshot(contexts_snapshot_t data) +{ + aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); + query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT; + query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size); + query->data.bin_payload.msg_name = "ContextsSnapshot"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_send_contexts_updated(contexts_updated_t data) +{ + aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE); + query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED; + query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size); + query->data.bin_payload.msg_name = "ContextsUpdated"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_update_node_collectors(struct update_node_collectors *collectors) +{ + aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS; + query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors); + query->data.bin_payload.msg_name = "UpdateNodeCollectors"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} + +void aclk_update_node_info(struct update_node_info *info) +{ + aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO); + query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO; + query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info); + query->data.bin_payload.msg_name = "UpdateNodeInfo"; + QUEUE_IF_PAYLOAD_PRESENT(query); +} diff --git a/aclk/aclk_contexts_api.h b/aclk/aclk_contexts_api.h new file mode 100644 index 0000000..f0b5ec7 --- /dev/null +++ b/aclk/aclk_contexts_api.h @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_CONTEXTS_API_H +#define ACLK_CONTEXTS_API_H + +#include "schema-wrappers/schema_wrappers.h" + + +void aclk_send_contexts_snapshot(contexts_snapshot_t data); +void aclk_send_contexts_updated(contexts_updated_t data); +void aclk_update_node_collectors(struct update_node_collectors *collectors); +void aclk_update_node_info(struct update_node_info *info); + +#endif /* ACLK_CONTEXTS_API_H */ + diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c new file mode 100644 index 0000000..2bdbb70 --- /dev/null +++ b/aclk/aclk_otp.c @@ -0,0 +1,888 @@ + +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_otp.h" +#include "aclk_util.h" +#include "aclk.h" + +#include "daemon/common.h" + +#include "mqtt_websockets/c-rbuf/include/ringbuffer.h" + +static int aclk_https_request(https_req_t *request, https_req_response_t *response) { + int rc; + // wrapper for ACLK only which loads ACLK specific proxy settings + // then only calls https_request + struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT }; + aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type); + + if (proxy_conf.type == MQTT_WSS_PROXY_HTTP) { + request->proxy_host = (char*)proxy_conf.host; // TODO make it const as well + request->proxy_port = proxy_conf.port; + } + + rc = https_request(request, response); + freez((char*)proxy_conf.host); + return rc; +} + +struct auth_data { + char *client_id; + char *username; + char *passwd; +}; + +#define PARSE_ENV_JSON_CHK_TYPE(it, type, name) \ + if (json_object_get_type(json_object_iter_peek_value(it)) != type) { \ + error("value of key \"%s\" should be %s", name, #type); \ + goto exit; \ + } + +#define JSON_KEY_CLIENTID "clientID" +#define JSON_KEY_USER "username" +#define JSON_KEY_PASS "password" +#define JSON_KEY_TOPICS "topics" + +static int parse_passwd_response(const char *json_str, struct auth_data *auth) { + int rc = 1; + json_object *json; + struct json_object_iterator it; + struct json_object_iterator itEnd; + + json = json_tokener_parse(json_str); + if (!json) { + error("JSON-C failed to parse the payload of http response of /env endpoint"); + return 1; + } + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CLIENTID)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_CLIENTID) + + auth->client_id = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_USER)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_USER) + + auth->username = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_PASS)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_PASS) + + auth->passwd = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TOPICS)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TOPICS) + + if (aclk_generate_topic_cache(json_object_iter_peek_value(&it))) { + error("Failed to generate topic cache!"); + goto exit; + } + json_object_iter_next(&it); + continue; + } + error("Unknown key \"%s\" in passwd response payload. Ignoring", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + if (!auth->client_id) { + error(JSON_KEY_CLIENTID " is compulsory key in /password response"); + goto exit; + } + if (!auth->passwd) { + error(JSON_KEY_PASS " is compulsory in /password response"); + goto exit; + } + if (!auth->username) { + error(JSON_KEY_USER " is compulsory in /password response"); + goto exit; + } + + rc = 0; +exit: + json_object_put(json); + return rc; +} + +#define JSON_KEY_ERTRY "errorNonRetryable" +#define JSON_KEY_EDELAY "errorRetryDelaySeconds" +#define JSON_KEY_EEC "errorCode" +#define JSON_KEY_EMSGKEY "errorMsgKey" +#define JSON_KEY_EMSG "errorMessage" +#if JSON_C_MINOR_VERSION >= 13 +static const char *get_json_str_by_path(json_object *json, const char *path) { + json_object *ptr; + if (json_pointer_get(json, path, &ptr)) { + error("Missing compulsory key \"%s\" in error response", path); + return NULL; + } + if (json_object_get_type(ptr) != json_type_string) { + error("Value of Key \"%s\" in error response should be string", path); + return NULL; + } + return json_object_get_string(ptr); +} + +static int aclk_parse_otp_error(const char *json_str) { + int rc = 1; + json_object *json, *ptr; + const char *ec; + const char *ek; + const char *emsg; + int block_retry = -1, backoff = -1; + + + json = json_tokener_parse(json_str); + if (!json) { + error("JSON-C failed to parse the payload of http response of /env endpoint"); + return 1; + } + + if ((ec = get_json_str_by_path(json, "/" JSON_KEY_EEC)) == NULL) + goto exit; + + if ((ek = get_json_str_by_path(json, "/" JSON_KEY_EMSGKEY)) == NULL) + goto exit; + + if ((emsg = get_json_str_by_path(json, "/" JSON_KEY_EMSG)) == NULL) + goto exit; + + // optional field + if (!json_pointer_get(json, "/" JSON_KEY_ERTRY, &ptr)) { + if (json_object_get_type(ptr) != json_type_boolean) { + error("Error response Key " "/" JSON_KEY_ERTRY " should be of boolean type"); + goto exit; + } + block_retry = json_object_get_boolean(ptr); + } + + // optional field + if (!json_pointer_get(json, "/" JSON_KEY_EDELAY, &ptr)) { + if (json_object_get_type(ptr) != json_type_int) { + error("Error response Key " "/" JSON_KEY_EDELAY " should be of integer type"); + goto exit; + } + backoff = json_object_get_int(ptr); + } + + if (block_retry > 0) + aclk_disable_runtime = 1; + + if (backoff > 0) + aclk_block_until = now_monotonic_sec() + backoff; + + error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff); + rc = 0; +exit: + json_object_put(json); + return rc; +} +#else +static int aclk_parse_otp_error(const char *json_str) { + int rc = 1; + int block_retry = -1, backoff = -1; + + const char *ec = NULL; + const char *ek = NULL; + const char *emsg = NULL; + + json_object *json; + struct json_object_iterator it; + struct json_object_iterator itEnd; + + json = json_tokener_parse(json_str); + if (!json) { + error("JSON-C failed to parse the payload of http response of /env endpoint"); + return 1; + } + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSG)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSG) + + emsg = json_object_get_string(json_object_iter_peek_value(&it)); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSGKEY)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSGKEY) + + ek = json_object_get_string(json_object_iter_peek_value(&it)); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EEC)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EEC) + + ec = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EDELAY)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_int) { + error("value of key " JSON_KEY_EDELAY " should be integer"); + goto exit; + } + + backoff = json_object_get_int(json_object_iter_peek_value(&it)); + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ERTRY)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_boolean) { + error("value of key " JSON_KEY_ERTRY " should be integer"); + goto exit; + } + + block_retry = json_object_get_boolean(json_object_iter_peek_value(&it)); + json_object_iter_next(&it); + continue; + } + error("Unknown key \"%s\" in error response payload. Ignoring", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + if (block_retry > 0) + aclk_disable_runtime = 1; + + if (backoff > 0) + aclk_block_until = now_monotonic_sec() + backoff; + + error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff); + rc = 0; +exit: + json_object_put(json); + return rc; +} +#endif + +#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 +static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) +{ + EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx)); + + if (ctx != NULL) { + memset(ctx, 0, sizeof(*ctx)); + } + return ctx; +} +static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx) +{ + OPENSSL_free(ctx); + return; +} +#endif + +#define CHALLENGE_LEN 256 +#define CHALLENGE_LEN_BASE64 344 +inline static int base64_decode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) +{ + unsigned char remaining_data[CHALLENGE_LEN]; + EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); + EVP_DecodeInit(ctx); + EVP_DecodeUpdate(ctx, out, outl, in, in_len); + int remainder = 0; + EVP_DecodeFinal(ctx, remaining_data, &remainder); + EVP_ENCODE_CTX_free(ctx); + if (remainder) { + error("Unexpected data at EVP_DecodeFinal"); + return 1; + } + return 0; +} + +inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) +{ + int len; + unsigned char *str = out; + EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); + EVP_EncodeInit(ctx); + EVP_EncodeUpdate(ctx, str, outl, in, in_len); + str += *outl; + EVP_EncodeFinal(ctx, str, &len); + *outl += len; + // if we ever expect longer output than what OpenSSL would pack into single line + // we would have to skip the endlines, until then we can just cut the string short + str = (unsigned char*)strchr((char*)out, '\n'); + if (str) + *str = 0; + EVP_ENCODE_CTX_free(ctx); + return 0; +} + +#define OTP_URL_PREFIX "/api/v1/auth/node/" +int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **challenge, int *challenge_bytes) +{ + int rc = 1; + https_req_t req = HTTPS_REQ_T_INITIALIZER; + https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER; + + BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20); + + req.host = target->host; + req.port = target->port; + buffer_sprintf(url, "%s/node/%s/challenge", target->path, agent_id); + req.url = (char *)buffer_tostring(url); + + if (aclk_https_request(&req, &resp)) { + error ("ACLK_OTP Challenge failed"); + buffer_free(url); + return 1; + } + if (resp.http_code != 200) { + error ("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code); + buffer_free(url); + if (resp.payload_size) + aclk_parse_otp_error(resp.payload); + goto cleanup_resp; + } + buffer_free(url); + + info ("ACLK_OTP Got Challenge from Cloud"); + + json_object *json = json_tokener_parse(resp.payload); + if (!json) { + error ("Couldn't parse HTTP GET challenge payload"); + goto cleanup_resp; + } + json_object *challenge_json; + if (!json_object_object_get_ex(json, "challenge", &challenge_json)) { + error ("No key named \"challenge\" in the returned JSON"); + goto cleanup_json; + } + if (!json_object_is_type(challenge_json, json_type_string)) { + error ("\"challenge\" is not a string JSON type"); + goto cleanup_json; + } + const char *challenge_base64; + if (!(challenge_base64 = json_object_get_string(challenge_json))) { + error("Failed to extract challenge from JSON object"); + goto cleanup_json; + } + if (strlen(challenge_base64) != CHALLENGE_LEN_BASE64) { + error("Received Challenge has unexpected length of %zu (expected %d)", strlen(challenge_base64), CHALLENGE_LEN_BASE64); + goto cleanup_json; + } + + *challenge = mallocz((CHALLENGE_LEN_BASE64 / 4) * 3); + base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64)); + if (*challenge_bytes != CHALLENGE_LEN) { + error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN); + freez(*challenge); + *challenge = NULL; + goto cleanup_json; + } + rc = 0; + +cleanup_json: + json_object_put(json); +cleanup_resp: + https_req_response_free(&resp); + return rc; +} + +int aclk_send_otp_response(const char *agent_id, const unsigned char *response, int response_bytes, url_t *target, struct auth_data *mqtt_auth) +{ + int len; + int rc = 1; + https_req_t req = HTTPS_REQ_T_INITIALIZER; + https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER; + + req.host = target->host; + req.port = target->port; + req.request_type = HTTP_REQ_POST; + + unsigned char base64[CHALLENGE_LEN_BASE64 + 1]; + memset(base64, 0, CHALLENGE_LEN_BASE64 + 1); + + base64_encode_helper(base64, &len, response, response_bytes); + + BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20); + BUFFER *resp_json = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20); + + buffer_sprintf(url, "%s/node/%s/password", target->path, agent_id); + buffer_sprintf(resp_json, "{\"response\":\"%s\"}", base64); + + req.url = (char *)buffer_tostring(url); + req.payload = (char *)buffer_tostring(resp_json); + req.payload_size = strlen(req.payload); + + if (aclk_https_request(&req, &resp)) { + error ("ACLK_OTP Password error trying to post result to password"); + goto cleanup_buffers; + } + if (resp.http_code != 201) { + error ("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code); + if (resp.payload_size) + aclk_parse_otp_error(resp.payload); + goto cleanup_response; + } + info ("ACLK_OTP Got Password from Cloud"); + + if (parse_passwd_response(resp.payload, mqtt_auth)){ + error("Error parsing response of password endpoint"); + goto cleanup_response; + } + + rc = 0; + +cleanup_response: + https_req_response_free(&resp); +cleanup_buffers: + buffer_free(resp_json); + buffer_free(url); + return rc; +} + +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +static int private_decrypt(EVP_PKEY *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted) +#else +static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char **decrypted) +#endif +{ + int result; +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 + size_t outlen = EVP_PKEY_size(p_key); + EVP_PKEY_CTX *ctx = EVP_PKEY_CTX_new(p_key, NULL); + if (!ctx) + return 1; + + if (EVP_PKEY_decrypt_init(ctx) <= 0) + return 1; + + if (EVP_PKEY_CTX_set_rsa_padding(ctx, RSA_PKCS1_OAEP_PADDING) <= 0) + return 1; + + *decrypted = mallocz(outlen); + + if (EVP_PKEY_decrypt(ctx, *decrypted, &outlen, enc_data, data_len) == 1) + result = (int) outlen; + else + result = -1; +#else + *decrypted = mallocz(RSA_size(p_key)); + result = RSA_private_decrypt(data_len, enc_data, *decrypted, p_key, RSA_PKCS1_OAEP_PADDING); +#endif + if (result == -1) + { + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Decryption of the challenge failed: %s", err); + } + return result; +} + +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) +#else +int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) +#endif +{ + unsigned char *challenge = NULL; + int challenge_bytes; + + char *agent_id = get_agent_claimid(); + if (agent_id == NULL) { + error("Agent was not claimed - cannot perform challenge/response"); + return 1; + } + + // Get Challenge + if (aclk_get_otp_challenge(target, agent_id, &challenge, &challenge_bytes)) { + error("Error getting challenge"); + freez(agent_id); + return 1; + } + + // Decrypt Challenge / Get response + unsigned char *response_plaintext; + int response_plaintext_bytes = private_decrypt(p_key, challenge, challenge_bytes, &response_plaintext); + if (response_plaintext_bytes < 0) { + error ("Couldn't decrypt the challenge received"); + freez(response_plaintext); + freez(challenge); + freez(agent_id); + return 1; + } + freez(challenge); + + // Encode and Send Challenge + struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL }; + if (aclk_send_otp_response(agent_id, response_plaintext, response_plaintext_bytes, target, &data)) { + error("Error getting response"); + freez(response_plaintext); + freez(agent_id); + return 1; + } + + *mqtt_pass = data.passwd; + *mqtt_usr = data.username; + *mqtt_id = data.client_id; + + freez(response_plaintext); + freez(agent_id); + return 0; +} + +#define JSON_KEY_ENC "encoding" +#define JSON_KEY_AUTH_ENDPOINT "authEndpoint" +#define JSON_KEY_TRP "transports" +#define JSON_KEY_TRP_TYPE "type" +#define JSON_KEY_TRP_ENDPOINT "endpoint" +#define JSON_KEY_BACKOFF "backoff" +#define JSON_KEY_BACKOFF_BASE "base" +#define JSON_KEY_BACKOFF_MAX "maxSeconds" +#define JSON_KEY_BACKOFF_MIN "minSeconds" +#define JSON_KEY_CAPS "capabilities" + +static int parse_json_env_transport(json_object *json, aclk_transport_desc_t *trp) { + struct json_object_iterator it; + struct json_object_iterator itEnd; + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_TYPE)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_TYPE) + if (trp->type != ACLK_TRP_UNKNOWN) { + error(JSON_KEY_TRP_TYPE " set already"); + goto exit; + } + trp->type = aclk_transport_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it))); + if (trp->type == ACLK_TRP_UNKNOWN) { + error(JSON_KEY_TRP_TYPE " unknown type \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + goto exit; + } + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP_ENDPOINT)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_TRP_ENDPOINT) + if (trp->endpoint) { + error(JSON_KEY_TRP_ENDPOINT " set already"); + goto exit; + } + trp->endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + if (!trp->endpoint) { + error (JSON_KEY_TRP_ENDPOINT " is missing from JSON dictionary"); + goto exit; + } + + if (trp->type == ACLK_TRP_UNKNOWN) { + error ("transport type not set"); + goto exit; + } + + return 0; + +exit: + aclk_transport_desc_t_destroy(trp); + return 1; +} + +static int parse_json_env_transports(json_object *json_array, aclk_env_t *env) { + aclk_transport_desc_t *trp; + json_object *obj; + + if (env->transports) { + error("transports have been set already"); + return 1; + } + + env->transport_count = json_object_array_length(json_array); + + env->transports = callocz(env->transport_count , sizeof(aclk_transport_desc_t *)); + + for (size_t i = 0; i < env->transport_count; i++) { + trp = callocz(1, sizeof(aclk_transport_desc_t)); + obj = json_object_array_get_idx(json_array, i); + if (parse_json_env_transport(obj, trp)) { + error("error parsing transport idx %d", (int)i); + freez(trp); + return 1; + } + env->transports[i] = trp; + } + + return 0; +} + +#define MATCHED_CORRECT 1 +#define MATCHED_ERROR -1 +#define NOT_MATCHED 0 +static int parse_json_backoff_int(struct json_object_iterator *it, int *out, const char* name, int min, int max) { + if (!strcmp(json_object_iter_peek_name(it), name)) { + if (json_object_get_type(json_object_iter_peek_value(it)) != json_type_int) { + error("Could not parse \"%s\". Not an integer as expected.", name); + return MATCHED_ERROR; + } + + *out = json_object_get_int(json_object_iter_peek_value(it)); + + if (*out < min || *out > max) { + error("Value of \"%s\"=%d out of range (%d-%d).", name, *out, min, max); + return MATCHED_ERROR; + } + + return MATCHED_CORRECT; + } + return NOT_MATCHED; +} + +static int parse_json_backoff(json_object *json, aclk_backoff_t *backoff) { + struct json_object_iterator it; + struct json_object_iterator itEnd; + int ret; + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if ( (ret = parse_json_backoff_int(&it, &backoff->base, JSON_KEY_BACKOFF_BASE, 1, 10)) ) { + if (ret == MATCHED_ERROR) { + return 1; + } + json_object_iter_next(&it); + continue; + } + + if ( (ret = parse_json_backoff_int(&it, &backoff->max_s, JSON_KEY_BACKOFF_MAX, 500, INT_MAX)) ) { + if (ret == MATCHED_ERROR) { + return 1; + } + json_object_iter_next(&it); + continue; + } + + if ( (ret = parse_json_backoff_int(&it, &backoff->min_s, JSON_KEY_BACKOFF_MIN, 0, INT_MAX)) ) { + if (ret == MATCHED_ERROR) { + return 1; + } + json_object_iter_next(&it); + continue; + } + + error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + return 0; +} + +static int parse_json_env_caps(json_object *json, aclk_env_t *env) { + json_object *obj; + const char *str; + + if (env->capabilities) { + error("transports have been set already"); + return 1; + } + + env->capability_count = json_object_array_length(json); + + // empty capabilities list is allowed + if (!env->capability_count) + return 0; + + env->capabilities = callocz(env->capability_count , sizeof(char *)); + + for (size_t i = 0; i < env->capability_count; i++) { + obj = json_object_array_get_idx(json, i); + if (json_object_get_type(obj) != json_type_string) { + error("Capability at index %d not a string!", (int)i); + return 1; + } + str = json_object_get_string(obj); + if (!str) { + error("Error parsing capabilities"); + return 1; + } + env->capabilities[i] = strdupz(str); + } + + return 0; +} + +static int parse_json_env(const char *json_str, aclk_env_t *env) { + json_object *json; + struct json_object_iterator it; + struct json_object_iterator itEnd; + + json = json_tokener_parse(json_str); + if (!json) { + error("JSON-C failed to parse the payload of http response of /env endpoint"); + return 1; + } + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_AUTH_ENDPOINT)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_AUTH_ENDPOINT) + if (env->auth_endpoint) { + error("authEndpoint set already"); + goto exit; + } + env->auth_endpoint = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ENC)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_ENC) + if (env->encoding != ACLK_ENC_UNKNOWN) { + error(JSON_KEY_ENC " set already"); + goto exit; + } + env->encoding = aclk_encoding_type_t_from_str(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TRP)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TRP) + + json_object *now = json_object_iter_peek_value(&it); + parse_json_env_transports(now, env); + + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_BACKOFF)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_object, JSON_KEY_BACKOFF) + + if (parse_json_backoff(json_object_iter_peek_value(&it), &env->backoff)) { + env->backoff.base = 0; + error("Error parsing Backoff parameters in env"); + goto exit; + } + + json_object_iter_next(&it); + continue; + } + + if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CAPS)) { + PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_CAPS) + + if (parse_json_env_caps(json_object_iter_peek_value(&it), env)) { + error("Error parsing capabilities list"); + goto exit; + } + + json_object_iter_next(&it); + continue; + } + + error ("unknown JSON key in dictionary (\"%s\")", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + // Check all compulsory keys have been set + if (env->transport_count < 1) { + error("env has to return at least one transport"); + goto exit; + } + if (!env->auth_endpoint) { + error(JSON_KEY_AUTH_ENDPOINT " is compulsory"); + goto exit; + } + if (env->encoding == ACLK_ENC_UNKNOWN) { + error(JSON_KEY_ENC " is compulsory"); + goto exit; + } + if (!env->backoff.base) { + error(JSON_KEY_BACKOFF " is compulsory"); + goto exit; + } + + json_object_put(json); + return 0; + +exit: + aclk_env_t_destroy(env); + json_object_put(json); + return 1; +} + +int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) { + BUFFER *buf = buffer_create(1024); + + https_req_t req = HTTPS_REQ_T_INITIALIZER; + https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER; + + req.request_type = HTTP_REQ_GET; + + char *agent_id = get_agent_claimid(); + if (agent_id == NULL) + { + error("Agent was not claimed - cannot perform challenge/response"); + buffer_free(buf); + return 1; + } + + buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto,ctx&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id); + + freez(agent_id); + + req.host = (char*)aclk_hostname; + req.port = aclk_port; + req.url = buf->buffer; + if (aclk_https_request(&req, &resp)) { + error("Error trying to contact env endpoint"); + https_req_response_free(&resp); + buffer_free(buf); + return 1; + } + if (resp.http_code != 200) { + error("The HTTP code not 200 OK (Got %d)", resp.http_code); + if (resp.payload_size) + aclk_parse_otp_error(resp.payload); + https_req_response_free(&resp); + buffer_free(buf); + return 1; + } + + if (!resp.payload || !resp.payload_size) { + error("Unexpected empty payload as response to /env call"); + https_req_response_free(&resp); + buffer_free(buf); + return 1; + } + + if (parse_json_env(resp.payload, env)) { + error ("error parsing /env message"); + https_req_response_free(&resp); + buffer_free(buf); + return 1; + } + + info("Getting Cloud /env successful"); + + https_req_response_free(&resp); + buffer_free(buf); + return 0; +} diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h new file mode 100644 index 0000000..2d660e5 --- /dev/null +++ b/aclk/aclk_otp.h @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_OTP_H +#define ACLK_OTP_H + +#include "daemon/common.h" + +#include "https_client.h" +#include "aclk_util.h" + +#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300 +int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target); +#else +int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target); +#endif +int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port); + +#endif /* ACLK_OTP_H */ diff --git a/aclk/aclk_proxy.c b/aclk/aclk_proxy.c new file mode 100644 index 0000000..1701eb8 --- /dev/null +++ b/aclk/aclk_proxy.c @@ -0,0 +1,186 @@ +#include "aclk_proxy.h" + +#include "daemon/common.h" + +#define ACLK_PROXY_ENV "env" +#define ACLK_PROXY_CONFIG_VAR "proxy" + +struct { + ACLK_PROXY_TYPE type; + const char *url_str; +} supported_proxy_types[] = { + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL }, +}; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type) +{ + switch (*type) { + case PROXY_DISABLED: + return "disabled"; + case PROXY_TYPE_HTTP: + return "HTTP"; + case PROXY_TYPE_SOCKS5: + return "SOCKS"; + default: + return "Unknown"; + } +} + +static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string) +{ + int i = 0; + while (supported_proxy_types[i].url_str) { + if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str))) + return supported_proxy_types[i].type; + i++; + } + return PROXY_TYPE_UNKNOWN; +} + +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string) +{ + if (!string) + return PROXY_TYPE_UNKNOWN; + + while (*string == 0x20) + string++; + + if (!*string) + return PROXY_TYPE_UNKNOWN; + + return aclk_find_proxy(string); +} + +// helper function to censor user&password +// for logging purposes +void safe_log_proxy_censor(char *proxy) +{ + size_t length = strlen(proxy); + char *auth = proxy + length - 1; + char *cur; + + while ((auth >= proxy) && (*auth != '@')) + auth--; + + //if not found or @ is first char do nothing + if (auth <= proxy) + return; + + cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR); + if (!cur) + cur = proxy; + else + cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + while (cur < auth) { + *cur = 'X'; + cur++; + } +} + +static inline void safe_log_proxy_error(char *str, const char *proxy) +{ + char *log = strdupz(proxy); + safe_log_proxy_censor(log); + error("%s Provided Value:\"%s\"", str, log); + freez(log); +} + +static inline int check_socks_enviroment(const char **proxy) +{ + char *tmp = getenv("socks_proxy"); + + if (!tmp) + return 1; + + if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) { + *proxy = tmp; + return 0; + } + + safe_log_proxy_error( + "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", + tmp); + return 1; +} + +static inline int check_http_enviroment(const char **proxy) +{ + char *tmp = getenv("http_proxy"); + + if (!tmp) + return 1; + + if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) { + *proxy = tmp; + return 0; + } + + safe_log_proxy_error( + "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".", + tmp); + return 1; +} + +const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type) +{ + const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV); + *type = PROXY_DISABLED; + + if (strcmp(proxy, "none") == 0) + return proxy; + + if (strcmp(proxy, ACLK_PROXY_ENV) == 0) { + if (check_socks_enviroment(&proxy) == 0) { +#ifdef LWS_WITH_SOCKS5 + *type = PROXY_TYPE_SOCKS5; + return proxy; +#else + safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy " + "but Libwebsockets used doesn't have SOCKS5 support built in. " + "Ignoring and checking for other options.", + proxy); +#endif + } + if (check_http_enviroment(&proxy) == 0) + *type = PROXY_TYPE_HTTP; + return proxy; + } + + *type = aclk_verify_proxy(proxy); +#ifndef LWS_WITH_SOCKS5 + if (*type == PROXY_TYPE_SOCKS5) { + safe_log_proxy_error( + "Config var \"" ACLK_PROXY_CONFIG_VAR + "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.", + proxy); + } +#endif + if (*type == PROXY_TYPE_UNKNOWN) { + *type = PROXY_DISABLED; + safe_log_proxy_error( + "Config var \"" ACLK_PROXY_CONFIG_VAR + "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", + proxy); + } + + return proxy; +} + +// helper function to read settings only once (static) +// as claiming, challenge/response and ACLK +// read the same thing, no need to parse again +const char *aclk_get_proxy(ACLK_PROXY_TYPE *type) +{ + static const char *proxy = NULL; + static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET; + + if (proxy_type == PROXY_NOT_SET) + proxy = aclk_lws_wss_get_proxy_setting(&proxy_type); + + *type = proxy_type; + return proxy; +} diff --git a/aclk/aclk_proxy.h b/aclk/aclk_proxy.h new file mode 100644 index 0000000..b4ceb7d --- /dev/null +++ b/aclk/aclk_proxy.h @@ -0,0 +1,22 @@ +#ifndef ACLK_PROXY_H +#define ACLK_PROXY_H + +#include <config.h> + +#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://" + +typedef enum aclk_proxy_type { + PROXY_TYPE_UNKNOWN = 0, + PROXY_TYPE_SOCKS5, + PROXY_TYPE_HTTP, + PROXY_DISABLED, + PROXY_NOT_SET, +} ACLK_PROXY_TYPE; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type); +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string); +const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type); +void safe_log_proxy_censor(char *proxy); +const char *aclk_get_proxy(ACLK_PROXY_TYPE *type); + +#endif /* ACLK_PROXY_H */ diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c new file mode 100644 index 0000000..5301c28 --- /dev/null +++ b/aclk/aclk_query.c @@ -0,0 +1,383 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_query.h" +#include "aclk_stats.h" +#include "aclk_tx_msgs.h" + +#define ACLK_QUERY_THREAD_NAME "ACLK_Query" + +#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) + +static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url) +{ + usec_t t; + + t = now_monotonic_high_precision_usec(); + w->response.code = web_client_api_request_v1(host, w, url); + t = now_monotonic_high_precision_usec() - t; + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_q_process_total += t; + aclk_metrics_per_sample.cloud_q_process_count++; + if (aclk_metrics_per_sample.cloud_q_process_max < t) + aclk_metrics_per_sample.cloud_q_process_max = t; + ACLK_STATS_UNLOCK; + } + + return t; +} + +static RRDHOST *node_id_2_rrdhost(const char *node_id) +{ + int res; + uuid_t node_id_bin, host_id_bin; + + RRDHOST *host = find_host_by_node_id((char *)node_id); + if (host) + return host; + + char host_id[UUID_STR_LEN]; + if (uuid_parse(node_id, node_id_bin)) { + error("Couldn't parse UUID %s", node_id); + return NULL; + } + if ((res = get_host_id(&node_id_bin, &host_id_bin))) { + error("node not found rc=%d", res); + return NULL; + } + uuid_unparse_lower(host_id_bin, host_id); + return rrdhost_find_by_guid(host_id); +} + +#define NODE_ID_QUERY "/node/" +// TODO this function should be quarantied and written nicely +// lots of skeletons from initial ACLK Legacy impl. +// quick and dirty from the start +static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) +{ + int retval = 0; + usec_t t; + BUFFER *local_buffer = NULL; + BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE); + RRDHOST *query_host = localhost; + +#ifdef NETDATA_WITH_ZLIB + int z_ret; + BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *start, *end; +#endif + + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = WEB_CLIENT_ACL_ACLK; + + buffer_strcat(log_buffer, query->data.http_api_v2.query); + size_t size = 0; + size_t sent = 0; + w->tv_in = query->created_tv; + now_realtime_timeval(&w->tv_ready); + + if (query->timeout) { + int in_queue = (int) (dt_usec(&w->tv_in, &w->tv_ready) / 1000); + if (in_queue > query->timeout) { + log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %d ms (LIMIT %d ms)", in_queue, query->timeout); + retval = 1; + w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0); + goto cleanup; + } + } + + if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) { + char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY); + char nodeid[UUID_STR_LEN]; + if (strlen(node_uuid) < (UUID_STR_LEN - 1)) { + error_report(CLOUD_EMSG_MALFORMED_NODE_ID); + retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_MALFORMED_NODE_ID, CLOUD_EMSG_MALFORMED_NODE_ID, NULL, 0); + goto cleanup; + } + strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1); + + query_host = node_id_2_rrdhost(nodeid); + if (!query_host) { + error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid); + retval = 1; + w->response.code = 404; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0); + goto cleanup; + } + } + + char *mysep = strchr(query->data.http_api_v2.query, '?'); + if (mysep) { + url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); + *mysep = '\0'; + } else + url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1); + + mysep = strrchr(query->data.http_api_v2.query, '/'); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + int stat_idx = aclk_cloud_req_http_type_to_idx(mysep ? mysep + 1 : "other"); + aclk_metrics_per_sample.cloud_req_http_by_type[stat_idx]++; + ACLK_STATS_UNLOCK; + } + + // execute the query + t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop"); + size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len; + sent = size; + +#ifdef NETDATA_WITH_ZLIB + // check if gzip encoding can and should be used + if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) { + start += strlen(WEB_HDR_ACCEPT_ENC); + end = strstr(start, "\x0D\x0A"); + start = strstr(start, "gzip"); + + if (start && start < end) { + w->response.zstream.zalloc = Z_NULL; + w->response.zstream.zfree = Z_NULL; + w->response.zstream.opaque = Z_NULL; + if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) { + w->response.zinitialized = 1; + w->response.zoutput = 1; + } else + error("Failed to initialize zlib. Proceeding without compression."); + } + } + + if (w->response.data->len && w->response.zinitialized) { + w->response.zstream.next_in = (Bytef *)w->response.data->buffer; + w->response.zstream.avail_in = w->response.data->len; + do { + w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE; + w->response.zstream.next_out = w->response.zbuffer; + z_ret = deflate(&w->response.zstream, Z_FINISH); + if(z_ret < 0) { + if(w->response.zstream.msg) + error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); + else + error("Unknown error during zlib compression."); + retval = 1; + w->response.code = 500; + aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_ZLIB_ERROR, CLOUD_EMSG_ZLIB_ERROR, NULL, 0); + goto cleanup; + } + int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; + buffer_need_bytes(z_buffer, bytes_to_cpy); + memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy); + z_buffer->len += bytes_to_cpy; + } while(z_ret != Z_STREAM_END); + // so that web_client_build_http_header + // puts correct content length into header + buffer_free(w->response.data); + w->response.data = z_buffer; + z_buffer = NULL; + } +#endif + + w->response.data->date = w->tv_ready.tv_sec; + web_client_build_http_header(w); + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + buffer_strcat(local_buffer, w->response.header_output->buffer); + + if (w->response.data->len) { +#ifdef NETDATA_WITH_ZLIB + if (w->response.zinitialized) { + buffer_need_bytes(local_buffer, w->response.data->len); + memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); + local_buffer->len += w->response.data->len; + sent = sent - size + w->response.data->len; + } else { +#endif + buffer_strcat(local_buffer, w->response.data->buffer); +#ifdef NETDATA_WITH_ZLIB + } +#endif + } + + // send msg. + aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); + + struct timeval tv; + +cleanup: + now_realtime_timeval(&tv); + log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", + w->id + , gettid() + , query_thr->idx + , "DATA" + , sent + , size + , size > sent ? -(((size - sent) / (double)size) * 100.0) : ((size > 0) ? (((sent - size ) / (double)size) * 100.0) : 0.0) + , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 + , dt_usec(&tv, &w->tv_ready) / 1000.0 + , dt_usec(&tv, &w->tv_in) / 1000.0 + , w->response.code + , strip_control_characters((char *)buffer_tostring(log_buffer)) + ); + +#ifdef NETDATA_WITH_ZLIB + if(w->response.zinitialized) + deflateEnd(&w->response.zstream); + buffer_free(z_buffer); +#endif + buffer_free(w->response.data); + buffer_free(w->response.header); + buffer_free(w->response.header_output); + freez(w); + buffer_free(local_buffer); + buffer_free(log_buffer); + return retval; +} + +static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) +{ + // this will be simplified when legacy support is removed + aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name); + return 0; +} + +const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok) +{ + switch (qt) { + case HTTP_API_V2: return "http_api_request_v2"; + case REGISTER_NODE: return "register_node"; + case NODE_STATE_UPDATE: return "node_state_update"; + case CHART_DIMS_UPDATE: return "chart_and_dim_update"; + case CHART_CONFIG_UPDATED: return "chart_config_updated"; + case CHART_RESET: return "reset_chart_messages"; + case RETENTION_UPDATED: return "update_retention_info"; + case UPDATE_NODE_INFO: return "update_node_info"; + case ALARM_LOG_HEALTH: return "alarm_log_health"; + case ALARM_PROVIDE_CFG: return "provide_alarm_config"; + case ALARM_SNAPSHOT: return "alarm_snapshot"; + case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; + case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; + default: + if (!unknown_ok) + error_report("Unknown query type used %d", (int) qt); + return "unknown"; + } +} + +static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) +{ + if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) { + error_report("Unknown query in query queue. %u", query->type); + aclk_query_free(query); + return; + } + + worker_is_busy(query->type); + if (query->type == HTTP_API_V2) { + debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); + http_api_v2(query_thr, query); + } else { + debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name); + send_bin_msg(query_thr, query); + } + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_dispatched++; + aclk_queries_per_thread[query_thr->idx]++; + aclk_metrics_per_sample.queries_per_type[query->type]++; + ACLK_STATS_UNLOCK; + } + + aclk_query_free(query); + + worker_is_idle(); +} + +/* Processes messages from queue. Compete for work with other threads + */ +int aclk_query_process_msgs(struct aclk_query_thread *query_thr) +{ + aclk_query_t query; + while ((query = aclk_queue_pop())) + aclk_query_process_msg(query_thr, query); + + return 0; +} + +static void worker_aclk_register(void) { + worker_register("ACLKQUERY"); + for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) { + worker_register_job_name(i, aclk_query_get_name(i, 0)); + } +} + +/** + * Main query processing thread + */ +void *aclk_query_main_thread(void *ptr) +{ + worker_aclk_register(); + + struct aclk_query_thread *query_thr = ptr; + + while (!netdata_exit) { + aclk_query_process_msgs(query_thr); + + worker_is_idle(); + QUERY_THREAD_LOCK; + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + QUERY_THREAD_UNLOCK; + } + + worker_unregister(); + return NULL; +} + +#define TASK_LEN_MAX 22 +void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client) +{ + info("Starting %d query threads.", query_threads->count); + + char thread_name[TASK_LEN_MAX]; + query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); + for (int i = 0; i < query_threads->count; i++) { + query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics + + if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0)) + error("snprintf encoding error"); + netdata_thread_create( + &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, + &query_threads->thread_list[i]); + + query_threads->thread_list[i].client = client; + } +} + +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) +{ + if (query_threads && query_threads->thread_list) { + for (int i = 0; i < query_threads->count; i++) { + netdata_thread_join(query_threads->thread_list[i].thread, NULL); + } + freez(query_threads->thread_list); + } + aclk_queue_lock(); + aclk_queue_flush(); +} diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h new file mode 100644 index 0000000..c006b01 --- /dev/null +++ b/aclk/aclk_query.h @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_QUERY_H +#define NETDATA_ACLK_QUERY_H + +#include "libnetdata/libnetdata.h" + +#include "mqtt_wss_client.h" + +#include "aclk_query_queue.h" + +extern pthread_cond_t query_cond_wait; +extern pthread_mutex_t query_lock_wait; +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) +#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait) + +// TODO +//extern volatile int aclk_connected; + +struct aclk_query_thread { + netdata_thread_t thread; + int idx; + mqtt_wss_client client; +}; + +struct aclk_query_threads { + struct aclk_query_thread *thread_list; + int count; +}; + +void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client); +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); + +const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok); + +#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c new file mode 100644 index 0000000..9a45057 --- /dev/null +++ b/aclk/aclk_query_queue.c @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_query_queue.h" +#include "aclk_query.h" +#include "aclk_stats.h" + +static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex) +#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex) + +static struct aclk_query_queue { + aclk_query_t head; + aclk_query_t tail; + int block_push; +} aclk_query_queue = { + .head = NULL, + .tail = NULL, + .block_push = 0 +}; + +static inline int _aclk_queue_query(aclk_query_t query) +{ + now_realtime_timeval(&query->created_tv); + query->created = now_realtime_usec(); + + ACLK_QUEUE_LOCK; + if (aclk_query_queue.block_push) { + ACLK_QUEUE_UNLOCK; + if(!netdata_exit) + error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); + aclk_query_free(query); + return 1; + } + if (!aclk_query_queue.head) { + aclk_query_queue.head = query; + aclk_query_queue.tail = query; + ACLK_QUEUE_UNLOCK; + return 0; + } + // TODO deduplication + aclk_query_queue.tail->next = query; + aclk_query_queue.tail = query; + ACLK_QUEUE_UNLOCK; + return 0; + +} + +int aclk_queue_query(aclk_query_t query) +{ + int ret = _aclk_queue_query(query); + if (!ret) { + QUERY_THREAD_WAKEUP; + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_queued++; + ACLK_STATS_UNLOCK; + } + } + return ret; +} + +aclk_query_t aclk_queue_pop(void) +{ + aclk_query_t ret; + + ACLK_QUEUE_LOCK; + if (aclk_query_queue.block_push) { + ACLK_QUEUE_UNLOCK; + if(!netdata_exit) + error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); + return NULL; + } + + ret = aclk_query_queue.head; + if (!ret) { + ACLK_QUEUE_UNLOCK; + return ret; + } + + aclk_query_queue.head = ret->next; + if (unlikely(!aclk_query_queue.head)) + aclk_query_queue.tail = aclk_query_queue.head; + ACLK_QUEUE_UNLOCK; + + ret->next = NULL; + return ret; +} + +void aclk_queue_flush(void) +{ + aclk_query_t query = aclk_queue_pop(); + while (query) { + aclk_query_free(query); + query = aclk_queue_pop(); + }; +} + +aclk_query_t aclk_query_new(aclk_query_type_t type) +{ + aclk_query_t query = callocz(1, sizeof(struct aclk_query)); + query->type = type; + return query; +} + +void aclk_query_free(aclk_query_t query) +{ + switch (query->type) { + case HTTP_API_V2: + freez(query->data.http_api_v2.payload); + if (query->data.http_api_v2.query != query->dedup_id) + freez(query->data.http_api_v2.query); + break; + + default: + break; + } + + freez(query->dedup_id); + freez(query->callback_topic); + freez(query->msg_id); + freez(query); +} + +void aclk_queue_lock(void) +{ + ACLK_QUEUE_LOCK; + aclk_query_queue.block_push = 1; + ACLK_QUEUE_UNLOCK; +} + +void aclk_queue_unlock(void) +{ + ACLK_QUEUE_LOCK; + aclk_query_queue.block_push = 0; + ACLK_QUEUE_UNLOCK; +} diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h new file mode 100644 index 0000000..ab94b63 --- /dev/null +++ b/aclk/aclk_query_queue.h @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_QUERY_QUEUE_H +#define NETDATA_ACLK_QUERY_QUEUE_H + +#include "libnetdata/libnetdata.h" +#include "daemon/common.h" +#include "schema-wrappers/schema_wrappers.h" + +#include "aclk_util.h" + +typedef enum { + UNKNOWN = 0, + HTTP_API_V2, + REGISTER_NODE, + NODE_STATE_UPDATE, + CHART_DIMS_UPDATE, + CHART_CONFIG_UPDATED, + CHART_RESET, + RETENTION_UPDATED, + UPDATE_NODE_INFO, + ALARM_LOG_HEALTH, + ALARM_PROVIDE_CFG, + ALARM_SNAPSHOT, + UPDATE_NODE_COLLECTORS, + PROTO_BIN_MESSAGE, + ACLK_QUERY_TYPE_COUNT // always keep this as last +} aclk_query_type_t; + +struct aclk_query_http_api_v2 { + char *payload; + char *query; +}; + +struct aclk_bin_payload { + char *payload; + size_t size; + enum aclk_topics topic; + const char *msg_name; +}; + +typedef struct aclk_query *aclk_query_t; +struct aclk_query { + aclk_query_type_t type; + + // dedup_id is used to deduplicate queries in the list + // if type and dedup_id is the same message is deduplicated + // set dedup_id to NULL to never deduplicate the message + // set dedup_id to constant (e.g. empty string "") to make + // message of this type ever exist only once in the list + char *dedup_id; + char *callback_topic; + char *msg_id; + + struct timeval created_tv; + usec_t created; + int timeout; + aclk_query_t next; + + // TODO maybe remove? + int version; + union { + struct aclk_query_http_api_v2 http_api_v2; + struct aclk_bin_payload bin_payload; + } data; +}; + +aclk_query_t aclk_query_new(aclk_query_type_t type); +void aclk_query_free(aclk_query_t query); + +int aclk_queue_query(aclk_query_t query); +aclk_query_t aclk_queue_pop(void); +void aclk_queue_flush(void); + +void aclk_queue_lock(void); +void aclk_queue_unlock(void); + +#define QUEUE_IF_PAYLOAD_PRESENT(query) \ + if (likely(query->data.bin_payload.payload)) { \ + aclk_queue_query(query); \ + } else { \ + error("Failed to generate payload (%s)", __FUNCTION__); \ + aclk_query_free(query); \ + } + +#endif /* NETDATA_ACLK_QUERY_QUEUE_H */ diff --git a/aclk/aclk_rrdhost_state.h b/aclk/aclk_rrdhost_state.h new file mode 100644 index 0000000..5c8a2dd --- /dev/null +++ b/aclk/aclk_rrdhost_state.h @@ -0,0 +1,11 @@ +#ifndef ACLK_RRDHOST_STATE_H +#define ACLK_RRDHOST_STATE_H + +#include "libnetdata/libnetdata.h" + +typedef struct aclk_rrdhost_state { + char *claimed_id; // Claimed ID if host has one otherwise NULL + char *prev_claimed_id; // Claimed ID if changed (reclaimed) during runtime +} aclk_rrdhost_state; + +#endif /* ACLK_RRDHOST_STATE_H */ diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c new file mode 100644 index 0000000..83bc550 --- /dev/null +++ b/aclk/aclk_rx_msgs.c @@ -0,0 +1,551 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_rx_msgs.h" + +#include "aclk_stats.h" +#include "aclk_query_queue.h" +#include "aclk.h" +#include "aclk_capas.h" + +#include "schema-wrappers/proto_2_json.h" + +#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" +#define ACLK_CLOUD_REQ_V2_PREFIX "GET /" + +#define ACLK_V_COMPRESSION 2 + +struct aclk_request { + char *type_id; + char *msg_id; + char *callback_topic; + char *payload; + int version; + int timeout; + int min_version; + int max_version; +}; + +static int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, "msg-id")) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "type")) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "callback-topic")) { + data->callback_topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "payload")) { + if (likely(e->data.string)) { + size_t len = strlen(e->data.string); + data->payload = mallocz(len+1); + if (!url_decode_r(data->payload, e->data.string, len + 1)) + strcpy(data->payload, e->data.string); + } + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, "version")) { + data->version = (int)e->data.number; + break; + } + if (!strcmp(e->name, "timeout")) { + data->timeout = (int)e->data.number; + break; + } + if (!strcmp(e->name, "min-version")) { + data->min_version = (int)e->data.number; + break; + } + if (!strcmp(e->name, "max-version")) { + data->max_version = (int)e->data.number; + break; + } + + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + +static inline int aclk_extract_v2_data(char *payload, char **data) +{ + char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR); + if(!ptr) + return 1; + ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR); + *data = strdupz(ptr); + return 0; +} + +static inline int aclk_v2_payload_get_query(const char *payload, char **query_url) +{ + const char *start, *end; + + // TODO better check of URL + if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) { + errno = 0; + error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); + return 1; + } + start = payload + 4; + + if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) { + errno = 0; + error("Doesn't look like HTTP GET request."); + return 1; + } + + *query_url = mallocz((end - start) + 1); + strncpyz(*query_url, start, end - start); + + return 0; +} + +static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + aclk_query_t query; + + errno = 0; + if (cloud_to_agent->version < ACLK_V_COMPRESSION) { + error( + "This handler cannot reply to request with version older than %d, received %d.", + ACLK_V_COMPRESSION, + cloud_to_agent->version); + return 1; + } + + query = aclk_query_new(HTTP_API_V2); + + if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) { + error("Error extracting payload expected after the JSON dictionary."); + goto error; + } + + if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) { + error("Could not extract payload from query"); + goto error; + } + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("Missing callback_topic"); + goto error; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("Missing msg_id"); + goto error; + } + + // aclk_queue_query takes ownership of data pointer + query->callback_topic = cloud_to_agent->callback_topic; + query->timeout = cloud_to_agent->timeout; + // for clarity and code readability as when we process the request + // it would be strange to get URL from `dedup_id` + query->data.http_api_v2.query = query->dedup_id; + query->msg_id = cloud_to_agent->msg_id; + aclk_queue_query(query); + return 0; + +error: + aclk_query_free(query); + return 1; +} + +int aclk_handle_cloud_cmd_message(char *payload) +{ + struct aclk_request cloud_to_agent; + memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); + + if (unlikely(!payload)) { + error_report("ACLK incoming 'cmd' message is empty"); + return 1; + } + + debug(D_ACLK, "ACLK incoming 'cmd' message (%s)", payload); + + int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + + if (unlikely(rc != JSON_OK)) { + error_report("Malformed json request (%s)", payload); + goto err_cleanup; + } + + if (!cloud_to_agent.type_id) { + error_report("Cloud message is missing compulsory key \"type\""); + goto err_cleanup; + } + + // Originally we were expecting to have multiple types of 'cmd' message, + // but after the new protocol was designed we will ever only have 'http' + if (strcmp(cloud_to_agent.type_id, "http")) { + error_report("Only 'http' cmd message is supported"); + goto err_cleanup; + } + + if (likely(!aclk_handle_cloud_http_request_v2(&cloud_to_agent, payload))) { + // aclk_handle_cloud_request takes ownership of the pointers + // (to avoid copying) in case of success + freez(cloud_to_agent.type_id); + return 0; + } + +err_cleanup: + if (cloud_to_agent.payload) + freez(cloud_to_agent.payload); + if (cloud_to_agent.type_id) + freez(cloud_to_agent.type_id); + if (cloud_to_agent.msg_id) + freez(cloud_to_agent.msg_id); + if (cloud_to_agent.callback_topic) + freez(cloud_to_agent.callback_topic); + + return 1; +} + +typedef uint32_t simple_hash_t; +typedef int(*rx_msg_handler)(const char *msg, size_t msg_len); + +int handle_old_proto_cmd(const char *msg, size_t msg_len) +{ + // msg is binary payload in all other cases + // however in this message from old legacy cloud + // we have to convert it to C string + char *str = mallocz(msg_len+1); + memcpy(str, msg, msg_len); + str[msg_len] = 0; + if (aclk_handle_cloud_cmd_message(str)) { + freez(str); + return 1; + } + freez(str); + return 0; +} + +int create_node_instance_result(const char *msg, size_t msg_len) +{ + node_instance_creation_result_t res = parse_create_node_instance_result(msg, msg_len); + if (!res.machine_guid || !res.node_id) { + error_report("Error parsing CreateNodeInstanceResult"); + freez(res.machine_guid); + freez(res.node_id); + return 1; + } + + debug(D_ACLK, "CreateNodeInstanceResult: guid:%s nodeid:%s", res.machine_guid, res.node_id); + + uuid_t host_id, node_id; + if (uuid_parse(res.machine_guid, host_id)) { + error("Error parsing machine_guid provided by CreateNodeInstanceResult"); + freez(res.machine_guid); + freez(res.node_id); + return 1; + } + if (uuid_parse(res.node_id, node_id)) { + error("Error parsing node_id provided by CreateNodeInstanceResult"); + freez(res.machine_guid); + freez(res.node_id); + return 1; + } + update_node_id(&host_id, &node_id); + + aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE); + node_instance_connection_t node_state_update = { + .hops = 1, + .live = 0, + .queryable = 1, + .session_id = aclk_session_newarch, + .node_id = res.node_id + }; + + RRDHOST *host = rrdhost_find_by_guid(res.machine_guid); + if (host) { + // not all host must have RRDHOST struct created for them + // if they never connected during runtime of agent + if (host == localhost) { + node_state_update.live = 1; + node_state_update.hops = 0; + } else { + netdata_mutex_lock(&host->receiver_lock); + node_state_update.live = (host->receiver != NULL); + netdata_mutex_unlock(&host->receiver_lock); + node_state_update.hops = host->system_info->hops; + } + } + + node_state_update.capabilities = aclk_get_node_instance_capas(host); + + rrdhost_aclk_state_lock(localhost); + node_state_update.claim_id = localhost->aclk_state.claimed_id; + query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); + rrdhost_aclk_state_unlock(localhost); + + freez((void *)node_state_update.capabilities); + + query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; + query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; + + aclk_queue_query(query); + freez(res.node_id); + freez(res.machine_guid); + return 0; +} + +int send_node_instances(const char *msg, size_t msg_len) +{ + UNUSED(msg); + UNUSED(msg_len); + aclk_send_node_instances(); + return 0; +} + +int stream_charts_and_dimensions(const char *msg, size_t msg_len) +{ + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete StreamChartsAndDimensions msg"); + return 0; +} + +int charts_and_dimensions_ack(const char *msg, size_t msg_len) +{ + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete StreamChartsAndDimensionsAck msg"); + return 0; +} + +int update_chart_configs(const char *msg, size_t msg_len) +{ + UNUSED(msg); + UNUSED(msg_len); + error_report("Received obsolete UpdateChartConfigs msg"); + return 0; +} + +int start_alarm_streaming(const char *msg, size_t msg_len) +{ + struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); + if (!res.node_id || !res.batch_id) { + error("Error parsing StartAlarmStreaming"); + freez(res.node_id); + return 1; + } + aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id); + freez(res.node_id); + return 0; +} + +int send_alarm_log_health(const char *msg, size_t msg_len) +{ + char *node_id = parse_send_alarm_log_health(msg, msg_len); + if (!node_id) { + error("Error parsing SendAlarmLogHealth"); + return 1; + } + aclk_send_alarm_health_log(node_id); + freez(node_id); + return 0; +} + +int send_alarm_configuration(const char *msg, size_t msg_len) +{ + char *config_hash = parse_send_alarm_configuration(msg, msg_len); + if (!config_hash || !*config_hash) { + error("Error parsing SendAlarmConfiguration"); + freez(config_hash); + return 1; + } + aclk_send_alarm_configuration(config_hash); + freez(config_hash); + return 0; +} + +int send_alarm_snapshot(const char *msg, size_t msg_len) +{ + struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); + if (!sas->node_id || !sas->claim_id) { + error("Error parsing SendAlarmSnapshot"); + destroy_send_alarm_snapshot(sas); + return 1; + } + aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id); + destroy_send_alarm_snapshot(sas); + return 0; +} + +int handle_disconnect_req(const char *msg, size_t msg_len) +{ + struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len); + if (!cmd) + return 1; + if (cmd->permaban) { + error("Cloud Banned This Agent!"); + aclk_disable_runtime = 1; + } + info("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description); + if (cmd->reconnect_after_s > 0) { + aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s; + info( + "Cloud asks not to reconnect for %u seconds. We shall honor that request", + (unsigned int)cmd->reconnect_after_s); + } + disconnect_req = 1; + freez(cmd->error_description); + freez(cmd); + return 0; +} + +int contexts_checkpoint(const char *msg, size_t msg_len) +{ + aclk_ctx_based = 1; + + struct ctxs_checkpoint *cmd = parse_ctxs_checkpoint(msg, msg_len); + if (!cmd) + return 1; + + rrdcontext_hub_checkpoint_command(cmd); + + freez(cmd->claim_id); + freez(cmd->node_id); + freez(cmd); + return 0; +} + +int stop_streaming_contexts(const char *msg, size_t msg_len) +{ + if (!aclk_ctx_based) { + error_report("Received StopStreamingContexts message but context based communication was not enabled (Cloud violated the protocol). Ignoring message"); + return 1; + } + + struct stop_streaming_ctxs *cmd = parse_stop_streaming_ctxs(msg, msg_len); + if (!cmd) + return 1; + + rrdcontext_hub_stop_streaming_command(cmd); + + freez(cmd->claim_id); + freez(cmd->node_id); + freez(cmd); + return 0; +} + +typedef struct { + const char *name; + simple_hash_t name_hash; + rx_msg_handler fnc; +} new_cloud_rx_msg_t; + +new_cloud_rx_msg_t rx_msgs[] = { + { .name = "cmd", .name_hash = 0, .fnc = handle_old_proto_cmd }, + { .name = "CreateNodeInstanceResult", .name_hash = 0, .fnc = create_node_instance_result }, + { .name = "SendNodeInstances", .name_hash = 0, .fnc = send_node_instances }, + { .name = "StreamChartsAndDimensions", .name_hash = 0, .fnc = stream_charts_and_dimensions }, + { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack }, + { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs }, + { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming }, + { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health }, + { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, + { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, + { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, + { .name = "ContextsCheckpoint", .name_hash = 0, .fnc = contexts_checkpoint }, + { .name = "StopStreamingContexts", .name_hash = 0, .fnc = stop_streaming_contexts }, + { .name = NULL, .name_hash = 0, .fnc = NULL }, +}; + +new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash) +{ + // we can afford to not compare strings after hash match + // because we check for collisions at initialization in + // aclk_init_rx_msg_handlers() + for (int i = 0; rx_msgs[i].fnc; i++) { + if (rx_msgs[i].name_hash == hash) + return &rx_msgs[i]; + } + return NULL; +} + +const char *rx_handler_get_name(size_t i) +{ + return rx_msgs[i].name; +} + +unsigned int aclk_init_rx_msg_handlers(void) +{ + int i; + for (i = 0; rx_msgs[i].fnc; i++) { + simple_hash_t hash = simple_hash(rx_msgs[i].name); + new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash); + if (unlikely(hdl)) { + // the list of message names changes only by changing + // the source code, therefore fatal is appropriate + fatal("Hash collision. Choose better hash. Added '%s' clashes with existing '%s'", rx_msgs[i].name, hdl->name); + } + rx_msgs[i].name_hash = hash; + } + return i; +} + +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic __maybe_unused) +{ + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_recvd++; + ACLK_STATS_UNLOCK; + } + new_cloud_rx_msg_t *msg_descriptor = find_rx_handler_by_hash(simple_hash(message_type)); + debug(D_ACLK, "Got message named '%s' from cloud", message_type); + if (unlikely(!msg_descriptor)) { + error("Do not know how to handle message of type '%s'. Ignoring", message_type); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; + } + return; + } + + + if (aclklog_enabled) { + if (!strncmp(message_type, "cmd", strlen("cmd"))) { + log_aclk_message_bin(msg, msg_len, 0, topic, msg_descriptor->name); + } else { + char *json = protomsg_to_json(msg, msg_len, msg_descriptor->name); + log_aclk_message_bin(json, strlen(json), 0, topic, msg_descriptor->name); + freez(json); + } + } + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++; + ACLK_STATS_UNLOCK; + } + if (msg_descriptor->fnc(msg, msg_len)) { + error("Error processing message of type '%s'", message_type); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; + } + return; + } +} diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h new file mode 100644 index 0000000..61921fa --- /dev/null +++ b/aclk/aclk_rx_msgs.h @@ -0,0 +1,17 @@ + + +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_RX_MSGS_H +#define ACLK_RX_MSGS_H + +#include "daemon/common.h" +#include "libnetdata/libnetdata.h" + +int aclk_handle_cloud_cmd_message(char *payload); + +const char *rx_handler_get_name(size_t i); +unsigned int aclk_init_rx_msg_handlers(void); +void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len, const char *topic); + +#endif /* ACLK_RX_MSGS_H */ diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c new file mode 100644 index 0000000..215313f --- /dev/null +++ b/aclk/aclk_stats.c @@ -0,0 +1,408 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_stats.h" + +#include "aclk_query.h" + +netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; + +struct { + int query_thread_count; + unsigned int proto_hdl_cnt; + uint32_t *aclk_proto_rx_msgs_sample; + RRDDIM **rx_msg_dims; +} aclk_stats_cfg; // there is only 1 stats thread at a time + +// data ACLK stats need per query thread +struct aclk_qt_data { + RRDDIM *dim; +} *aclk_qt_data = NULL; + +uint32_t *aclk_queries_per_thread = NULL; +uint32_t *aclk_queries_per_thread_sample = NULL; +uint32_t *aclk_proto_rx_msgs_sample = NULL; + +struct aclk_metrics aclk_metrics = { + .online = 0, +}; + +struct aclk_metrics_per_sample aclk_metrics_per_sample; + +static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent) +{ + static RRDSET *st_aclkstats = NULL; + static RRDDIM *rd_online_status = NULL; + + if (unlikely(!st_aclkstats)) { + st_aclkstats = rrdset_create_localhost( + "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status", + "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online); + + rrdset_done(st_aclkstats); +} + +static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st_query_thread = NULL; + static RRDDIM *rd_queued = NULL; + static RRDDIM *rd_dispatched = NULL; + + if (unlikely(!st_query_thread)) { + st_query_thread = rrdset_create_localhost( + "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s", + "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA); + + rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued); + rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched); + + rrdset_done(st_query_thread); +} + +#ifdef NETDATA_INTERNAL_CHECKS +static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_avg = NULL; + static RRDDIM *rd_max = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms", + "netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + if(per_sample->latency_count) + rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count)); + else + rrddim_set_by_pointer(st, rd_avg, 0); + + rrddim_set_by_pointer(st, rd_max, per_sample->latency_max); + + rrdset_done(st); +} +#endif + +static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_rcvd = NULL; + static RRDDIM *rd_rq_err = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s", + "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err); + rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err); + + rrdset_done(st); +} + +static void aclk_stats_cloud_req_type(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *dims[ACLK_QUERY_TYPE_COUNT]; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_processed_query_type", NULL, "aclk", NULL, "Query thread commands processed by their type", "cmd/s", + "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) + dims[i] = rrddim_add(st, aclk_query_get_name(i, 1), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + + } + + for (int i = 0; i < ACLK_QUERY_TYPE_COUNT; i++) + rrddim_set_by_pointer(st, dims[i], per_sample->queries_per_type[i]); + + rrdset_done(st); +} + +static char *cloud_req_http_type_names[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT] = { + "other", + "info", + "data", + "alarms", + "alarm_log", + "chart", + "charts" + // if you change then update `ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT`. +}; + +int aclk_cloud_req_http_type_to_idx(const char *name) +{ + for (int i = 1; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++) + if (!strcmp(cloud_req_http_type_names[i], name)) + return i; + return 0; +} + +static void aclk_stats_cloud_req_http_type(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_types[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT]; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_cloud_req_http_type", NULL, "aclk", NULL, "Requests received from cloud via HTTP by their type", "req/s", + "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++) + rd_rq_types[i] = rrddim_add(st, cloud_req_http_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + + for (int i = 0; i < ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT; i++) + rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_http_by_type[i]); + + rrdset_done(st); +} + +#define MAX_DIM_NAME 22 +static void aclk_stats_query_threads(uint32_t *queries_per_thread) +{ + static RRDSET *st = NULL; + + char dim_name[MAX_DIM_NAME]; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", + "netdata", "stats", 200009, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) { + if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) + error("snprintf encoding error"); + aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + } + + for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) { + rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); + } + + rrdset_done(st); +} + +static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_avg = NULL; + static RRDDIM *rd_rq_max = NULL; + static RRDDIM *rd_rq_total = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us", + "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + + if(per_sample->cloud_q_process_count) + rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count)); + else + rrddim_set_by_pointer(st, rd_rq_avg, 0); + rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max); + rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total); + + rrdset_done(st); +} + +const char *rx_handler_get_name(size_t i); +static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) +{ + static RRDSET *st = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_protobuf_rx_types", NULL, "aclk", NULL, "Received new cloud architecture messages by their type.", "msg/s", + "netdata", "stats", 200010, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) { + aclk_stats_cfg.rx_msg_dims[i] = rrddim_add(st, rx_handler_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + } + + for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) + rrddim_set_by_pointer(st, aclk_stats_cfg.rx_msg_dims[i], rx_msgs_sample[i]); + + rrdset_done(st); +} + +static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_sent = NULL; + static RRDDIM *rd_recvd = NULL; + static uint64_t sent = 0; + static uint64_t recvd = 0; + + sent += stats->bytes_tx; + recvd += stats->bytes_rx; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_openssl_bytes", NULL, "aclk", NULL, "Received and Sent bytes.", "B/s", + "netdata", "stats", 200011, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_sent = rrddim_add(st, "sent", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL); + rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + } + + rrddim_set_by_pointer(st, rd_sent, sent); + rrddim_set_by_pointer(st, rd_recvd, recvd); + + rrdset_done(st); +} + +void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) +{ + aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); + aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); + aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + + aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt; + aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); + aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); + aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*)); +} + +void aclk_stats_thread_cleanup() +{ + freez(aclk_stats_cfg.rx_msg_dims); + freez(aclk_proto_rx_msgs_sample); + freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample); + freez(aclk_qt_data); + freez(aclk_queries_per_thread); + freez(aclk_queries_per_thread_sample); +} + +void *aclk_stats_main_thread(void *ptr) +{ + struct aclk_stats_thread *args = ptr; + + aclk_stats_cfg.query_thread_count = args->query_thread_count; + + heartbeat_t hb; + heartbeat_init(&hb); + usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; + + struct aclk_metrics_per_sample per_sample; + struct aclk_metrics permanent; + + while (!netdata_exit) { + netdata_thread_testcancel(); + // ------------------------------------------------------------------------ + // Wait for the next iteration point. + + heartbeat_next(&hb, step_ut); + if (netdata_exit) break; + + ACLK_STATS_LOCK; + // to not hold lock longer than necessary, especially not to hold it + // during database rrd* operations + memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); + + memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); + memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); + + memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + + memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count); + memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count); + ACLK_STATS_UNLOCK; + + aclk_stats_collect(&per_sample, &permanent); + aclk_stats_query_queue(&per_sample); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_latency(&per_sample); +#endif + + aclk_stats_cloud_req(&per_sample); + aclk_stats_cloud_req_type(&per_sample); + aclk_stats_cloud_req_http_type(&per_sample); + + aclk_stats_query_threads(aclk_queries_per_thread_sample); + + aclk_stats_query_time(&per_sample); + + struct mqtt_wss_stats mqtt_wss_stats = mqtt_wss_get_stats(args->client); + aclk_stats_mqtt_wss(&mqtt_wss_stats); + + aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample); + } + + return 0; +} + +void aclk_stats_upd_online(int online) { + if(!aclk_stats_enabled) + return; + + ACLK_STATS_LOCK; + aclk_metrics.online = online; + + if(!online) + aclk_metrics_per_sample.offline_during_sample = 1; + ACLK_STATS_UNLOCK; +} + +#ifdef NETDATA_INTERNAL_CHECKS +static usec_t pub_time[UINT16_MAX + 1] = {0}; +void aclk_stats_msg_published(uint16_t id) +{ + ACLK_STATS_LOCK; + pub_time[id] = now_boottime_usec(); + ACLK_STATS_UNLOCK; +} + +void aclk_stats_msg_puback(uint16_t id) +{ + ACLK_STATS_LOCK; + usec_t t; + + if (!aclk_stats_enabled) { + ACLK_STATS_UNLOCK; + return; + } + + if (unlikely(!pub_time[id])) { + ACLK_STATS_UNLOCK; + error("Received PUBACK for unknown message?!"); + return; + } + + t = now_boottime_usec() - pub_time[id]; + t /= USEC_PER_MS; + pub_time[id] = 0; + if (aclk_metrics_per_sample.latency_max < t) + aclk_metrics_per_sample.latency_max = t; + + aclk_metrics_per_sample.latency_total += t; + aclk_metrics_per_sample.latency_count++; + ACLK_STATS_UNLOCK; +} +#endif /* NETDATA_INTERNAL_CHECKS */ diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h new file mode 100644 index 0000000..bec9ac2 --- /dev/null +++ b/aclk/aclk_stats.h @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_STATS_H +#define NETDATA_ACLK_STATS_H + +#include "daemon/common.h" +#include "libnetdata/libnetdata.h" +#include "aclk_query_queue.h" +#include "mqtt_wss_client.h" + +#define ACLK_STATS_THREAD_NAME "ACLK_Stats" + +extern netdata_mutex_t aclk_stats_mutex; + +#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex) +#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex) + +// if you change update `cloud_req_http_type_names`. +#define ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT 7 + +int aclk_cloud_req_http_type_to_idx(const char *name); + +struct aclk_stats_thread { + netdata_thread_t *thread; + int query_thread_count; + mqtt_wss_client client; +}; + +// preserve between samples +struct aclk_metrics { + volatile uint8_t online; +}; + +// reset to 0 on every sample +extern struct aclk_metrics_per_sample { + /* in the unlikely event of ACLK disconnecting + and reconnecting under 1 sampling rate + we want to make sure we record the disconnection + despite it being then seemingly longer in graph */ + volatile uint8_t offline_during_sample; + + volatile uint32_t queries_queued; + volatile uint32_t queries_dispatched; + +#ifdef NETDATA_INTERNAL_CHECKS + volatile uint32_t latency_max; + volatile uint32_t latency_total; + volatile uint32_t latency_count; +#endif + + volatile uint32_t cloud_req_recvd; + volatile uint32_t cloud_req_err; + + // query types. + volatile uint32_t queries_per_type[ACLK_QUERY_TYPE_COUNT]; + + // HTTP-specific request types. + volatile uint32_t cloud_req_http_by_type[ACLK_STATS_CLOUD_HTTP_REQ_TYPE_CNT]; + + volatile uint32_t cloud_q_process_total; + volatile uint32_t cloud_q_process_count; + volatile uint32_t cloud_q_process_max; +} aclk_metrics_per_sample; + +extern uint32_t *aclk_proto_rx_msgs_sample; + +extern uint32_t *aclk_queries_per_thread; + +void *aclk_stats_main_thread(void *ptr); +void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt); +void aclk_stats_thread_cleanup(); +void aclk_stats_upd_online(int online); + +#ifdef NETDATA_INTERNAL_CHECKS +void aclk_stats_msg_published(uint16_t id); +void aclk_stats_msg_puback(uint16_t id); +#endif /* NETDATA_INTERNAL_CHECKS */ + +#endif /* NETDATA_ACLK_STATS_H */ diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c new file mode 100644 index 0000000..532b964 --- /dev/null +++ b/aclk/aclk_tx_msgs.c @@ -0,0 +1,276 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_tx_msgs.h" +#include "daemon/common.h" +#include "aclk_util.h" +#include "aclk_stats.h" +#include "aclk.h" +#include "aclk_capas.h" + +#include "schema-wrappers/proto_2_json.h" + +#ifndef __GNUC__ +#pragma region aclk_tx_msgs helper functions +#endif + +// version for aclk legacy (old cloud arch) +#define ACLK_VERSION 2 + +static void freez_aclk_publish5a(void *ptr) { + freez(ptr); +} +static void freez_aclk_publish5b(void *ptr) { + freez(ptr); +} + +uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname) +{ +#ifndef ACLK_LOG_CONVERSATION_DIR + UNUSED(msgname); +#endif + uint16_t packet_id; + const char *topic = aclk_get_topic(subtopic); + + if (unlikely(!topic)) { + error("Couldn't get topic. Aborting message send."); + return 0; + } + + mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_published(packet_id); +#endif + + if (aclklog_enabled) { + char *json = protomsg_to_json(msg, msg_len, msgname); + log_aclk_message_bin(json, strlen(json), 1, topic, msgname); + freez(json); + } + + return packet_id; +} + +// json_object_put returns int unfortunately :D +// we need void(*fnc)(void *); +static void json_object_put_wrapper(void *jsonobj) +{ + json_object_put(jsonobj); +} + +#define TOPIC_MAX_LEN 512 +#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" +static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len) +{ + uint16_t packet_id; + const char *str; + char *full_msg = NULL; + int len; + + if (unlikely(!topic || topic[0] != '/')) { + error ("Full topic required!"); + json_object_put(msg); + return HTTP_RESP_INTERNAL_SERVER_ERROR; + } + + str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + len = strlen(str); + + if (payload_len) { + full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); + + memcpy(full_msg, str, len); + json_object_put(msg); + msg = NULL; + memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); + len += strlen(V2_BIN_PAYLOAD_SEPARATOR); + memcpy(&full_msg[len], payload, payload_len); + len += payload_len; + } + + mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id); + +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_published(packet_id); +#endif + + return 0; +} + +/* + * Creates universal header common for all ACLK messages. User gets ownership of json object created. + * Usually this is freed by send function after message has been sent. + */ +static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version) +{ + uuid_t uuid; + char uuid_str[36 + 1]; + json_object *tmp; + json_object *obj = json_object_new_object(); + + tmp = json_object_new_string(type); + json_object_object_add(obj, "type", tmp); + + if (unlikely(!msg_id)) { + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + msg_id = uuid_str; + } + + if (ts_secs == 0) { + ts_us = now_realtime_usec(); + ts_secs = ts_us / USEC_PER_SEC; + ts_us = ts_us % USEC_PER_SEC; + } + + tmp = json_object_new_string(msg_id); + json_object_object_add(obj, "msg-id", tmp); + + tmp = json_object_new_int64(ts_secs); + json_object_object_add(obj, "timestamp", tmp); + +// TODO handle this somehow on older json-c +// tmp = json_object_new_uint64(ts_us); +// probably jso->_to_json_string -> custom function +// jso->o.c_uint64 -> map this with pointer to signed int +// commit that implements json_object_new_uint64 is 3c3b592 +// between 0.14 and 0.15 + tmp = json_object_new_int64(ts_us); + json_object_object_add(obj, "timestamp-offset-usec", tmp); + + tmp = json_object_new_int64(aclk_session_sec); + json_object_object_add(obj, "connect", tmp); + +// TODO handle this somehow see above +// tmp = json_object_new_uint64(0 /* TODO aclk_session_us */); + tmp = json_object_new_int64(aclk_session_us); + json_object_object_add(obj, "connect-offset-usec", tmp); + + tmp = json_object_new_int(version); + json_object_object_add(obj, "version", tmp); + + return obj; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +#ifndef __GNUC__ +#pragma region aclk_tx_msgs message generators +#endif + +void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len) +{ + json_object *tmp, *msg; + msg = create_hdr("http", msg_id, 0, 0, 2); + tmp = json_object_new_int(http_code); + json_object_object_add(msg, "http-code", tmp); + + tmp = json_object_new_int(ec); + json_object_object_add(msg, "error-code", tmp); + + tmp = json_object_new_string(emsg); + json_object_object_add(msg, "error-description", tmp); + + if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) { + error("Failed to send cancelation message for http reply"); + } +} + +void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) +{ + json_object *tmp, *msg; + + msg = create_hdr("http", msg_id, 0, 0, 2); + + tmp = json_object_new_int64(t_exec); + json_object_object_add(msg, "t-exec", tmp); + + tmp = json_object_new_int64(created); + json_object_object_add(msg, "t-rx", tmp); + + tmp = json_object_new_int(http_code); + json_object_object_add(msg, "http-code", tmp); + + int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); + + switch (rc) { + case HTTP_RESP_FORBIDDEN: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len); + break; + case HTTP_RESP_INTERNAL_SERVER_ERROR: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len); + break; + case HTTP_RESP_BACKEND_FETCH_FAILED: + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len); + break; + } +} + +uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { + size_t len; + uint16_t pid; + + update_agent_connection_t conn = { + .reachable = (reachable ? 1 : 0), + .lwt = 0, + .session_id = aclk_session_newarch, + .capabilities = aclk_get_agent_capas() + }; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("Internal error. Should not come here if not claimed"); + rrdhost_aclk_state_unlock(localhost); + return 0; + } + if (localhost->aclk_state.prev_claimed_id) + conn.claim_id = localhost->aclk_state.prev_claimed_id; + else + conn.claim_id = localhost->aclk_state.claimed_id; + + char *msg = generate_update_agent_connection(&len, &conn); + rrdhost_aclk_state_unlock(localhost); + + if (!msg) { + error("Error generating agent::v1::UpdateAgentConnection payload"); + return 0; + } + + pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection"); + if (localhost->aclk_state.prev_claimed_id) { + freez(localhost->aclk_state.prev_claimed_id); + localhost->aclk_state.prev_claimed_id = NULL; + } + return pid; +} + +char *aclk_generate_lwt(size_t *size) { + update_agent_connection_t conn = { + .reachable = 0, + .lwt = 1, + .session_id = aclk_session_newarch, + .capabilities = NULL + }; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("Internal error. Should not come here if not claimed"); + rrdhost_aclk_state_unlock(localhost); + return NULL; + } + conn.claim_id = localhost->aclk_state.claimed_id; + + char *msg = generate_update_agent_connection(size, &conn); + rrdhost_aclk_state_unlock(localhost); + + if (!msg) + error("Error generating agent::v1::UpdateAgentConnection payload for LWT"); + + return msg; +} + +#ifndef __GNUC__ +#pragma endregion +#endif diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h new file mode 100644 index 0000000..31e5924 --- /dev/null +++ b/aclk/aclk_tx_msgs.h @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_TX_MSGS_H +#define ACLK_TX_MSGS_H + +#include <json-c/json.h> +#include "libnetdata/libnetdata.h" +#include "daemon/common.h" +#include "mqtt_wss_client.h" +#include "schema-wrappers/schema_wrappers.h" +#include "aclk_util.h" + +uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); + +void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len); +void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); + +uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable); +char *aclk_generate_lwt(size_t *size); + +#endif diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c new file mode 100644 index 0000000..01eaedc --- /dev/null +++ b/aclk/aclk_util.c @@ -0,0 +1,388 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_util.h" +#include "aclk_proxy.h" + +#include "daemon/common.h" + +usec_t aclk_session_newarch = 0; + +aclk_env_t *aclk_env = NULL; + +int chart_batch_id; + +aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) { + if (!strcmp(str, "json")) { + return ACLK_ENC_JSON; + } + if (!strcmp(str, "proto")) { + return ACLK_ENC_PROTO; + } + return ACLK_ENC_UNKNOWN; +} + +aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) { + if (!strcmp(str, "MQTTv3")) { + return ACLK_TRP_MQTT_3_1_1; + } + if (!strcmp(str, "MQTTv5")) { + return ACLK_TRP_MQTT_5; + } + return ACLK_TRP_UNKNOWN; +} + +void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) { + freez(trp_desc->endpoint); +} + +void aclk_env_t_destroy(aclk_env_t *env) { + freez(env->auth_endpoint); + if (env->transports) { + for (size_t i = 0; i < env->transport_count; i++) { + if(env->transports[i]) { + aclk_transport_desc_t_destroy(env->transports[i]); + freez(env->transports[i]); + env->transports[i] = NULL; + } + } + freez(env->transports); + } + if (env->capabilities) { + for (size_t i = 0; i < env->capability_count; i++) + freez(env->capabilities[i]); + freez(env->capabilities); + } +} + +int aclk_env_has_capa(const char *capa) +{ + for (int i = 0; i < (int) aclk_env->capability_count; i++) { + if (!strcasecmp(capa, aclk_env->capabilities[i])) + return 1; + } + return 0; +} + +#ifdef ACLK_LOG_CONVERSATION_DIR +volatile int aclk_conversation_log_counter = 0; +#endif + +#define ACLK_TOPIC_PREFIX "/agent/" + +struct aclk_topic { + enum aclk_topics topic_id; + // as received from cloud - we keep this for + // eventual topic list update when claim_id changes + char *topic_recvd; + // constructed topic + char *topic; +}; + +// This helps to cache finalized topics (assembled with claim_id) +// to not have to alloc or create buffer and construct topic every +// time message is sent as in old ACLK +static struct aclk_topic **aclk_topic_cache = NULL; +static size_t aclk_topic_cache_items = 0; + +void free_topic_cache(void) +{ + if (aclk_topic_cache) { + for (size_t i = 0; i < aclk_topic_cache_items; i++) { + freez(aclk_topic_cache[i]->topic); + freez(aclk_topic_cache[i]->topic_recvd); + freez(aclk_topic_cache[i]); + } + freez(aclk_topic_cache); + aclk_topic_cache = NULL; + aclk_topic_cache_items = 0; + } +} + +#define JSON_TOPIC_KEY_TOPIC "topic" +#define JSON_TOPIC_KEY_NAME "name" + +struct topic_name { + enum aclk_topics id; + // cloud name - how is it called + // in answer to /password endpoint + const char *name; +} topic_names[] = { + { .id = ACLK_TOPICID_CHART, .name = "chart" }, + { .id = ACLK_TOPICID_ALARMS, .name = "alarms" }, + { .id = ACLK_TOPICID_METADATA, .name = "meta" }, + { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" }, + { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" }, + { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" }, + { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" }, + { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" }, + { .id = ACLK_TOPICID_CHART_DIMS, .name = "chart-and-dims-updated" }, + { .id = ACLK_TOPICID_CHART_CONFIGS_UPDATED, .name = "chart-configs-updated" }, + { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" }, + { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" }, + { .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" }, + { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log" }, + { .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" }, + { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" }, + { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" }, + { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" }, + { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" }, + { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" }, + { .id = ACLK_TOPICID_UNKNOWN, .name = NULL } +}; + +enum aclk_topics compulsory_topics[] = { +// TODO remove old topics once not needed anymore + ACLK_TOPICID_CHART, //TODO from legacy + ACLK_TOPICID_ALARMS, //TODO from legacy + ACLK_TOPICID_METADATA, //TODO from legacy + ACLK_TOPICID_COMMAND, + ACLK_TOPICID_AGENT_CONN, + ACLK_TOPICID_CMD_NG_V1, + ACLK_TOPICID_CREATE_NODE, + ACLK_TOPICID_NODE_CONN, + ACLK_TOPICID_CHART_DIMS, + ACLK_TOPICID_CHART_CONFIGS_UPDATED, + ACLK_TOPICID_CHART_RESET, + ACLK_TOPICID_RETENTION_UPDATED, + ACLK_TOPICID_NODE_INFO, + ACLK_TOPICID_ALARM_LOG, + ACLK_TOPICID_ALARM_HEALTH, + ACLK_TOPICID_ALARM_CONFIG, + ACLK_TOPICID_ALARM_SNAPSHOT, + ACLK_TOPICID_NODE_COLLECTORS, + ACLK_TOPICID_CTXS_SNAPSHOT, + ACLK_TOPICID_CTXS_UPDATED, + ACLK_TOPICID_UNKNOWN +}; + +static enum aclk_topics topic_name_to_id(const char *name) { + struct topic_name *topic = topic_names; + while (topic->name) { + if (!strcmp(topic->name, name)) { + return topic->id; + } + topic++; + } + return ACLK_TOPICID_UNKNOWN; +} + +static const char *topic_id_to_name(enum aclk_topics tid) { + struct topic_name *topic = topic_names; + while (topic->name) { + if (topic->id == tid) + return topic->name; + topic++; + } + return "unknown"; +} + +#define CLAIM_ID_REPLACE_TAG "#{claim_id}" +static void topic_generate_final(struct aclk_topic *t) { + char *dest; + char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG); + if (!replace_tag) + return; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(!localhost->aclk_state.claimed_id)) { + error("This should never be called if agent not claimed"); + rrdhost_aclk_state_unlock(localhost); + return; + } + + t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id)); + memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd); + dest = t->topic + (replace_tag - t->topic_recvd); + + memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id)); + dest += strlen(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); + replace_tag += strlen(CLAIM_ID_REPLACE_TAG); + strcpy(dest, replace_tag); + dest += strlen(replace_tag); + *dest = 0; +} + +static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic) +{ + struct json_object_iterator it; + struct json_object_iterator itEnd; + + it = json_object_iter_begin(json); + itEnd = json_object_iter_end(json); + + while (!json_object_iter_equal(&it, &itEnd)) { + if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { + error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string"); + return 1; + } + topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it))); + if (topic->topic_id == ACLK_TOPICID_UNKNOWN) { + debug(D_ACLK, "topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it))); + } + json_object_iter_next(&it); + continue; + } + if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) { + if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) { + error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string"); + return 1; + } + topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + continue; + } + + error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it)); + json_object_iter_next(&it); + } + + if (!topic->topic_recvd) { + error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC); + return 1; + } + + topic_generate_final(topic); + aclk_topic_cache_items++; + + return 0; +} + +int aclk_generate_topic_cache(struct json_object *json) +{ + json_object *obj; + + size_t array_size = json_object_array_length(json); + if (!array_size) { + error("Empty topic list!"); + return 1; + } + + if (aclk_topic_cache) + free_topic_cache(); + + aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *)); + + for (size_t i = 0; i < array_size; i++) { + obj = json_object_array_get_idx(json, i); + if (json_object_get_type(obj) != json_type_object) { + error("expected json_type_object"); + return 1; + } + aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic)); + if (topic_cache_add_topic(obj, aclk_topic_cache[i])) { + error("failed to parse topic @idx=%d", (int)i); + return 1; + } + } + + for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) { + if (!aclk_get_topic(compulsory_topics[i])) { + error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i])); + return 1; + } + } + + return 0; +} + +/* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ +const char *aclk_get_topic(enum aclk_topics topic) +{ + if (!aclk_topic_cache) { + error("Topic cache not initialized"); + return NULL; + } + + for (size_t i = 0; i < aclk_topic_cache_items; i++) { + if (aclk_topic_cache[i]->topic_id == topic) + return aclk_topic_cache[i]->topic; + } + error("Unknown topic"); + return NULL; +} + +/* + * TBEB with randomness + * + * @param reset 1 - to reset the delay, + * 0 - to advance a step and calculate sleep time in ms + * @param min, max in seconds + * @returns delay in ms + * + */ + +unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) { + static int attempt = -1; + + if (reset) { + attempt = -1; + return 0; + } + + attempt++; + + if (attempt == 0) { + srandom(time(NULL)); + return 0; + } + + unsigned long int delay = pow(base, attempt - 1); + delay *= MSEC_PER_SEC; + + delay += (random() % (MAX(1000, delay/2))); + + if (delay <= min * MSEC_PER_SEC) + return min; + + if (delay >= max * MSEC_PER_SEC) + return max; + + return delay; +} + + +#define HTTP_PROXY_PREFIX "http://" +void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type) +{ + ACLK_PROXY_TYPE pt; + const char *ptr = aclk_get_proxy(&pt); + char *tmp; + char *host; + if (pt != PROXY_TYPE_HTTP) + return; + + *port = 0; + + if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX))) + ptr += strlen(HTTP_PROXY_PREFIX); + + if ((tmp = strchr(ptr, '@'))) + ptr = tmp; + + if ((tmp = strchr(ptr, '/'))) { + host = mallocz((tmp - ptr) + 1); + memcpy(host, ptr, (tmp - ptr)); + host[tmp - ptr] = 0; + } else + host = strdupz(ptr); + + if ((tmp = strchr(host, ':'))) { + *tmp = 0; + tmp++; + *port = atoi(tmp); + } + + if (*port <= 0 || *port > 65535) + *port = 8080; + + *ohost = host; + + if (type) + *type = MQTT_WSS_PROXY_HTTP; +} diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h new file mode 100644 index 0000000..ed715e0 --- /dev/null +++ b/aclk/aclk_util.h @@ -0,0 +1,112 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_UTIL_H +#define ACLK_UTIL_H + +#include "libnetdata/libnetdata.h" +#include "mqtt_wss_client.h" + +#define CLOUD_EC_MALFORMED_NODE_ID 1 +#define CLOUD_EMSG_MALFORMED_NODE_ID "URL requests node_id but there is not enough chars following (for it to be valid uuid)." +#define CLOUD_EC_NODE_NOT_FOUND 2 +#define CLOUD_EMSG_NODE_NOT_FOUND "Node with requested node_id not found" +#define CLOUD_EC_ZLIB_ERROR 3 +#define CLOUD_EMSG_ZLIB_ERROR "Error during zlib compression" +#define CLOUD_EC_REQ_REPLY_TOO_BIG 4 +#define CLOUD_EMSG_REQ_REPLY_TOO_BIG "Request reply produces message bigger than allowed maximum" +#define CLOUD_EC_FAIL_TOPIC 5 +#define CLOUD_EMSG_FAIL_TOPIC "Internal Topic Error" +#define CLOUD_EC_SND_TIMEOUT 6 +#define CLOUD_EMSG_SND_TIMEOUT "Timeout sending binpacked message" + +// Helper stuff which should not have any further inside ACLK dependency +// and are supposed not to be needed outside of ACLK +extern usec_t aclk_session_newarch; + +extern int chart_batch_id; + +typedef enum { + ACLK_ENC_UNKNOWN = 0, + ACLK_ENC_JSON, + ACLK_ENC_PROTO +} aclk_encoding_type_t; + +typedef enum { + ACLK_TRP_UNKNOWN = 0, + ACLK_TRP_MQTT_3_1_1, + ACLK_TRP_MQTT_5 +} aclk_transport_type_t; + +typedef struct { + char *endpoint; + aclk_transport_type_t type; +} aclk_transport_desc_t; + +typedef struct { + int base; + int max_s; + int min_s; +} aclk_backoff_t; + +typedef struct { + char *auth_endpoint; + aclk_encoding_type_t encoding; + + aclk_transport_desc_t **transports; + size_t transport_count; + + char **capabilities; + size_t capability_count; + + aclk_backoff_t backoff; +} aclk_env_t; + +extern aclk_env_t *aclk_env; + +aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str); +aclk_transport_type_t aclk_transport_type_t_from_str(const char *str); + +void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc); +void aclk_env_t_destroy(aclk_env_t *env); +int aclk_env_has_capa(const char *capa); + +enum aclk_topics { + ACLK_TOPICID_UNKNOWN = 0, + ACLK_TOPICID_CHART = 1, + ACLK_TOPICID_ALARMS = 2, + ACLK_TOPICID_METADATA = 3, + ACLK_TOPICID_COMMAND = 4, + ACLK_TOPICID_AGENT_CONN = 5, + ACLK_TOPICID_CMD_NG_V1 = 6, + ACLK_TOPICID_CREATE_NODE = 7, + ACLK_TOPICID_NODE_CONN = 8, + ACLK_TOPICID_CHART_DIMS = 9, + ACLK_TOPICID_CHART_CONFIGS_UPDATED = 10, + ACLK_TOPICID_CHART_RESET = 11, + ACLK_TOPICID_RETENTION_UPDATED = 12, + ACLK_TOPICID_NODE_INFO = 13, + ACLK_TOPICID_ALARM_LOG = 14, + ACLK_TOPICID_ALARM_HEALTH = 15, + ACLK_TOPICID_ALARM_CONFIG = 16, + ACLK_TOPICID_ALARM_SNAPSHOT = 17, + ACLK_TOPICID_NODE_COLLECTORS = 18, + ACLK_TOPICID_CTXS_SNAPSHOT = 19, + ACLK_TOPICID_CTXS_UPDATED = 20 +}; + +const char *aclk_get_topic(enum aclk_topics topic); +int aclk_generate_topic_cache(struct json_object *json); +void free_topic_cache(void); +// TODO +// aclk_topics_reload //when claim id changes + +#ifdef ACLK_LOG_CONVERSATION_DIR +extern volatile int aclk_conversation_log_counter; +#define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST) +#endif + +unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max); +#define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0) + +void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type); + +#endif /* ACLK_UTIL_H */ diff --git a/aclk/helpers/mqtt_wss_pal.h b/aclk/helpers/mqtt_wss_pal.h new file mode 100644 index 0000000..5c89f8b --- /dev/null +++ b/aclk/helpers/mqtt_wss_pal.h @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef MQTT_WSS_PAL_H +#define MQTT_WSS_PAL_H + +#include "libnetdata/libnetdata.h" + +#undef OPENSSL_VERSION_095 +#undef OPENSSL_VERSION_097 +#undef OPENSSL_VERSION_110 +#undef OPENSSL_VERSION_111 + +#define mw_malloc(...) mallocz(__VA_ARGS__) +#define mw_calloc(...) callocz(__VA_ARGS__) +#define mw_free(...) freez(__VA_ARGS__) +#define mw_strdup(...) strdupz(__VA_ARGS__) +#define mw_realloc(...) reallocz(__VA_ARGS__) + +#endif /* MQTT_WSS_PAL_H */ diff --git a/aclk/helpers/ringbuffer_pal.h b/aclk/helpers/ringbuffer_pal.h new file mode 100644 index 0000000..2f7e1cb --- /dev/null +++ b/aclk/helpers/ringbuffer_pal.h @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef RINGBUFFER_PAL_H +#define RINGBUFFER_PAL_H + +#include "libnetdata/libnetdata.h" + +#define crbuf_malloc(...) mallocz(__VA_ARGS__) +#define crbuf_free(...) freez(__VA_ARGS__) + +#endif /* RINGBUFFER_PAL_H */ diff --git a/aclk/https_client.c b/aclk/https_client.c new file mode 100644 index 0000000..1a32f83 --- /dev/null +++ b/aclk/https_client.c @@ -0,0 +1,688 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" + +#include "https_client.h" + +#include "mqtt_websockets/c-rbuf/include/ringbuffer.h" + +enum http_parse_state { + HTTP_PARSE_INITIAL = 0, + HTTP_PARSE_HEADERS, + HTTP_PARSE_CONTENT +}; + +static const char *http_req_type_to_str(http_req_type_t req) { + switch (req) { + case HTTP_REQ_GET: + return "GET"; + case HTTP_REQ_POST: + return "POST"; + case HTTP_REQ_CONNECT: + return "CONNECT"; + default: + return "unknown"; + } +} + +typedef struct { + enum http_parse_state state; + int content_length; + int http_code; +} http_parse_ctx; + +#define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 } +static inline void http_parse_ctx_clear(http_parse_ctx *ctx) { + ctx->state = HTTP_PARSE_INITIAL; + ctx->content_length = -1; + ctx->http_code = 0; +} + +#define POLL_TO_MS 100 + +#define NEED_MORE_DATA 0 +#define PARSE_SUCCESS 1 +#define PARSE_ERROR -1 +#define HTTP_LINE_TERM "\x0D\x0A" +#define RESP_PROTO "HTTP/1.1 " +#define HTTP_KEYVAL_SEPARATOR ": " +#define HTTP_HDR_BUFFER_SIZE 256 +#define PORT_STR_MAX_BYTES 12 + +static void process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const char *val) +{ + // currently we care only about content-length + // but in future the way this is written + // it can be extended + if (!strcmp("content-length", key)) { + parse_ctx->content_length = atoi(val); + } +} + +static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx) +{ + int idx, idx_end; + char buf_key[HTTP_HDR_BUFFER_SIZE]; + char buf_val[HTTP_HDR_BUFFER_SIZE]; + char *ptr = buf_key; + if (!rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx_end)) { + error("CRLF expected"); + return 1; + } + + char *separator = rbuf_find_bytes(buf, HTTP_KEYVAL_SEPARATOR, strlen(HTTP_KEYVAL_SEPARATOR), &idx); + if (!separator) { + error("Missing Key/Value separator"); + return 1; + } + if (idx >= HTTP_HDR_BUFFER_SIZE) { + error("Key name is too long"); + return 1; + } + + rbuf_pop(buf, buf_key, idx); + buf_key[idx] = 0; + + rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR)); + idx_end -= strlen(HTTP_KEYVAL_SEPARATOR) + idx; + if (idx_end >= HTTP_HDR_BUFFER_SIZE) { + error("Value of key \"%s\" too long", buf_key); + return 1; + } + + rbuf_pop(buf, buf_val, idx_end); + buf_val[idx_end] = 0; + + for (ptr = buf_key; *ptr; ptr++) + *ptr = tolower(*ptr); + + process_http_hdr(parse_ctx, buf_key, buf_val); + + return 0; +} + +static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) +{ + int idx; + char rc[4]; + + do { + if (parse_ctx->state != HTTP_PARSE_CONTENT && !rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx)) + return NEED_MORE_DATA; + switch (parse_ctx->state) { + case HTTP_PARSE_INITIAL: + if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) { + error("Expected response to start with \"%s\"", RESP_PROTO); + return PARSE_ERROR; + } + rbuf_bump_tail(buf, strlen(RESP_PROTO)); + if (rbuf_pop(buf, rc, 4) != 4) { + error("Expected HTTP status code"); + return PARSE_ERROR; + } + if (rc[3] != ' ') { + error("Expected space after HTTP return code"); + return PARSE_ERROR; + } + rc[3] = 0; + parse_ctx->http_code = atoi(rc); + if (parse_ctx->http_code < 100 || parse_ctx->http_code >= 600) { + error("HTTP code not in range 100 to 599"); + return PARSE_ERROR; + } + + rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx); + + rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM)); + + parse_ctx->state = HTTP_PARSE_HEADERS; + break; + case HTTP_PARSE_HEADERS: + if (!idx) { + parse_ctx->state = HTTP_PARSE_CONTENT; + rbuf_bump_tail(buf, strlen(HTTP_LINE_TERM)); + break; + } + if (parse_http_hdr(buf, parse_ctx)) + return PARSE_ERROR; + rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx); + rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM)); + break; + case HTTP_PARSE_CONTENT: + // replies like CONNECT etc. do not have content + if (parse_ctx->content_length < 0) + return PARSE_SUCCESS; + + if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length) + return PARSE_SUCCESS; + return NEED_MORE_DATA; + } + } while(1); +} + +typedef struct https_req_ctx { + https_req_t *request; + + int sock; + rbuf_t buf_rx; + + struct pollfd poll_fd; + + SSL_CTX *ssl_ctx; + SSL *ssl; + + size_t written; + + int self_signed_allowed; + + http_parse_ctx parse_ctx; + + time_t req_start_time; +} https_req_ctx_t; + +static int https_req_check_timedout(https_req_ctx_t *ctx) { + if (now_realtime_sec() > ctx->req_start_time + ctx->request->timeout_s) { + error("request timed out"); + return 1; + } + return 0; +} + +static char *_ssl_err_tos(int err) +{ + switch(err){ + case SSL_ERROR_SSL: + return "SSL_ERROR_SSL"; + case SSL_ERROR_WANT_READ: + return "SSL_ERROR_WANT_READ"; + case SSL_ERROR_WANT_WRITE: + return "SSL_ERROR_WANT_WRITE"; + case SSL_ERROR_NONE: + return "SSL_ERROR_NONE"; + case SSL_ERROR_ZERO_RETURN: + return "SSL_ERROR_ZERO_RETURN"; + case SSL_ERROR_WANT_CONNECT: + return "SSL_ERROR_WANT_CONNECT"; + case SSL_ERROR_WANT_ACCEPT: + return "SSL_ERROR_WANT_ACCEPT"; + } + return "Unknown!!!"; +} + +static int socket_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { + ctx->written = 0; + ctx->poll_fd.events = POLLOUT; + + do { + int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); + if (ret < 0) { + error("poll error"); + return 1; + } + if (ret == 0) { + if (https_req_check_timedout(ctx)) { + error("Poll timed out"); + return 2; + } + continue; + } + + ret = write(ctx->sock, &data[ctx->written], data_len - ctx->written); + if (ret > 0) { + ctx->written += ret; + } else if (errno != EAGAIN && errno != EWOULDBLOCK) { + error("Error writing to socket"); + return 3; + } + } while (ctx->written < data_len); + + return 0; +} + +static int ssl_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { + ctx->written = 0; + ctx->poll_fd.events |= POLLOUT; + + do { + int ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); + if (ret < 0) { + error("poll error"); + return 1; + } + if (ret == 0) { + if (https_req_check_timedout(ctx)) { + error("Poll timed out"); + return 2; + } + continue; + } + ctx->poll_fd.events = 0; + + ret = SSL_write(ctx->ssl, &data[ctx->written], data_len - ctx->written); + if (ret > 0) { + ctx->written += ret; + } else { + ret = SSL_get_error(ctx->ssl, ret); + switch (ret) { + case SSL_ERROR_WANT_READ: + ctx->poll_fd.events |= POLLIN; + break; + case SSL_ERROR_WANT_WRITE: + ctx->poll_fd.events |= POLLOUT; + break; + default: + error("SSL_write Err: %s", _ssl_err_tos(ret)); + return 3; + } + } + } while (ctx->written < data_len); + + return 0; +} + +static inline int https_client_write_all(https_req_ctx_t *ctx, char *data, size_t data_len) { + if (ctx->ssl_ctx) + return ssl_write_all(ctx, data, data_len); + return socket_write_all(ctx, data, data_len); +} + +static int read_parse_response(https_req_ctx_t *ctx) { + int ret; + char *ptr; + size_t size; + + ctx->poll_fd.events = POLLIN; + do { + ret = poll(&ctx->poll_fd, 1, POLL_TO_MS); + if (ret < 0) { + error("poll error"); + return 1; + } + if (ret == 0) { + if (https_req_check_timedout(ctx)) { + error("Poll timed out"); + return 2; + } + if (!ctx->ssl_ctx) + continue; + } + ctx->poll_fd.events = 0; + + ptr = rbuf_get_linear_insert_range(ctx->buf_rx, &size); + + if (ctx->ssl_ctx) + ret = SSL_read(ctx->ssl, ptr, size); + else + ret = read(ctx->sock, ptr, size); + + if (ret > 0) { + rbuf_bump_head(ctx->buf_rx, ret); + } else { + if (ctx->ssl_ctx) { + ret = SSL_get_error(ctx->ssl, ret); + switch (ret) { + case SSL_ERROR_WANT_READ: + ctx->poll_fd.events |= POLLIN; + break; + case SSL_ERROR_WANT_WRITE: + ctx->poll_fd.events |= POLLOUT; + break; + default: + error("SSL_read Err: %s", _ssl_err_tos(ret)); + return 3; + } + } else { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + error("write error"); + return 3; + } + ctx->poll_fd.events |= POLLIN; + } + } + } while (!(ret = parse_http_response(ctx->buf_rx, &ctx->parse_ctx))); + + if (ret != PARSE_SUCCESS) { + error("Error parsing HTTP response"); + return 1; + } + + return 0; +} + +#define TX_BUFFER_SIZE 8192 +#define RX_BUFFER_SIZE (TX_BUFFER_SIZE*2) +static int handle_http_request(https_req_ctx_t *ctx) { + BUFFER *hdr = buffer_create(TX_BUFFER_SIZE); + int rc = 0; + + http_parse_ctx_clear(&ctx->parse_ctx); + + // Prepare data to send + switch (ctx->request->request_type) { + case HTTP_REQ_CONNECT: + buffer_strcat(hdr, "CONNECT "); + break; + case HTTP_REQ_GET: + buffer_strcat(hdr, "GET "); + break; + case HTTP_REQ_POST: + buffer_strcat(hdr, "POST "); + break; + default: + error("Unknown HTTPS request type!"); + rc = 1; + goto err_exit; + } + + if (ctx->request->request_type == HTTP_REQ_CONNECT) { + buffer_strcat(hdr, ctx->request->host); + buffer_sprintf(hdr, ":%d", ctx->request->port); + } else { + buffer_strcat(hdr, ctx->request->url); + } + + buffer_strcat(hdr, " HTTP/1.1\x0D\x0A"); + + //TODO Headers! + if (ctx->request->request_type != HTTP_REQ_CONNECT) { + buffer_sprintf(hdr, "Host: %s\x0D\x0A", ctx->request->host); + } + buffer_strcat(hdr, "User-Agent: Netdata/rocks newhttpclient\x0D\x0A"); + + if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) { + buffer_sprintf(hdr, "Content-Length: %zu\x0D\x0A", ctx->request->payload_size); + } + + buffer_strcat(hdr, "\x0D\x0A"); + + // Send the request + if (https_client_write_all(ctx, hdr->buffer, hdr->len)) { + error("Couldn't write HTTP request header into SSL connection"); + rc = 2; + goto err_exit; + } + + if (ctx->request->request_type == HTTP_REQ_POST && ctx->request->payload && ctx->request->payload_size) { + if (https_client_write_all(ctx, ctx->request->payload, ctx->request->payload_size)) { + error("Couldn't write payload into SSL connection"); + rc = 3; + goto err_exit; + } + } + + // Read The Response + if (read_parse_response(ctx)) { + error("Error reading or parsing response from server"); + rc = 4; + goto err_exit; + } + +err_exit: + buffer_free(hdr); + return rc; +} + +static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) +{ + X509 *err_cert; + int err, depth; + char *err_str; + + if (!preverify_ok) { + err = X509_STORE_CTX_get_error(ctx); + depth = X509_STORE_CTX_get_error_depth(ctx); + err_cert = X509_STORE_CTX_get_current_cert(ctx); + err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0); + + error("Cert Chain verify error:num=%d:%s:depth=%d:%s", err, + X509_verify_cert_error_string(err), depth, err_str); + + free(err_str); + } + +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED + if (!preverify_ok && err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT) + { + preverify_ok = 1; + error("Self Signed Certificate Accepted as the agent was built with ACLK_SSL_ALLOW_SELF_SIGNED"); + } +#endif + + return preverify_ok; +} + +int https_request(https_req_t *request, https_req_response_t *response) { + int rc = 1, ret; + char connect_port_str[PORT_STR_MAX_BYTES]; + + const char *connect_host = request->proxy_host ? request->proxy_host : request->host; + int connect_port = request->proxy_host ? request->proxy_port : request->port; + struct timeval timeout = { .tv_sec = request->timeout_s, .tv_usec = 0 }; + + https_req_ctx_t *ctx = callocz(1, sizeof(https_req_ctx_t)); + ctx->req_start_time = now_realtime_sec(); + + ctx->buf_rx = rbuf_create(RX_BUFFER_SIZE); + if (!ctx->buf_rx) { + error("Couldn't allocate buffer for RX data"); + goto exit_req_ctx; + } + + snprintfz(connect_port_str, PORT_STR_MAX_BYTES, "%d", connect_port); + + ctx->sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, connect_host, 0, connect_port_str, &timeout); + if (ctx->sock < 0) { + error("Error connecting TCP socket to \"%s\"", connect_host); + goto exit_buf_rx; + } + + if (fcntl(ctx->sock, F_SETFL, fcntl(ctx->sock, F_GETFL, 0) | O_NONBLOCK) == -1) { + error("Error setting O_NONBLOCK to TCP socket."); + goto exit_sock; + } + + ctx->poll_fd.fd = ctx->sock; + + // Do the CONNECT if proxy is used + if (request->proxy_host) { + https_req_t req = HTTPS_REQ_T_INITIALIZER; + req.request_type = HTTP_REQ_CONNECT; + req.timeout_s = request->timeout_s; + req.host = request->host; + req.port = request->port; + req.url = request->url; + ctx->request = &req; + if (handle_http_request(ctx)) { + error("Failed to CONNECT with proxy"); + goto exit_sock; + } + if (ctx->parse_ctx.http_code != 200) { + error("Proxy didn't return 200 OK (got %d)", ctx->parse_ctx.http_code); + goto exit_sock; + } + info("Proxy accepted CONNECT upgrade"); + } + ctx->request = request; + + ctx->ssl_ctx = security_initialize_openssl_client(); + if (ctx->ssl_ctx==NULL) { + error("Cannot allocate SSL context"); + goto exit_sock; + } + + if (!SSL_CTX_set_default_verify_paths(ctx->ssl_ctx)) { + error("Error setting default verify paths"); + goto exit_CTX; + } + SSL_CTX_set_verify(ctx->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback); + + ctx->ssl = SSL_new(ctx->ssl_ctx); + if (ctx->ssl==NULL) { + error("Cannot allocate SSL"); + goto exit_CTX; + } + + SSL_set_fd(ctx->ssl, ctx->sock); + ret = SSL_connect(ctx->ssl); + if (ret != -1 && ret != 1) { + error("SSL could not connect"); + goto exit_SSL; + } + if (ret == -1) { + // expected as underlying socket is non blocking! + // consult SSL_connect documentation for details + int ec = SSL_get_error(ctx->ssl, ret); + if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) { + error("Failed to start SSL connection"); + goto exit_SSL; + } + } + + // The actual request here + if (handle_http_request(ctx)) { + error("Couldn't process request"); + goto exit_SSL; + } + response->http_code = ctx->parse_ctx.http_code; + if (ctx->parse_ctx.content_length > 0) { + response->payload_size = ctx->parse_ctx.content_length; + response->payload = mallocz(response->payload_size + 1); + ret = rbuf_pop(ctx->buf_rx, response->payload, response->payload_size); + if (ret != (int)response->payload_size) { + error("Payload size doesn't match remaining data on the buffer!"); + response->payload_size = ret; + } + // normally we take payload as it is and copy it + // but for convenience in cases where payload is sth. like + // json we add terminating zero so that user of the data + // doesn't have to convert to C string (0 terminated) + // other uses still have correct payload_size and can copy + // only exact data without affixed 0x00 + ((char*)response->payload)[response->payload_size] = 0; // mallocz(response->payload_size + 1); + } + info("HTTPS \"%s\" request to \"%s\" finished with HTTP code: %d", http_req_type_to_str(ctx->request->request_type), ctx->request->host, response->http_code); + + rc = 0; + +exit_SSL: + SSL_free(ctx->ssl); +exit_CTX: + SSL_CTX_free(ctx->ssl_ctx); +exit_sock: + close(ctx->sock); +exit_buf_rx: + rbuf_free(ctx->buf_rx); +exit_req_ctx: + freez(ctx); + return rc; +} + +void https_req_response_free(https_req_response_t *res) { + freez(res->payload); +} + +void https_req_response_init(https_req_response_t *res) { + res->http_code = 0; + res->payload = NULL; + res->payload_size = 0; +} + +static inline char *UNUSED_FUNCTION(min_non_null)(char *a, char *b) { + if (!a) + return b; + if (!b) + return a; + return (a < b ? a : b); +} + +#define URI_PROTO_SEPARATOR "://" +#define URL_PARSER_LOG_PREFIX "url_parser " + +static int parse_host_port(url_t *url) { + char *ptr = strrchr(url->host, ':'); + if (ptr) { + size_t port_len = strlen(ptr + 1); + if (!port_len) { + error(URL_PARSER_LOG_PREFIX ": specified but no port number"); + return 1; + } + if (port_len > 5 /* MAX port length is 5digit long in decimal */) { + error(URL_PARSER_LOG_PREFIX "port # is too long"); + return 1; + } + *ptr = 0; + if (!strlen(url->host)) { + error(URL_PARSER_LOG_PREFIX "host empty after removing port"); + return 1; + } + url->port = atoi (ptr + 1); + } + return 0; +} + +static inline void port_by_proto(url_t *url) { + if (url->port) + return; + if (!url->proto) + return; + if (!strcmp(url->proto, "http")) { + url->port = 80; + return; + } + if (!strcmp(url->proto, "https")) { + url->port = 443; + return; + } +} + +#define STRDUPZ_2PTR(dest, start, end) \ + { \ + dest = mallocz(1 + end - start); \ + memcpy(dest, start, end - start); \ + dest[end - start] = 0; \ + } + +int url_parse(const char *url, url_t *parsed) { + const char *start = url; + const char *end = strstr(url, URI_PROTO_SEPARATOR); + + if (end) { + if (end == start) { + error (URL_PARSER_LOG_PREFIX "found " URI_PROTO_SEPARATOR " without protocol specified"); + return 1; + } + + STRDUPZ_2PTR(parsed->proto, start, end) + start = end + strlen(URI_PROTO_SEPARATOR); + } + + end = strchr(start, '/'); + if (!end) + end = start + strlen(start); + + if (start == end) { + error(URL_PARSER_LOG_PREFIX "Host empty"); + return 1; + } + + STRDUPZ_2PTR(parsed->host, start, end); + + if (parse_host_port(parsed)) + return 1; + + if (!*end) { + parsed->path = strdupz("/"); + port_by_proto(parsed); + return 0; + } + + parsed->path = strdupz(end); + port_by_proto(parsed); + return 0; +} + +void url_t_destroy(url_t *url) { + freez(url->host); + freez(url->path); + freez(url->proto); +} diff --git a/aclk/https_client.h b/aclk/https_client.h new file mode 100644 index 0000000..f7bc3d4 --- /dev/null +++ b/aclk/https_client.h @@ -0,0 +1,78 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_HTTPS_CLIENT_H +#define NETDATA_HTTPS_CLIENT_H + +#include "libnetdata/libnetdata.h" + +typedef enum http_req_type { + HTTP_REQ_GET = 0, + HTTP_REQ_POST, + HTTP_REQ_CONNECT +} http_req_type_t; + +typedef struct { + http_req_type_t request_type; + + char *host; + int port; + char *url; + + time_t timeout_s; //timeout in seconds for the network operation (send/recv) + + void *payload; + size_t payload_size; + + char *proxy_host; + int proxy_port; +} https_req_t; + +typedef struct { + int http_code; + + void *payload; + size_t payload_size; +} https_req_response_t; + + +// Non feature complete URL parser +// feel free to extend when needed +// currently implements only what ACLK +// needs +// proto://host[:port]/path +typedef struct { + char *proto; + char *host; + int port; + char* path; +} url_t; + +int url_parse(const char *url, url_t *parsed); +void url_t_destroy(url_t *url); + +void https_req_response_free(https_req_response_t *res); +void https_req_response_init(https_req_response_t *res); + +#define HTTPS_REQ_RESPONSE_T_INITIALIZER \ + { \ + .http_code = 0, \ + .payload = NULL, \ + .payload_size = 0 \ + } + +#define HTTPS_REQ_T_INITIALIZER \ + { \ + .request_type = HTTP_REQ_GET, \ + .host = NULL, \ + .port = 443, \ + .url = NULL, \ + .timeout_s = 30, \ + .payload = NULL, \ + .payload_size = 0, \ + .proxy_host = NULL, \ + .proxy_port = 8080 \ + } + +int https_request(https_req_t *request, https_req_response_t *response); + +#endif /* NETDATA_HTTPS_CLIENT_H */ diff --git a/aclk/schema-wrappers/alarm_config.cc b/aclk/schema-wrappers/alarm_config.cc new file mode 100644 index 0000000..56d7e6f --- /dev/null +++ b/aclk/schema-wrappers/alarm_config.cc @@ -0,0 +1,147 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "alarm_config.h" + +#include "proto/alarm/v1/config.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +using namespace alarms::v1; + +void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg) +{ + freez(cfg->alarm); + freez(cfg->tmpl); + freez(cfg->on_chart); + + freez(cfg->classification); + freez(cfg->type); + freez(cfg->component); + + freez(cfg->os); + freez(cfg->hosts); + freez(cfg->plugin); + freez(cfg->module); + freez(cfg->charts); + freez(cfg->families); + freez(cfg->lookup); + freez(cfg->every); + freez(cfg->units); + + freez(cfg->green); + freez(cfg->red); + + freez(cfg->calculation_expr); + freez(cfg->warning_expr); + freez(cfg->critical_expr); + + freez(cfg->recipient); + freez(cfg->exec); + freez(cfg->delay); + freez(cfg->repeat); + freez(cfg->info); + freez(cfg->options); + freez(cfg->host_labels); + + freez(cfg->p_db_lookup_dimensions); + freez(cfg->p_db_lookup_method); + freez(cfg->p_db_lookup_options); +} + +char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data) +{ + ProvideAlarmConfiguration msg; + AlarmConfiguration *cfg = msg.mutable_config(); + + msg.set_config_hash(data->cfg_hash); + + if (data->cfg.alarm) + cfg->set_alarm(data->cfg.alarm); + if (data->cfg.tmpl) + cfg->set_template_(data->cfg.tmpl); + if(data->cfg.on_chart) + cfg->set_on_chart(data->cfg.on_chart); + + if (data->cfg.classification) + cfg->set_classification(data->cfg.classification); + if (data->cfg.type) + cfg->set_type(data->cfg.type); + if (data->cfg.component) + cfg->set_component(data->cfg.component); + + if (data->cfg.os) + cfg->set_os(data->cfg.os); + if (data->cfg.hosts) + cfg->set_hosts(data->cfg.hosts); + if (data->cfg.plugin) + cfg->set_plugin(data->cfg.plugin); + if(data->cfg.module) + cfg->set_module(data->cfg.module); + if(data->cfg.charts) + cfg->set_charts(data->cfg.charts); + if(data->cfg.families) + cfg->set_families(data->cfg.families); + if(data->cfg.lookup) + cfg->set_lookup(data->cfg.lookup); + if(data->cfg.every) + cfg->set_every(data->cfg.every); + if(data->cfg.units) + cfg->set_units(data->cfg.units); + + if (data->cfg.green) + cfg->set_green(data->cfg.green); + if (data->cfg.red) + cfg->set_red(data->cfg.red); + + if (data->cfg.calculation_expr) + cfg->set_calculation_expr(data->cfg.calculation_expr); + if (data->cfg.warning_expr) + cfg->set_warning_expr(data->cfg.warning_expr); + if (data->cfg.critical_expr) + cfg->set_critical_expr(data->cfg.critical_expr); + + if (data->cfg.recipient) + cfg->set_recipient(data->cfg.recipient); + if (data->cfg.exec) + cfg->set_exec(data->cfg.exec); + if (data->cfg.delay) + cfg->set_delay(data->cfg.delay); + if (data->cfg.repeat) + cfg->set_repeat(data->cfg.repeat); + if (data->cfg.info) + cfg->set_info(data->cfg.info); + if (data->cfg.options) + cfg->set_options(data->cfg.options); + if (data->cfg.host_labels) + cfg->set_host_labels(data->cfg.host_labels); + + cfg->set_p_db_lookup_after(data->cfg.p_db_lookup_after); + cfg->set_p_db_lookup_before(data->cfg.p_db_lookup_before); + if (data->cfg.p_db_lookup_dimensions) + cfg->set_p_db_lookup_dimensions(data->cfg.p_db_lookup_dimensions); + if (data->cfg.p_db_lookup_method) + cfg->set_p_db_lookup_method(data->cfg.p_db_lookup_method); + if (data->cfg.p_db_lookup_options) + cfg->set_p_db_lookup_options(data->cfg.p_db_lookup_options); + cfg->set_p_update_every(data->cfg.p_update_every); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (!msg.SerializeToArray(bin, *len)) + return NULL; + + return bin; +} + +char *parse_send_alarm_configuration(const char *data, size_t len) +{ + SendAlarmConfiguration msg; + if (!msg.ParseFromArray(data, len)) + return NULL; + if (!msg.config_hash().c_str()) + return NULL; + return strdupz(msg.config_hash().c_str()); +} + diff --git a/aclk/schema-wrappers/alarm_config.h b/aclk/schema-wrappers/alarm_config.h new file mode 100644 index 0000000..157fbc6 --- /dev/null +++ b/aclk/schema-wrappers/alarm_config.h @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H +#define ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H + +#include <stdlib.h> +#include <stdint.h> + +#ifdef __cplusplus +extern "C" { +#endif + +struct aclk_alarm_configuration { + char *alarm; + char *tmpl; + char *on_chart; + + char *classification; + char *type; + char *component; + + char *os; + char *hosts; + char *plugin; + char *module; + char *charts; + char *families; + char *lookup; + char *every; + char *units; + + char *green; + char *red; + + char *calculation_expr; + char *warning_expr; + char *critical_expr; + + char *recipient; + char *exec; + char *delay; + char *repeat; + char *info; + char *options; + char *host_labels; + + int32_t p_db_lookup_after; + int32_t p_db_lookup_before; + char *p_db_lookup_dimensions; + char *p_db_lookup_method; + char *p_db_lookup_options; + int32_t p_update_every; +}; + +void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg); + +struct provide_alarm_configuration { + char *cfg_hash; + struct aclk_alarm_configuration cfg; +}; + +char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data); +char *parse_send_alarm_configuration(const char *data, size_t len); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H */ diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc new file mode 100644 index 0000000..f643933 --- /dev/null +++ b/aclk/schema-wrappers/alarm_stream.cc @@ -0,0 +1,253 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "alarm_stream.h" + +#include "proto/alarm/v1/stream.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +using namespace alarms::v1; + +struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len) +{ + struct start_alarm_streaming ret; + memset(&ret, 0, sizeof(ret)); + + StartAlarmStreaming msg; + + if (!msg.ParseFromArray(data, len)) + return ret; + + ret.node_id = strdupz(msg.node_id().c_str()); + ret.batch_id = msg.batch_id(); + ret.start_seq_id = msg.start_sequnce_id(); + + return ret; +} + +char *parse_send_alarm_log_health(const char *data, size_t len) +{ + SendAlarmLogHealth msg; + if (!msg.ParseFromArray(data, len)) + return NULL; + return strdupz(msg.node_id().c_str()); +} + +char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data) +{ + AlarmLogHealth msg; + LogEntries *entries; + + msg.set_claim_id(data->claim_id); + msg.set_node_id(data->node_id); + msg.set_enabled(data->enabled); + + switch (data->status) { + case alarm_log_status_aclk::ALARM_LOG_STATUS_IDLE: + msg.set_status(alarms::v1::ALARM_LOG_STATUS_IDLE); + break; + case alarm_log_status_aclk::ALARM_LOG_STATUS_RUNNING: + msg.set_status(alarms::v1::ALARM_LOG_STATUS_RUNNING); + break; + case alarm_log_status_aclk::ALARM_LOG_STATUS_UNSPECIFIED: + msg.set_status(alarms::v1::ALARM_LOG_STATUS_UNSPECIFIED); + break; + default: + error("Unknown status of AlarmLogHealth LogEntry"); + return NULL; + } + + entries = msg.mutable_log_entries(); + entries->set_first_sequence_id(data->log_entries.first_seq_id); + entries->set_last_sequence_id(data->log_entries.last_seq_id); + + set_google_timestamp_from_timeval(data->log_entries.first_when, entries->mutable_first_when()); + set_google_timestamp_from_timeval(data->log_entries.last_when, entries->mutable_last_when()); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (!msg.SerializeToArray(bin, *len)) + return NULL; + + return bin; +} + +static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status) +{ + switch (status) { + case aclk_alarm_status::ALARM_STATUS_NULL: + return alarms::v1::ALARM_STATUS_NULL; + case aclk_alarm_status::ALARM_STATUS_UNKNOWN: + return alarms::v1::ALARM_STATUS_UNKNOWN; + case aclk_alarm_status::ALARM_STATUS_REMOVED: + return alarms::v1::ALARM_STATUS_REMOVED; + case aclk_alarm_status::ALARM_STATUS_NOT_A_NUMBER: + return alarms::v1::ALARM_STATUS_NOT_A_NUMBER; + case aclk_alarm_status::ALARM_STATUS_CLEAR: + return alarms::v1::ALARM_STATUS_CLEAR; + case aclk_alarm_status::ALARM_STATUS_WARNING: + return alarms::v1::ALARM_STATUS_WARNING; + case aclk_alarm_status::ALARM_STATUS_CRITICAL: + return alarms::v1::ALARM_STATUS_CRITICAL; + default: + error("Unknown alarm status"); + return alarms::v1::ALARM_STATUS_UNKNOWN; + } +} + +void destroy_alarm_log_entry(struct alarm_log_entry *entry) +{ + //freez(entry->node_id); + //freez(entry->claim_id); + + freez(entry->chart); + freez(entry->name); + freez(entry->family); + + freez(entry->config_hash); + + freez(entry->timezone); + + freez(entry->exec_path); + freez(entry->conf_source); + freez(entry->command); + + freez(entry->value_string); + freez(entry->old_value_string); + + freez(entry->rendered_info); + freez(entry->chart_context); +} + +static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *proto) +{ + proto->set_node_id(data->node_id); + proto->set_claim_id(data->claim_id); + + proto->set_chart(data->chart); + proto->set_name(data->name); + if (data->family) + proto->set_family(data->family); + + proto->set_batch_id(data->batch_id); + proto->set_sequence_id(data->sequence_id); + proto->set_when(data->when); + + proto->set_config_hash(data->config_hash); + + proto->set_utc_offset(data->utc_offset); + proto->set_timezone(data->timezone); + + proto->set_exec_path(data->exec_path); + proto->set_conf_source(data->conf_source); + proto->set_command(data->command); + + proto->set_duration(data->duration); + proto->set_non_clear_duration(data->non_clear_duration); + + + proto->set_status(aclk_alarm_status_to_proto(data->status)); + proto->set_old_status(aclk_alarm_status_to_proto(data->old_status)); + proto->set_delay(data->delay); + proto->set_delay_up_to_timestamp(data->delay_up_to_timestamp); + + proto->set_last_repeat(data->last_repeat); + proto->set_silenced(data->silenced); + + if (data->value_string) + proto->set_value_string(data->value_string); + if (data->old_value_string) + proto->set_old_value_string(data->old_value_string); + + proto->set_value(data->value); + proto->set_old_value(data->old_value); + + proto->set_updated(data->updated); + + proto->set_rendered_info(data->rendered_info); + + proto->set_chart_context(data->chart_context); +} + +char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data) +{ + AlarmLogEntry le; + + fill_alarm_log_entry(data, &le); + + *len = PROTO_COMPAT_MSG_SIZE(le); + char *bin = (char*)mallocz(*len); + if (!le.SerializeToArray(bin, *len)) { + freez(bin); + return NULL; + } + + return bin; +} + +struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len) +{ + SendAlarmSnapshot msg; + if (!msg.ParseFromArray(data, len)) + return NULL; + + struct send_alarm_snapshot *ret = (struct send_alarm_snapshot*)callocz(1, sizeof(struct send_alarm_snapshot)); + if (msg.claim_id().c_str()) + ret->claim_id = strdupz(msg.claim_id().c_str()); + if (msg.node_id().c_str()) + ret->node_id = strdupz(msg.node_id().c_str()); + ret->snapshot_id = msg.snapshot_id(); + ret->sequence_id = msg.sequence_id(); + + return ret; +} + +void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr) +{ + freez(ptr->claim_id); + freez(ptr->node_id); + freez(ptr); +} + +alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data) +{ + AlarmSnapshot *msg = new AlarmSnapshot; + if (unlikely(!msg)) fatal("Cannot allocate memory for AlarmSnapshot"); + + msg->set_node_id(data->node_id); + msg->set_claim_id(data->claim_id); + msg->set_snapshot_id(data->snapshot_id); + msg->set_chunks(data->chunks); + msg->set_chunk(data->chunk); + + // this is handled automatically by add_alarm_log_entry2snapshot function + msg->set_chunk_size(0); + + return msg; +} + +void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data) +{ + AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot; + AlarmLogEntry *alarm_log_entry = alarm_snapshot->add_alarms(); + + fill_alarm_log_entry(data, alarm_log_entry); + + alarm_snapshot->set_chunk_size(alarm_snapshot->chunk_size() + 1); +} + +char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot) +{ + AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot; + *len = PROTO_COMPAT_MSG_SIZE_PTR(alarm_snapshot); + char *bin = (char*)mallocz(*len); + if (!alarm_snapshot->SerializeToArray(bin, *len)) { + delete alarm_snapshot; + return NULL; + } + + delete alarm_snapshot; + return bin; +} diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h new file mode 100644 index 0000000..63911da --- /dev/null +++ b/aclk/schema-wrappers/alarm_stream.h @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H +#define ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H + +#include <stdlib.h> + +#include "database/rrd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +enum alarm_log_status_aclk { + ALARM_LOG_STATUS_UNSPECIFIED = 0, + ALARM_LOG_STATUS_RUNNING = 1, + ALARM_LOG_STATUS_IDLE = 2 +}; + +struct alarm_log_entries { + int64_t first_seq_id; + struct timeval first_when; + + int64_t last_seq_id; + struct timeval last_when; +}; + +struct alarm_log_health { + char *claim_id; + char *node_id; + int enabled; + enum alarm_log_status_aclk status; + struct alarm_log_entries log_entries; +}; + +struct start_alarm_streaming { + char *node_id; + uint64_t batch_id; + uint64_t start_seq_id; +}; + +struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len); +char *parse_send_alarm_log_health(const char *data, size_t len); + +char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data); + +enum aclk_alarm_status { + ALARM_STATUS_NULL = 0, + ALARM_STATUS_UNKNOWN = 1, + ALARM_STATUS_REMOVED = 2, + ALARM_STATUS_NOT_A_NUMBER = 3, + ALARM_STATUS_CLEAR = 4, + ALARM_STATUS_WARNING = 5, + ALARM_STATUS_CRITICAL = 6 +}; + +struct alarm_log_entry { + char *node_id; + char *claim_id; + + char *chart; + char *name; + char *family; + + uint64_t batch_id; + uint64_t sequence_id; + uint64_t when; + + char *config_hash; + + int32_t utc_offset; + char *timezone; + + char *exec_path; + char *conf_source; + char *command; + + uint32_t duration; + uint32_t non_clear_duration; + + enum aclk_alarm_status status; + enum aclk_alarm_status old_status; + uint64_t delay; + uint64_t delay_up_to_timestamp; + + uint64_t last_repeat; + int silenced; + + char *value_string; + char *old_value_string; + + double value; + double old_value; + + // updated alarm entry, when the status of the alarm has been updated by a later entry + int updated; + + // rendered_info + char *rendered_info; + + char *chart_context; +}; + +struct send_alarm_snapshot { + char *node_id; + char *claim_id; + uint64_t snapshot_id; + uint64_t sequence_id; +}; + +struct alarm_snapshot { + char *node_id; + char *claim_id; + uint64_t snapshot_id; + uint32_t chunks; + uint32_t chunk; +}; + +typedef void* alarm_snapshot_proto_ptr_t; + +void destroy_alarm_log_entry(struct alarm_log_entry *entry); + +char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data); + +struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len); +void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr); + +alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data); +void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data); +char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H */ diff --git a/aclk/schema-wrappers/capability.cc b/aclk/schema-wrappers/capability.cc new file mode 100644 index 0000000..af45740 --- /dev/null +++ b/aclk/schema-wrappers/capability.cc @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/aclk/v1/lib.pb.h" + +#include "capability.h" + +void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) { + proto_capa->set_name(c_capa->name); + proto_capa->set_enabled(c_capa->enabled); + proto_capa->set_version(c_capa->version); +} diff --git a/aclk/schema-wrappers/capability.h b/aclk/schema-wrappers/capability.h new file mode 100644 index 0000000..c6085a4 --- /dev/null +++ b/aclk/schema-wrappers/capability.h @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_CAPABILITY_H +#define ACLK_SCHEMA_CAPABILITY_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct capability { + const char *name; + uint32_t version; + int enabled; +}; + +#ifdef __cplusplus +} + +#include "proto/aclk/v1/lib.pb.h" + +void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa); +#endif + +#endif /* ACLK_SCHEMA_CAPABILITY_H */ diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc new file mode 100644 index 0000000..20b40ec --- /dev/null +++ b/aclk/schema-wrappers/connection.cc @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/agent/v1/connection.pb.h" +#include "proto/agent/v1/disconnect.pb.h" +#include "connection.h" + +#include "schema_wrapper_utils.h" + +#include <sys/time.h> +#include <stdlib.h> + +using namespace agent::v1; + +char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data) +{ + UpdateAgentConnection connupd; + + connupd.set_claim_id(data->claim_id); + connupd.set_reachable(data->reachable); + connupd.set_session_id(data->session_id); + + connupd.set_update_source((data->lwt) ? CONNECTION_UPDATE_SOURCE_LWT : CONNECTION_UPDATE_SOURCE_AGENT); + + struct timeval tv; + gettimeofday(&tv, NULL); + + google::protobuf::Timestamp *timestamp = connupd.mutable_updated_at(); + timestamp->set_seconds(tv.tv_sec); + timestamp->set_nanos(tv.tv_usec * 1000); + + if (data->capabilities) { + const struct capability *capa = data->capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + + *len = PROTO_COMPAT_MSG_SIZE(connupd); + char *msg = (char*)mallocz(*len); + if (msg) + connupd.SerializeToArray(msg, *len); + + return msg; +} + +struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) { + DisconnectReq req; + struct disconnect_cmd *res; + + if (!req.ParseFromArray(data, len)) + return NULL; + + res = (struct disconnect_cmd *)callocz(1, sizeof(struct disconnect_cmd)); + + if (!res) + return NULL; + + res->reconnect_after_s = req.reconnect_after_seconds(); + res->permaban = req.permaban(); + res->error_code = req.error_code(); + if (req.error_description().c_str()) { + res->error_description = strdupz(req.error_description().c_str()); + if (!res->error_description) { + freez(res); + return NULL; + } + } + + return res; +} diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h new file mode 100644 index 0000000..0356c7d --- /dev/null +++ b/aclk/schema-wrappers/connection.h @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONNECTION_H +#define ACLK_SCHEMA_WRAPPER_CONNECTION_H + +#include "capability.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char *claim_id; + unsigned int reachable:1; + + int64_t session_id; + + unsigned int lwt:1; + + const struct capability *capabilities; + +// TODO in future optional fields +// > 15 optional fields: +// How long the system was running until connection (only applicable when reachable=true) +// google.protobuf.Duration system_uptime = 15; +// How long the netdata agent was running until connection (only applicable when reachable=true) +// google.protobuf.Duration agent_uptime = 16; + + +} update_agent_connection_t; + +char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data); + +struct disconnect_cmd { + uint64_t reconnect_after_s; + int permaban; + uint32_t error_code; + char *error_description; +}; + +struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONNECTION_H */ diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc new file mode 100644 index 0000000..b04c9d2 --- /dev/null +++ b/aclk/schema-wrappers/context.cc @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/context/v1/context.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "schema_wrapper_utils.h" + +#include "context.h" + +using namespace context::v1; + +// ContextsSnapshot +contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version) +{ + ContextsSnapshot *ctxs_snap = new ContextsSnapshot; + + if (ctxs_snap == NULL) + fatal("Cannot allocate ContextsSnapshot object. OOM"); + + ctxs_snap->set_claim_id(claim_id); + ctxs_snap->set_node_id(node_id); + ctxs_snap->set_version(version); + + return ctxs_snap; +} + +void contexts_snapshot_delete(contexts_snapshot_t snapshot) +{ + delete (ContextsSnapshot *)snapshot; +} + +void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version) +{ + ((ContextsSnapshot *)ctxs_snapshot)->set_version(version); +} + +static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx) +{ + ctx->set_id(c_ctx->id); + ctx->set_version(c_ctx->version); + ctx->set_first_entry(c_ctx->first_entry); + ctx->set_last_entry(c_ctx->last_entry); + ctx->set_deleted(c_ctx->deleted); + ctx->set_title(c_ctx->title); + ctx->set_priority(c_ctx->priority); + ctx->set_chart_type(c_ctx->chart_type); + ctx->set_units(c_ctx->units); + ctx->set_family(c_ctx->family); +} + +void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update) +{ + ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; + ContextUpdated *ctx = ctxs_snap->add_contexts(); + + fill_ctx_updated(ctx, ctx_update); +} + +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len) +{ + ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot; + *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap); + char *bin = (char*)mallocz(*len); + if (!ctxs_snap->SerializeToArray(bin, *len)) { + freez(bin); + delete ctxs_snap; + return NULL; + } + + delete ctxs_snap; + return bin; +} + +// ContextsUpdated +contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at) +{ + ContextsUpdated *ctxs_updated = new ContextsUpdated; + + if (ctxs_updated == NULL) + fatal("Cannot allocate ContextsUpdated object. OOM"); + + ctxs_updated->set_claim_id(claim_id); + ctxs_updated->set_node_id(node_id); + ctxs_updated->set_version_hash(version_hash); + ctxs_updated->set_created_at(created_at); + + return ctxs_updated; +} + +void contexts_updated_delete(contexts_updated_t ctxs_updated) +{ + delete (ContextsUpdated *)ctxs_updated; +} + +void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash) +{ + ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash); +} + +void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update) +{ + ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; + ContextUpdated *ctx = ctxs_update->add_contextupdates(); + + if (ctx == NULL) + fatal("Cannot allocate ContextUpdated object. OOM"); + + fill_ctx_updated(ctx, ctx_update); +} + +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len) +{ + ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated; + *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update); + char *bin = (char*)mallocz(*len); + if (!ctxs_update->SerializeToArray(bin, *len)) { + freez(bin); + delete ctxs_update; + return NULL; + } + + delete ctxs_update; + return bin; +} diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h new file mode 100644 index 0000000..cbb7701 --- /dev/null +++ b/aclk/schema-wrappers/context.h @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H +#define ACLK_SCHEMA_WRAPPER_CONTEXT_H + +#include <stdint.h> +#include <sys/types.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* contexts_updated_t; +typedef void* contexts_snapshot_t; + +struct context_updated { + // context id + const char *id; + + uint64_t version; + + uint64_t first_entry; + uint64_t last_entry; + + int deleted; + + const char *title; + uint64_t priority; + const char *chart_type; + const char *units; + const char *family; +}; + +// ContextS Snapshot related +contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version); +void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot); +void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version); +void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update); +char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len); + +// ContextS Updated related +contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at); +void contexts_updated_delete(contexts_updated_t ctxs_updated); +void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash); +void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update); +char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len); + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */ diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc new file mode 100644 index 0000000..3bb1956 --- /dev/null +++ b/aclk/schema-wrappers/context_stream.cc @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/context/v1/stream.pb.h" + +#include "context_stream.h" + +#include "libnetdata/libnetdata.h" + +struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len) +{ + context::v1::StopStreamingContexts msg; + + struct stop_streaming_ctxs *res; + + if (!msg.ParseFromArray(data, len)) + return NULL; + + res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs)); + + res->claim_id = strdupz(msg.claim_id().c_str()); + res->node_id = strdupz(msg.node_id().c_str()); + + return res; +} + +struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len) +{ + context::v1::ContextsCheckpoint msg; + + struct ctxs_checkpoint *res; + + if (!msg.ParseFromArray(data, len)) + return NULL; + + res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint)); + + res->claim_id = strdupz(msg.claim_id().c_str()); + res->node_id = strdupz(msg.node_id().c_str()); + res->version_hash = msg.version_hash(); + + return res; +} diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h new file mode 100644 index 0000000..8c691d2 --- /dev/null +++ b/aclk/schema-wrappers/context_stream.h @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H +#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H + +#ifdef __cplusplus +extern "C" { +#endif + +struct stop_streaming_ctxs { + char *claim_id; + char *node_id; + // we omit reason as there is only one defined at this point + // as soon as there is more than one defined in StopStreaminContextsReason + // we should add it + // 0 - RATE_LIMIT_EXCEEDED +}; + +struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len); + +struct ctxs_checkpoint { + char *claim_id; + char *node_id; + + uint64_t version_hash; +}; + +struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len); + + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */ diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc new file mode 100644 index 0000000..db1fa64 --- /dev/null +++ b/aclk/schema-wrappers/node_connection.cc @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/nodeinstance/connection/v1/connection.pb.h" +#include "node_connection.h" + +#include "schema_wrapper_utils.h" + +#include <sys/time.h> +#include <stdlib.h> + +char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data) { + nodeinstance::v1::UpdateNodeInstanceConnection msg; + + if(data->claim_id) + msg.set_claim_id(data->claim_id); + msg.set_node_id(data->node_id); + + msg.set_liveness(data->live); + msg.set_queryable(data->queryable); + + msg.set_session_id(data->session_id); + msg.set_hops(data->hops); + + struct timeval tv; + gettimeofday(&tv, NULL); + + google::protobuf::Timestamp *timestamp = msg.mutable_updated_at(); + timestamp->set_seconds(tv.tv_sec); + timestamp->set_nanos(tv.tv_usec * 1000); + + if (data->capabilities) { + const struct capability *capa = data->capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = msg.add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h new file mode 100644 index 0000000..dac0d8f --- /dev/null +++ b/aclk/schema-wrappers/node_connection.h @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H +#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H + +#include "capability.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char* claim_id; + const char* node_id; + + unsigned int live:1; + unsigned int queryable:1; + + int64_t session_id; + + int32_t hops; + const struct capability *capabilities; +} node_instance_connection_t; + +char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data); + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H */ diff --git a/aclk/schema-wrappers/node_creation.cc b/aclk/schema-wrappers/node_creation.cc new file mode 100644 index 0000000..5ad25b7 --- /dev/null +++ b/aclk/schema-wrappers/node_creation.cc @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/nodeinstance/create/v1/creation.pb.h" +#include "node_creation.h" + +#include "schema_wrapper_utils.h" + +#include <stdlib.h> + +char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data) +{ + nodeinstance::create::v1::CreateNodeInstance msg; + + if (data->claim_id) + msg.set_claim_id(data->claim_id); + msg.set_machine_guid(data->machine_guid); + msg.set_hostname(data->hostname); + msg.set_hops(data->hops); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} + +node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len) +{ + nodeinstance::create::v1::CreateNodeInstanceResult msg; + node_instance_creation_result_t res = { .node_id = NULL, .machine_guid = NULL }; + + if (!msg.ParseFromArray(data, len)) + return res; + + res.node_id = strdupz(msg.node_id().c_str()); + res.machine_guid = strdupz(msg.machine_guid().c_str()); + return res; +} diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h new file mode 100644 index 0000000..7a8c7f7 --- /dev/null +++ b/aclk/schema-wrappers/node_creation.h @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_NODE_CREATION_H +#define ACLK_SCHEMA_WRAPPER_NODE_CREATION_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char *claim_id; + const char *machine_guid; + const char *hostname; + + int32_t hops; +} node_instance_creation_t; + +typedef struct { + char *node_id; + char *machine_guid; +} node_instance_creation_result_t; + +char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data); +node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len); + + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_NODE_CREATION_H */ diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc new file mode 100644 index 0000000..5e321f6 --- /dev/null +++ b/aclk/schema-wrappers/node_info.cc @@ -0,0 +1,136 @@ +#include "node_info.h" + +#include "proto/nodeinstance/info/v1/info.pb.h" + +#include "schema_wrapper_utils.h" + +static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct aclk_node_info *data) +{ + google::protobuf::Map<std::string, std::string> *map; + + if (data->name) + info->set_name(data->name); + + if (data->os) + info->set_os(data->os); + if (data->os_name) + info->set_os_name(data->os_name); + if (data->os_version) + info->set_os_version(data->os_version); + + if (data->kernel_name) + info->set_kernel_name(data->kernel_name); + if (data->kernel_version) + info->set_kernel_version(data->kernel_version); + + if (data->architecture) + info->set_architecture(data->architecture); + + info->set_cpus(data->cpus); + + if (data->cpu_frequency) + info->set_cpu_frequency(data->cpu_frequency); + + if (data->memory) + info->set_memory(data->memory); + + if (data->disk_space) + info->set_disk_space(data->disk_space); + + if (data->version) + info->set_version(data->version); + + if (data->release_channel) + info->set_release_channel(data->release_channel); + + if (data->timezone) + info->set_timezone(data->timezone); + + if (data->virtualization_type) + info->set_virtualization_type(data->virtualization_type); + + if (data->container_type) + info->set_container_type(data->container_type); + + if (data->custom_info) + info->set_custom_info(data->custom_info); + + if (data->machine_guid) + info->set_machine_guid(data->machine_guid); + + nodeinstance::info::v1::MachineLearningInfo *ml_info = info->mutable_ml_info(); + ml_info->set_ml_capable(data->ml_info.ml_capable); + ml_info->set_ml_enabled(data->ml_info.ml_enabled); + + map = info->mutable_host_labels(); + rrdlabels_walkthrough_read(data->host_labels_ptr, label_add_to_map_callback, map); + return 0; +} + +char *generate_update_node_info_message(size_t *len, struct update_node_info *info) +{ + nodeinstance::info::v1::UpdateNodeInfo msg; + + msg.set_node_id(info->node_id); + msg.set_claim_id(info->claim_id); + + if (generate_node_info(msg.mutable_data(), &info->data)) + return NULL; + + set_google_timestamp_from_timeval(info->updated_at, msg.mutable_updated_at()); + msg.set_machine_guid(info->machine_guid); + msg.set_child(info->child); + + nodeinstance::info::v1::MachineLearningInfo *ml_info = msg.mutable_ml_info(); + ml_info->set_ml_capable(info->ml_info.ml_capable); + ml_info->set_ml_enabled(info->ml_info.ml_enabled); + + struct capability *capa; + if (info->node_capabilities) { + capa = info->node_capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = msg.mutable_node_info()->add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + if (info->node_instance_capabilities) { + capa = info->node_instance_capabilities; + while (capa->name) { + aclk_lib::v1::Capability *proto_capa = msg.mutable_node_instance_info()->add_capabilities(); + capability_set(proto_capa, capa); + capa++; + } + } + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} + +char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *upd_node_collectors) +{ + nodeinstance::info::v1::UpdateNodeCollectors msg; + + msg.set_node_id(upd_node_collectors->node_id); + msg.set_claim_id(upd_node_collectors->claim_id); + + void *colls; + dfe_start_read(upd_node_collectors->node_collectors, colls) { + struct collector_info *c =(struct collector_info *)colls; + nodeinstance::info::v1::CollectorInfo *col = msg.add_collectors(); + col->set_plugin(c->plugin); + col->set_module(c->module); + } + dfe_done(colls); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (bin) + msg.SerializeToArray(bin, *len); + + return bin; +} diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h new file mode 100644 index 0000000..de4ade7 --- /dev/null +++ b/aclk/schema-wrappers/node_info.h @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPER_NODE_INFO_H +#define ACLK_SCHEMA_WRAPPER_NODE_INFO_H + +#include <stdlib.h> +#include <stdint.h> + +#include "capability.h" +#include "database/rrd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct machine_learning_info { + bool ml_capable; + bool ml_enabled; +}; + +struct aclk_node_info { + const char *name; + + const char *os; + const char *os_name; + const char *os_version; + const char *kernel_name; + const char *kernel_version; + const char *architecture; + uint32_t cpus; + const char *cpu_frequency; + const char *memory; + const char *disk_space; + const char *version; + const char *release_channel; + const char *timezone; + const char *virtualization_type; + const char *container_type; + const char *custom_info; + const char *machine_guid; + + DICTIONARY *host_labels_ptr; + struct machine_learning_info ml_info; +}; + +struct update_node_info { + char *node_id; + char *claim_id; + struct aclk_node_info data; + struct timeval updated_at; + char *machine_guid; + int child; + + struct machine_learning_info ml_info; + + struct capability *node_capabilities; + struct capability *node_instance_capabilities; +}; + +struct collector_info { + const char *module; + const char *plugin; +}; + +struct update_node_collectors { + char *claim_id; + char *node_id; + DICTIONARY *node_collectors; +}; + +char *generate_update_node_info_message(size_t *len, struct update_node_info *info); + +char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *collectors); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPER_NODE_INFO_H */ diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc new file mode 100644 index 0000000..8853b2e --- /dev/null +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -0,0 +1,85 @@ +#include <google/protobuf/message.h> +#include <google/protobuf/util/json_util.h> + +#include "proto/alarm/v1/config.pb.h" +#include "proto/alarm/v1/stream.pb.h" +#include "proto/aclk/v1/lib.pb.h" +#include "proto/agent/v1/connection.pb.h" +#include "proto/agent/v1/disconnect.pb.h" +#include "proto/nodeinstance/connection/v1/connection.pb.h" +#include "proto/nodeinstance/create/v1/creation.pb.h" +#include "proto/nodeinstance/info/v1/info.pb.h" +#include "proto/context/v1/stream.pb.h" +#include "proto/context/v1/context.pb.h" + +#include "libnetdata/libnetdata.h" + +#include "proto_2_json.h" + +using namespace google::protobuf::util; + +static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) +{ +//tx side + if (!strcmp(msgname, "UpdateAgentConnection")) + return new agent::v1::UpdateAgentConnection; + if (!strcmp(msgname, "UpdateNodeInstanceConnection")) + return new nodeinstance::v1::UpdateNodeInstanceConnection; + if (!strcmp(msgname, "CreateNodeInstance")) + return new nodeinstance::create::v1::CreateNodeInstance; + if (!strcmp(msgname, "UpdateNodeInfo")) + return new nodeinstance::info::v1::UpdateNodeInfo; + if (!strcmp(msgname, "AlarmLogHealth")) + return new alarms::v1::AlarmLogHealth; + if (!strcmp(msgname, "ProvideAlarmConfiguration")) + return new alarms::v1::ProvideAlarmConfiguration; + if (!strcmp(msgname, "AlarmSnapshot")) + return new alarms::v1::AlarmSnapshot; + if (!strcmp(msgname, "AlarmLogEntry")) + return new alarms::v1::AlarmLogEntry; + if (!strcmp(msgname, "UpdateNodeCollectors")) + return new nodeinstance::info::v1::UpdateNodeCollectors; + if (!strcmp(msgname, "ContextsUpdated")) + return new context::v1::ContextsUpdated; + if (!strcmp(msgname, "ContextsSnapshot")) + return new context::v1::ContextsSnapshot; + +//rx side + if (!strcmp(msgname, "CreateNodeInstanceResult")) + return new nodeinstance::create::v1::CreateNodeInstanceResult; + if (!strcmp(msgname, "SendNodeInstances")) + return new agent::v1::SendNodeInstances; + if (!strcmp(msgname, "StartAlarmStreaming")) + return new alarms::v1::StartAlarmStreaming; + if (!strcmp(msgname, "SendAlarmLogHealth")) + return new alarms::v1::SendAlarmLogHealth; + if (!strcmp(msgname, "SendAlarmConfiguration")) + return new alarms::v1::SendAlarmConfiguration; + if (!strcmp(msgname, "SendAlarmSnapshot")) + return new alarms::v1::SendAlarmSnapshot; + if (!strcmp(msgname, "DisconnectReq")) + return new agent::v1::DisconnectReq; + if (!strcmp(msgname, "ContextsCheckpoint")) + return new context::v1::ContextsCheckpoint; + if (!strcmp(msgname, "StopStreamingContexts")) + return new context::v1::StopStreamingContexts; + + return NULL; +} + +char *protomsg_to_json(const void *protobin, size_t len, const char *msgname) +{ + google::protobuf::Message *msg = msg_name_to_protomsg(msgname); + if (msg == NULL) + return strdupz("Don't know this message type by name."); + + if (!msg->ParseFromArray(protobin, len)) + return strdupz("Can't parse this message. Malformed or wrong parser used."); + + JsonPrintOptions options; + + std::string output; + google::protobuf::util::MessageToJsonString(*msg, &output, options); + delete msg; + return strdupz(output.c_str()); +} diff --git a/aclk/schema-wrappers/proto_2_json.h b/aclk/schema-wrappers/proto_2_json.h new file mode 100644 index 0000000..3bd9847 --- /dev/null +++ b/aclk/schema-wrappers/proto_2_json.h @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef PROTO_2_JSON_H +#define PROTO_2_JSON_H + +#include <sys/types.h> + +#ifdef __cplusplus +extern "C" { +#endif + +char *protomsg_to_json(const void *protobin, size_t len, const char *msgname); + +#ifdef __cplusplus +} +#endif + +#endif /* PROTO_2_JSON_H */ diff --git a/aclk/schema-wrappers/schema_wrapper_utils.cc b/aclk/schema-wrappers/schema_wrapper_utils.cc new file mode 100644 index 0000000..6573e62 --- /dev/null +++ b/aclk/schema-wrappers/schema_wrapper_utils.cc @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "schema_wrapper_utils.h" + +void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts) +{ + ts->set_nanos(tv.tv_usec*1000); + ts->set_seconds(tv.tv_sec); +} + +void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv) +{ + tv->tv_sec = ts.seconds(); + tv->tv_usec = ts.nanos()/1000; +} + +int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + (void)ls; + auto map = (google::protobuf::Map<std::string, std::string> *)data; + map->insert({name, value}); + return 1; +} diff --git a/aclk/schema-wrappers/schema_wrapper_utils.h b/aclk/schema-wrappers/schema_wrapper_utils.h new file mode 100644 index 0000000..2815d0f --- /dev/null +++ b/aclk/schema-wrappers/schema_wrapper_utils.h @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef SCHEMA_WRAPPER_UTILS_H +#define SCHEMA_WRAPPER_UTILS_H + +#include "database/rrd.h" + +#include <sys/time.h> +#include <google/protobuf/timestamp.pb.h> +#include <google/protobuf/map.h> + +#if GOOGLE_PROTOBUF_VERSION < 3001000 +#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize(); +#define PROTO_COMPAT_MSG_SIZE_PTR(msg) (size_t)msg->ByteSize(); +#else +#define PROTO_COMPAT_MSG_SIZE(msg) msg.ByteSizeLong(); +#define PROTO_COMPAT_MSG_SIZE_PTR(msg) msg->ByteSizeLong(); +#endif + +void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts); +void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv); +int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data); + +#endif /* SCHEMA_WRAPPER_UTILS_H */ diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h new file mode 100644 index 0000000..a96f7ea --- /dev/null +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +// utility header to include all the message wrappers at once + +#ifndef SCHEMA_WRAPPERS_H +#define SCHEMA_WRAPPERS_H + +#include "connection.h" +#include "node_connection.h" +#include "node_creation.h" +#include "alarm_config.h" +#include "alarm_stream.h" +#include "node_info.h" +#include "capability.h" +#include "context_stream.h" +#include "context.h" + +#endif /* SCHEMA_WRAPPERS_H */ |