diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-03-31 12:59:21 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-03-31 12:59:21 +0000 |
commit | bb8713bbc1c4594366fc735c04910edbf4c61aab (patch) | |
tree | d7da56c0b89aa371dd8ad986995dd145fdf6670a /aclk | |
parent | Releasing debian version 1.29.3-4. (diff) | |
download | netdata-bb8713bbc1c4594366fc735c04910edbf4c61aab.tar.xz netdata-bb8713bbc1c4594366fc735c04910edbf4c61aab.zip |
Merging upstream version 1.30.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk.c | 821 | ||||
-rw-r--r-- | aclk/aclk.h | 100 | ||||
-rw-r--r-- | aclk/aclk_collector_list.c | 193 | ||||
-rw-r--r-- | aclk/aclk_collector_list.h | 39 | ||||
-rw-r--r-- | aclk/aclk_otp.c | 261 | ||||
-rw-r--r-- | aclk/aclk_otp.h | 10 | ||||
-rw-r--r-- | aclk/aclk_query.c | 295 | ||||
-rw-r--r-- | aclk/aclk_query.h | 32 | ||||
-rw-r--r-- | aclk/aclk_query_queue.c | 128 | ||||
-rw-r--r-- | aclk/aclk_query_queue.h | 71 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 343 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.h | 14 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 274 | ||||
-rw-r--r-- | aclk/aclk_stats.h | 64 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.c | 395 | ||||
-rw-r--r-- | aclk/aclk_tx_msgs.h | 24 | ||||
-rw-r--r-- | aclk/aclk_util.c | 347 | ||||
-rw-r--r-- | aclk/aclk_util.h | 52 | ||||
-rw-r--r-- | aclk/https_client.c | 246 | ||||
-rw-r--r-- | aclk/https_client.h | 11 | ||||
-rw-r--r-- | aclk/legacy/aclk_common.c | 1 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_https_client.c | 4 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.c | 4 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.c | 60 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.h | 4 | ||||
-rw-r--r-- | aclk/legacy/aclk_rx_msgs.c | 36 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.c | 125 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.h | 13 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.c | 3 |
29 files changed, 3951 insertions, 19 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c new file mode 100644 index 000000000..889fa1e4d --- /dev/null +++ b/aclk/aclk.c @@ -0,0 +1,821 @@ +#include "aclk.h" + +#include "aclk_stats.h" +#include "mqtt_wss_client.h" +#include "aclk_otp.h" +#include "aclk_tx_msgs.h" +#include "aclk_query.h" +#include "aclk_query_queue.h" +#include "aclk_util.h" +#include "aclk_rx_msgs.h" +#include "aclk_collector_list.h" + +#ifdef ACLK_LOG_CONVERSATION_DIR +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#endif + +#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable + +//TODO remove most (as in 99.999999999%) of this crap +int aclk_connected = 0; +int aclk_disable_runtime = 0; +int aclk_disable_single_updates = 0; +int aclk_kill_link = 0; + +int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. + +usec_t aclk_session_us = 0; // Used by the mqtt layer +time_t aclk_session_sec = 0; // Used by the mqtt layer + +mqtt_wss_client mqttwss_client; + +netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) + +struct aclk_shared_state aclk_shared_state = { + .agent_state = AGENT_INITIALIZING, + .last_popcorn_interrupt = 0, + .version_neg = 0, + .version_neg_wait_till = 0, + .mqtt_shutdown_msg_id = -1, + .mqtt_shutdown_msg_rcvd = 0 +}; + +void aclk_single_update_disable() +{ + aclk_disable_single_updates = 1; +} + +void aclk_single_update_enable() +{ + aclk_disable_single_updates = 0; +} + +//ENDTODO + +static RSA *aclk_private_key = NULL; +static int load_private_key() +{ + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + aclk_private_key = NULL; + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); + + long bytes_read; + char *private_key = read_by_filename(filename, &bytes_read); + if (!private_key) { + error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); + return 1; + } + debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); + + BIO *key_bio = BIO_new_mem_buf(private_key, -1); + if (key_bio==NULL) { + error("Claimed agent cannot establish ACLK - failed to create BIO for key"); + goto biofailed; + } + + aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); + BIO_free(key_bio); + if (aclk_private_key!=NULL) + { + freez(private_key); + return 0; + } + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); + +biofailed: + freez(private_key); + return 1; +} + +static int wait_till_cloud_enabled() +{ + info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_setting) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + return 1; + } + return 0; +} + +/** + * Will block until agent is claimed. Returns only if agent claimed + * or if agent needs to shutdown. + * + * @return `0` if agent has been claimed, + * `1` if interrupted due to agent shutting down + */ +static int wait_till_agent_claimed(void) +{ + //TODO prevent malloc and freez + char *agent_id = is_agent_claimed(); + while (likely(!agent_id)) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + return 1; + agent_id = is_agent_claimed(); + } + freez(agent_id); + return 0; +} + +/** + * Checks everything is ready for connection + * agent claimed, cloud url set and private key available + * + * @param aclk_hostname points to location where string pointer to hostname will be set + * @param ackl_port port to int where port will be saved + * + * @return If non 0 returned irrecoverable error happened and ACLK should be terminated + */ +static int wait_till_agent_claim_ready() +{ + int port; + char *hostname = NULL; + while (!netdata_exit) { + if (wait_till_agent_claimed()) + return 1; + + // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. + // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error("Do not move the cloud base url out of post_conf_load!!"); + return 1; + } + + // We just check configuration is valid here + // TODO make it without malloc/free + if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) { + error("Agent is claimed but the configuration is invalid, please fix"); + freez(hostname); + hostname = NULL; + sleep(5); + continue; + } + freez(hostname); + hostname = NULL; + + if (!load_private_key()) { + sleep(5); + break; + } + } + + return 0; +} + +void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) +{ + switch(log_type) { + case MQTT_WSS_LOG_ERROR: + case MQTT_WSS_LOG_FATAL: + case MQTT_WSS_LOG_WARN: + error("%s", str); + return; + case MQTT_WSS_LOG_INFO: + info("%s", str); + return; + case MQTT_WSS_LOG_DEBUG: + debug(D_ACLK, "%s", str); + return; + default: + error("Unknown log type from mqtt_wss"); + } +} + +//TODO prevent big buffer on stack +#define RX_MSGLEN_MAX 4096 +static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) +{ + char cmsg[RX_MSGLEN_MAX]; + size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); + + if (msglen > RX_MSGLEN_MAX - 1) + error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); + + memcpy(cmsg, + msg, + len); + cmsg[len] = 0; + +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 512 + char filename[FN_MAX_LEN]; + int logfd; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT()); + logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR ); + if(logfd < 0) + error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename); + write(logfd, msg, msglen); + close(logfd); +#endif + + debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg); + + if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic)) + error("Received message on unexpected topic %s", topic); + + if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { + error("Link is shutting down. Ignoring message."); + return; + } + + aclk_handle_cloud_message(cmsg); +} + +static void puback_callback(uint16_t packet_id) +{ + if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) + aclk_reconnect_delay(0); + +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_puback(packet_id); +#endif + + if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { + error("Got PUBACK for shutdown message. Can exit gracefully."); + aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; + } +} + +static int read_query_thread_count() +{ + int threads = MIN(processors/2, 6); + threads = MAX(threads, 2); + threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + if(threads < 1) { + error("You need at least one query thread. Overriding configured setting of \"%d\"", threads); + threads = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads); + } + return threads; +} + +/* Keeps connection alive and handles all network comms. + * Returns on error or when netdata is shutting down. + * @param client instance of mqtt_wss_client + * @returns 0 - Netdata Exits + * >0 - Error happened. Reconnect and start over. + */ +static int handle_connection(mqtt_wss_client client) +{ + time_t last_periodic_query_wakeup = now_monotonic_sec(); + while (!netdata_exit) { + // timeout 1000 to check at least once a second + // for netdata_exit + if (mqtt_wss_service(client, 1000) < 0){ + error("Connection Error or Dropped"); + return 1; + } + + // mqtt_wss_service will return faster than in one second + // if there is enough work to do + time_t now = now_monotonic_sec(); + if (last_periodic_query_wakeup < now) { + // wake up at least one Query Thread at least + // once per second + last_periodic_query_wakeup = now; + QUERY_THREAD_WAKEUP; + } + } + return 0; +} + +inline static int aclk_popcorn_check_bump() +{ + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); + ACLK_SHARED_STATE_UNLOCK; + return 1; + } + ACLK_SHARED_STATE_UNLOCK; + return 0; +} + +static inline void queue_connect_payloads(void) +{ + aclk_query_t query = aclk_query_new(METADATA_INFO); + query->data.metadata_info.host = localhost; + query->data.metadata_info.initial_on_connect = 1; + aclk_queue_query(query); + query = aclk_query_new(METADATA_ALARMS); + query->data.metadata_alarms.initial_on_connect = 1; + aclk_queue_query(query); +} + +static inline void mqtt_connected_actions(mqtt_wss_client client) +{ + // TODO global vars? + usec_t now = now_realtime_usec(); + aclk_session_sec = now / USEC_PER_SEC; + aclk_session_us = now % USEC_PER_SEC; + + mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1); + + aclk_stats_upd_online(1); + aclk_connected = 1; + aclk_pubacks_per_conn = 0; + aclk_hello_msg(client); + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { + error("Sending `connect` payload immediatelly as popcorning was finished already."); + queue_connect_payloads(); + } + ACLK_SHARED_STATE_UNLOCK; +} + +/* Waits until agent is ready or needs to exit + * @param client instance of mqtt_wss_client + * @param query_threads pointer to aclk_query_threads + * structure where to store data about started query threads + * @return 0 - Popcorning Finished - Agent STABLE, + * !0 - netdata_exit + */ +static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads) +{ + time_t elapsed; + int need_wait; + while (!netdata_exit) { + ACLK_SHARED_STATE_LOCK; + if (likely(aclk_shared_state.agent_state != AGENT_INITIALIZING)) { + ACLK_SHARED_STATE_UNLOCK; + return 0; + } + elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; + if (elapsed >= ACLK_STABLE_TIMEOUT) { + aclk_shared_state.agent_state = AGENT_STABLE; + ACLK_SHARED_STATE_UNLOCK; + error("ACLK localhost popocorn finished"); + if (unlikely(!query_threads->thread_list)) + aclk_query_threads_start(query_threads, client); + queue_connect_payloads(); + return 0; + } + ACLK_SHARED_STATE_UNLOCK; + need_wait = ACLK_STABLE_TIMEOUT - elapsed; + error("ACLK localhost popocorn wait %d seconds longer", need_wait); + sleep(need_wait); + } + return 1; +} + +void aclk_graceful_disconnect(mqtt_wss_client client) +{ + error("Preparing to Gracefully Shutdown the ACLK"); + aclk_queue_lock(); + aclk_queue_flush(); + aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful"); + time_t t = now_monotonic_sec(); + while (!mqtt_wss_service(client, 100)) { + if (now_monotonic_sec() - t >= 2) { + error("Wasn't able to gracefully shutdown ACLK in time!"); + break; + } + if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { + error("MQTT App Layer `disconnect` message sent successfully"); + break; + } + } + aclk_stats_upd_online(0); + aclk_connected = 0; + + error("Attempting to Gracefully Shutdown MQTT/WSS connection"); + mqtt_wss_disconnect(client, 1000); +} + +/* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled + * @return 0 - Go ahead and connect (delay expired) + * 1 - netdata_exit + */ +#define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4) +static int aclk_block_till_recon_allowed() { + // Handle reconnect exponential backoff + // fnc aclk_reconnect_delay comes from ACLK Legacy @amoss + // but has been modifed slightly (more randomness) + unsigned long recon_delay = aclk_reconnect_delay(1); + info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC); + // we want to wake up from time to time to check netdata_exit + while (recon_delay) + { + if (netdata_exit) + return 1; + if (recon_delay > NETDATA_EXIT_POLL_MS) { + sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS); + recon_delay -= NETDATA_EXIT_POLL_MS; + continue; + } + sleep_usec(recon_delay * USEC_PER_MS); + recon_delay = 0; + } + return 0; +} + +#define HTTP_PROXY_PREFIX "http://" +static void set_proxy(struct mqtt_wss_proxy *out) +{ + ACLK_PROXY_TYPE pt; + const char *ptr = aclk_get_proxy(&pt); + char *tmp; + char *host; + if (pt != PROXY_TYPE_HTTP) + return; + + out->port = 0; + + if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX))) + ptr += strlen(HTTP_PROXY_PREFIX); + + if ((tmp = strchr(ptr, '@'))) + ptr = tmp; + + if ((tmp = strchr(ptr, '/'))) { + host = mallocz((tmp - ptr) + 1); + memcpy(host, ptr, (tmp - ptr)); + host[tmp - ptr] = 0; + } else + host = strdupz(ptr); + + if ((tmp = strchr(host, ':'))) { + *tmp = 0; + tmp++; + out->port = atoi(tmp); + } + + if (out->port <= 0 || out->port > 65535) + out->port = 8080; + + out->host = host; + + out->type = MQTT_WSS_PROXY_HTTP; +} + +/* Attempts to make a connection to MQTT broker over WSS + * @param client instance of mqtt_wss_client + * @return 0 - Successfull Connection, + * <0 - Irrecoverable Error -> Kill ACLK, + * >0 - netdata_exit + */ +#define CLOUD_BASE_URL_READ_RETRY 30 +#ifdef ACLK_SSL_ALLOW_SELF_SIGNED +#define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED +#else +#define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL +#endif +static int aclk_attempt_to_connect(mqtt_wss_client client) +{ + char *aclk_hostname = NULL; + int aclk_port; + +#ifndef ACLK_DISABLE_CHALLENGE + char *mqtt_otp_user = NULL; + char *mqtt_otp_pass = NULL; +#endif + + json_object *lwt; + + while (!netdata_exit) { + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error("Do not move the cloud base url out of post_conf_load!!"); + return -1; + } + + if (aclk_block_till_recon_allowed()) + return 1; + + info("Attempting connection now"); + if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) { + error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY); + sleep(CLOUD_BASE_URL_READ_RETRY); + continue; + } + + struct mqtt_wss_proxy proxy_conf; + proxy_conf.type = MQTT_WSS_DIRECT; + set_proxy(&proxy_conf); + + struct mqtt_connect_params mqtt_conn_params = { + .clientid = "anon", + .username = "anon", + .password = "anon", + .will_topic = aclk_get_topic(ACLK_TOPICID_METADATA), + .will_msg = NULL, + .will_flags = MQTT_WSS_PUB_QOS2, + .keep_alive = 60 + }; +#ifndef ACLK_DISABLE_CHALLENGE + aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass); + mqtt_conn_params.clientid = mqtt_otp_user; + mqtt_conn_params.username = mqtt_otp_user; + mqtt_conn_params.password = mqtt_otp_pass; +#endif + + lwt = aclk_generate_disconnect(NULL); + mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN); + + mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg); + if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) { + json_object_put(lwt); + freez(aclk_hostname); + aclk_hostname = NULL; + info("MQTTWSS connection succeeded"); + mqtt_connected_actions(client); + return 0; + } + + freez(aclk_hostname); + aclk_hostname = NULL; + json_object_put(lwt); + error("Connect failed\n"); + } + + return 1; +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + + struct aclk_stats_thread *stats_thread = NULL; + + struct aclk_query_threads query_threads; + query_threads.thread_list = NULL; + + ACLK_PROXY_TYPE proxy_type; + aclk_get_proxy(&proxy_type); + if (proxy_type == PROXY_TYPE_SOCKS5) { + error("SOCKS5 proxy is not supported by ACLK-NG yet."); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } + + // This thread is unusual in that it cannot be cancelled by cancel_main_threads() + // as it must notify the far end that it shutdown gracefully and avoid the LWT. + netdata_thread_disable_cancelability(); + +#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK ) + info("Killing ACLK thread -> cloud functionality has been disabled"); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +#endif + aclk_popcorn_check_bump(); // start localhost popcorn timer + query_threads.count = read_query_thread_count(); + + if (wait_till_cloud_enabled()) + goto exit; + + if (wait_till_agent_claim_ready()) + goto exit; + + if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { + error("Couldn't initialize MQTT_WSS network library"); + goto exit; + } + + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); + if (aclk_stats_enabled) { + stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); + stats_thread->thread = mallocz(sizeof(netdata_thread_t)); + stats_thread->query_thread_count = query_threads.count; + netdata_thread_create( + stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread); + } + + // Keep reconnecting and talking until our time has come + // and the Grim Reaper (netdata_exit) calls + do { + if (aclk_attempt_to_connect(mqttwss_client)) + goto exit_full; + + // warning this assumes the popcorning is relative short (3s) + // if that changes call mqtt_wss_service from within + // to keep OpenSSL, WSS and MQTT connection alive + if (wait_popcorning_finishes(mqttwss_client, &query_threads)) + goto exit_full; + + if (!handle_connection(mqttwss_client)) { + aclk_stats_upd_online(0); + aclk_connected = 0; + } + } while (!netdata_exit); + + aclk_graceful_disconnect(mqttwss_client); + +exit_full: +// Tear Down + QUERY_THREAD_WAKEUP_ALL; + + aclk_query_threads_cleanup(&query_threads); + + if (aclk_stats_enabled) { + netdata_thread_join(*stats_thread->thread, NULL); + aclk_stats_thread_cleanup(); + freez(stats_thread->thread); + freez(stats_thread); + } + free_topic_cache(); + mqtt_wss_destroy(mqttwss_client); +exit: + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +} + +// TODO this is taken over as workaround from old ACLK +// fix this in both old and new ACLK +extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); + +void aclk_alarm_reload(void) +{ + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + ACLK_SHARED_STATE_UNLOCK; + return; + } + ACLK_SHARED_STATE_UNLOCK; + + aclk_queue_query(aclk_query_new(METADATA_ALARMS)); +} + +int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +{ + BUFFER *local_buffer; + json_object *msg; + + if (host != localhost) + return 0; + + ACLK_SHARED_STATE_LOCK; + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { + ACLK_SHARED_STATE_UNLOCK; + return 0; + } + ACLK_SHARED_STATE_UNLOCK; + + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); + health_alarm_entry2json_nolock(local_buffer, ae, host); + netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); + + msg = json_tokener_parse(local_buffer->buffer); + + struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE); + query->data.alarm_update = msg; + aclk_queue_query(query); + + buffer_free(local_buffer); + return 0; +} + +int aclk_update_chart(RRDHOST *host, char *chart_name, int create) +{ + struct aclk_query *query; + + if (aclk_popcorn_check_bump()) + return 0; + + query = aclk_query_new(create ? CHART_NEW : CHART_DEL); + if(create) { + query->data.chart_add_del.host = host; + query->data.chart_add_del.chart_name = strdupz(chart_name); + } else { + query->data.metadata_info.host = host; + query->data.metadata_info.initial_on_connect = 0; + } + + aclk_queue_query(query); + return 0; +} + +/* + * Add a new collector to the list + * If it exists, update the chart count + */ +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +{ + struct aclk_query *query; + struct _collector *tmp_collector; + if (unlikely(!netdata_ready)) { + return; + } + + COLLECTOR_LOCK; + + tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); + + if (unlikely(tmp_collector->count != 1)) { + COLLECTOR_UNLOCK; + return; + } + + COLLECTOR_UNLOCK; + + if (aclk_popcorn_check_bump()) + return; + + if (host != localhost) + return; + + query = aclk_query_new(METADATA_INFO); + query->data.metadata_info.host = localhost; //TODO + query->data.metadata_info.initial_on_connect = 0; + aclk_queue_query(query); + + query = aclk_query_new(METADATA_ALARMS); + query->data.metadata_alarms.initial_on_connect = 0; + aclk_queue_query(query); +} + +/* + * Delete a collector from the list + * If the chart count reaches zero the collector will be removed + * from the list by calling del_collector. + * + * This function will release the memory used and schedule + * a cloud update + */ +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +{ + struct aclk_query *query; + struct _collector *tmp_collector; + if (unlikely(!netdata_ready)) { + return; + } + + COLLECTOR_LOCK; + + tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); + + if (unlikely(!tmp_collector || tmp_collector->count)) { + COLLECTOR_UNLOCK; + return; + } + + debug( + D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", + tmp_collector->count); + + COLLECTOR_UNLOCK; + + _free_collector(tmp_collector); + + if (aclk_popcorn_check_bump()) + return; + + if (host != localhost) + return; + + query = aclk_query_new(METADATA_INFO); + query->data.metadata_info.host = localhost; //TODO + query->data.metadata_info.initial_on_connect = 0; + aclk_queue_query(query); + + query = aclk_query_new(METADATA_ALARMS); + query->data.metadata_alarms.initial_on_connect = 0; + aclk_queue_query(query); +} + +struct label *add_aclk_host_labels(struct label *label) { +#ifdef ENABLE_ACLK + ACLK_PROXY_TYPE aclk_proxy; + char *proxy_str; + aclk_get_proxy(&aclk_proxy); + + switch(aclk_proxy) { + case PROXY_TYPE_SOCKS5: + proxy_str = "SOCKS5"; + break; + case PROXY_TYPE_HTTP: + proxy_str = "HTTP"; + break; + default: + proxy_str = "none"; + break; + } + label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO); + return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); +#else + return label; +#endif +} diff --git a/aclk/aclk.h b/aclk/aclk.h new file mode 100644 index 000000000..29626c7f4 --- /dev/null +++ b/aclk/aclk.h @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_H +#define ACLK_H + +typedef struct aclk_rrdhost_state { + char *claimed_id; // Claimed ID if host has one otherwise NULL +} aclk_rrdhost_state; + +#include "../daemon/common.h" +#include "aclk_util.h" + +// minimum and maximum supported version of ACLK +// in this version of agent +#define ACLK_VERSION_MIN 2 +#define ACLK_VERSION_MAX 2 + +// Version negotiation messages have they own versioning +// this is also used for LWT message as we set that up +// before version negotiation +#define ACLK_VERSION_NEG_VERSION 1 + +// Maximum time to wait for version negotiation before aborting +// and defaulting to oldest supported version +#define VERSION_NEG_TIMEOUT 3 + +#if ACLK_VERSION_MIN > ACLK_VERSION_MAX +#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN" +#endif + +// Define ACLK Feature Version Boundaries Here +#define ACLK_V_COMPRESSION 2 + +// How many MQTT PUBACKs we need to get to consider connection +// stable for the purposes of TBEB (truncated binary exponential backoff) +#define ACLK_PUBACKS_CONN_STABLE 3 + +// TODO get rid of this shit +extern int aclk_disable_runtime; +extern int aclk_disable_single_updates; +extern int aclk_kill_link; +extern int aclk_connected; + +extern usec_t aclk_session_us; +extern time_t aclk_session_sec; + +void *aclk_main(void *ptr); +void aclk_single_update_disable(); +void aclk_single_update_enable(); + +#define NETDATA_ACLK_HOOK \ + { .name = "ACLK_Main", \ + .config_section = NULL, \ + .config_name = NULL, \ + .enabled = 1, \ + .thread = NULL, \ + .init_routine = NULL, \ + .start_routine = aclk_main }, + +extern netdata_mutex_t aclk_shared_state_mutex; +#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex) +#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex) + +typedef enum aclk_agent_state { + AGENT_INITIALIZING, + AGENT_STABLE +} ACLK_AGENT_STATE; +extern struct aclk_shared_state { + ACLK_AGENT_STATE agent_state; + time_t last_popcorn_interrupt; + + // read only while ACLK connected + // protect by lock otherwise + int version_neg; + usec_t version_neg_wait_till; + + // To wait for `disconnect` message PUBACK + // when shuting down + // at the same time if > 0 we know link is + // shutting down + int mqtt_shutdown_msg_id; + int mqtt_shutdown_msg_rcvd; +} aclk_shared_state; + +void aclk_alarm_reload(void); +int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); + +// TODO this is for bacward compatibility with ACLK legacy +#define ACLK_CMD_CHART 1 +#define ACLK_CMD_CHARTDEL 0 +/* Informs ACLK about created/deleted chart + * @param create 0 - if chart was deleted, other if chart created + */ +int aclk_update_chart(RRDHOST *host, char *chart_name, int create); + +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); + +struct label *add_aclk_host_labels(struct label *label); + +#endif /* ACLK_H */ diff --git a/aclk/aclk_collector_list.c b/aclk/aclk_collector_list.c new file mode 100644 index 000000000..a251a23a8 --- /dev/null +++ b/aclk/aclk_collector_list.c @@ -0,0 +1,193 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +// This is copied from Legacy ACLK, Original Autor: amoss + +// TODO unmess this + +#include "aclk_collector_list.h" + +netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; + +struct _collector *collector_list = NULL; + +/* + * Free a collector structure + */ +void _free_collector(struct _collector *collector) +{ + if (likely(collector->plugin_name)) + freez(collector->plugin_name); + + if (likely(collector->module_name)) + freez(collector->module_name); + + if (likely(collector->hostname)) + freez(collector->hostname); + + freez(collector); +} + +/* + * This will report the collector list + * + */ +#ifdef ACLK_DEBUG +static void _dump_collector_list() +{ + struct _collector *tmp_collector; + + COLLECTOR_LOCK; + + info("DUMPING ALL COLLECTORS"); + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + info("DUMPING ALL COLLECTORS -- nothing found"); + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + + while (tmp_collector) { + info( + "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, + tmp_collector->plugin_name ? tmp_collector->plugin_name : "", + tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); + + tmp_collector = tmp_collector->next; + } + info("DUMPING ALL COLLECTORS DONE"); + COLLECTOR_UNLOCK; +} +#endif + +/* + * This will cleanup the collector list + * + */ +void _reset_collector_list() +{ + struct _collector *tmp_collector, *next_collector; + + COLLECTOR_LOCK; + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + collector_list->count = 0; + collector_list->next = NULL; + + // We broke the link; we can unlock + COLLECTOR_UNLOCK; + + while (tmp_collector) { + next_collector = tmp_collector->next; + _free_collector(tmp_collector); + tmp_collector = next_collector; + } +} + +/* + * Find a collector (if it exists) + * Must lock before calling this + * If last_collector is not null, it will return the previous collector in the linked + * list (used in collector delete) + */ +static struct _collector *_find_collector( + const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) +{ + struct _collector *tmp_collector, *prev_collector; + uint32_t plugin_hash; + uint32_t module_hash; + uint32_t hostname_hash; + + if (unlikely(!collector_list)) { + collector_list = callocz(1, sizeof(struct _collector)); + return NULL; + } + + if (unlikely(!collector_list->next)) + return NULL; + + plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; + module_hash = module_name ? simple_hash(module_name) : 1; + hostname_hash = simple_hash(hostname); + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + prev_collector = collector_list; + while (tmp_collector) { + if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && + hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && + (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && + (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { + if (unlikely(last_collector)) + *last_collector = prev_collector; + + return tmp_collector; + } + + prev_collector = tmp_collector; + tmp_collector = tmp_collector->next; + } + + return tmp_collector; +} + +/* + * Called to delete a collector + * It will reduce the count (chart_count) and will remove it + * from the linked list if the count reaches zero + * The structure will be returned to the caller to free + * the resources + * + */ +struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector, *prev_collector = NULL; + + tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); + + if (likely(tmp_collector)) { + --tmp_collector->count; + if (unlikely(!tmp_collector->count)) + prev_collector->next = tmp_collector->next; + } + return tmp_collector; +} + +/* + * Add a new collector (plugin / module) to the list + * If it already exists just update the chart count + * + * Lock before calling + */ +struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + + tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); + + if (unlikely(!tmp_collector)) { + tmp_collector = callocz(1, sizeof(struct _collector)); + tmp_collector->hostname_hash = simple_hash(hostname); + tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; + tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; + + tmp_collector->hostname = strdupz(hostname); + tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; + tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; + + tmp_collector->next = collector_list->next; + collector_list->next = tmp_collector; + } + tmp_collector->count++; + debug( + D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", + module_name ? module_name : "*", tmp_collector->count); + return tmp_collector; +} diff --git a/aclk/aclk_collector_list.h b/aclk/aclk_collector_list.h new file mode 100644 index 000000000..98d30ba94 --- /dev/null +++ b/aclk/aclk_collector_list.h @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +// This is copied from Legacy ACLK, Original Autor: amoss + +// TODO unmess this + +#ifndef ACLK_COLLECTOR_LIST_H +#define ACLK_COLLECTOR_LIST_H + +#include "libnetdata/libnetdata.h" + +extern netdata_mutex_t collector_mutex; + +#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) +#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) + +/* + * Maintain a list of collectors and chart count + * If all the charts of a collector are deleted + * then a new metadata dataset must be send to the cloud + * + */ +struct _collector { + time_t created; + uint32_t count; //chart count + uint32_t hostname_hash; + uint32_t plugin_hash; + uint32_t module_hash; + char *hostname; + char *plugin_name; + char *module_name; + struct _collector *next; +}; + +struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name); +struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name); +void _reset_collector_list(); +void _free_collector(struct _collector *collector); + +#endif /* ACLK_COLLECTOR_LIST_H */ diff --git a/aclk/aclk_otp.c b/aclk/aclk_otp.c new file mode 100644 index 000000000..fcb9d600c --- /dev/null +++ b/aclk/aclk_otp.c @@ -0,0 +1,261 @@ + +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_otp.h" + +#include "https_client.h" + +#include "../daemon/common.h" + +#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h" + +struct dictionary_singleton { + char *key; + char *result; +}; + +static int json_extract_singleton(JSON_ENTRY *e) +{ + struct dictionary_singleton *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, data->key)) { + data->result = strdupz(e->data.string); + break; + } + break; + case JSON_NUMBER: + case JSON_BOOLEAN: + case JSON_NULL: + break; + } + return 0; +} + +// Base-64 decoder. +// Note: This is non-validating, invalid input will be decoded without an error. +// Challenges are packed into json strings so we don't skip newlines. +// Size errors (i.e. invalid input size or insufficient output space) are caught. +static size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size) +{ + static char lookup[256]; + static int first_time=1; + if (first_time) + { + first_time = 0; + for(int i=0; i<256; i++) + lookup[i] = -1; + for(int i='A'; i<='Z'; i++) + lookup[i] = i-'A'; + for(int i='a'; i<='z'; i++) + lookup[i] = i-'a' + 26; + for(int i='0'; i<='9'; i++) + lookup[i] = i-'0' + 52; + lookup['+'] = 62; + lookup['/'] = 63; + } + if ((input_size & 3) != 0) + { + error("Can't decode base-64 input length %zu", input_size); + return 0; + } + size_t unpadded_size = (input_size/4) * 3; + if ( unpadded_size > output_size ) + { + error("Output buffer size %zu is too small to decode %zu into", output_size, input_size); + return 0; + } + // Don't check padding within full quantums + for (size_t i = 0 ; i < input_size-4 ; i+=4 ) + { + uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]]; + output[0] = value >> 16; + output[1] = value >> 8; + output[2] = value; + //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); + output += 3; + input += 4; + } + // Handle padding only in last quantum + if (input[2] == '=') { + uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]]; + output[0] = value >> 4; + //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]); + return unpadded_size-2; + } + else if (input[3] == '=') { + uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]]; + output[0] = value >> 10; + output[1] = value >> 2; + //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]); + return unpadded_size-1; + } + else + { + uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3]; + output[0] = value >> 16; + output[1] = value >> 8; + output[2] = value; + //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); + return unpadded_size; + } +} + +static size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size) +{ + uint32_t value; + static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + if ((input_size/3+1)*4 >= output_size) + { + error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size); + return 0; + } + size_t count = 0; + while (input_size>3) + { + value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff; + output[0] = lookup[value >> 18]; + output[1] = lookup[(value >> 12) & 0x3f]; + output[2] = lookup[(value >> 6) & 0x3f]; + output[3] = lookup[value & 0x3f]; + //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); + output += 4; + input += 3; + input_size -= 3; + count += 4; + } + switch (input_size) + { + case 2: + value = (input[0] << 10) + (input[1] << 2); + output[0] = lookup[(value >> 12) & 0x3f]; + output[1] = lookup[(value >> 6) & 0x3f]; + output[2] = lookup[value & 0x3f]; + output[3] = '='; + //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]); + count += 4; + break; + case 1: + value = input[0] << 4; + output[0] = lookup[(value >> 6) & 0x3f]; + output[1] = lookup[value & 0x3f]; + output[2] = '='; + output[3] = '='; + //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); + count += 4; + break; + case 0: + break; + } + return count; +} + +static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char *decrypted) +{ + int result = RSA_private_decrypt( data_len, enc_data, decrypted, p_key, RSA_PKCS1_OAEP_PADDING); + if (result == -1) { + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Decryption of the challenge failed: %s", err); + } + return result; +} + +// aclk_get_mqtt_otp is slightly modified original code from @amoss +void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass) +{ + char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + debug(D_ACLK, "Performing challenge-response sequence"); + if (*mqtt_pass != NULL) + { + freez(*mqtt_pass); + *mqtt_pass = NULL; + } + // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge + // TODO - target host? + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + { + error("Agent was not claimed - cannot perform challenge/response"); + goto CLEANUP; + } + char url[1024]; + sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id); + info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url); + if (https_request(HTTP_REQ_GET, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL)) + { + error("Challenge failed: %s", data_buffer); + goto CLEANUP; + } + struct dictionary_singleton challenge = { .key = "challenge", .result = NULL }; + + debug(D_ACLK, "Challenge response from cloud: %s", data_buffer); + if (json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK) + { + freez(challenge.result); + error("Could not parse the json response with the challenge: %s", data_buffer); + goto CLEANUP; + } + if (challenge.result == NULL) { + error("Could not retrieve challenge from auth response: %s", data_buffer); + goto CLEANUP; + } + + + size_t challenge_len = strlen(challenge.result); + unsigned char decoded[512]; + size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded)); + + unsigned char plaintext[4096]={}; + int decrypted_length = private_decrypt(p_key, decoded, decoded_len, plaintext); + freez(challenge.result); + char encoded[512]; + size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded)); + encoded[encoded_len] = 0; + debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded); + + char response_json[4096]={}; + sprintf(response_json, "{\"response\":\"%s\"}", encoded); + debug(D_ACLK, "Password phase: %s",response_json); + // TODO - host + sprintf(url, "/api/v1/auth/node/%s/password", agent_id); + if (https_request(HTTP_REQ_POST, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json)) + { + error("Challenge-response failed: %s", data_buffer); + goto CLEANUP; + } + + debug(D_ACLK, "Password response from cloud: %s", data_buffer); + + struct dictionary_singleton password = { .key = "password", .result = NULL }; + if (json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK) + { + freez(password.result); + error("Could not parse the json response with the password: %s", data_buffer); + goto CLEANUP; + } + + if (password.result == NULL ) { + error("Could not retrieve password from auth response"); + goto CLEANUP; + } + if (*mqtt_pass != NULL ) + freez(*mqtt_pass); + *mqtt_pass = password.result; + if (*mqtt_usr != NULL) + freez(*mqtt_usr); + *mqtt_usr = agent_id; + agent_id = NULL; + +CLEANUP: + if (agent_id != NULL) + freez(agent_id); + freez(data_buffer); + return; +} diff --git a/aclk/aclk_otp.h b/aclk/aclk_otp.h new file mode 100644 index 000000000..31e81c5a1 --- /dev/null +++ b/aclk/aclk_otp.h @@ -0,0 +1,10 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_OTP_H +#define ACLK_OTP_H + +#include "../daemon/common.h" + +void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass); + +#endif /* ACLK_OTP_H */ diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c new file mode 100644 index 000000000..71c63f647 --- /dev/null +++ b/aclk/aclk_query.c @@ -0,0 +1,295 @@ +#include "aclk_query.h" +#include "aclk_stats.h" +#include "aclk_query_queue.h" +#include "aclk_tx_msgs.h" + +#define ACLK_QUERY_THREAD_NAME "ACLK_Query" + +#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" + +pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; +pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; +#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) +#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) + +typedef struct aclk_query_handler { + aclk_query_type_t type; + char *name; // for logging purposes + int(*fnc)(mqtt_wss_client client, aclk_query_t query); +} aclk_query_handler; + +static int info_metadata(mqtt_wss_client client, aclk_query_t query) +{ + aclk_send_info_metadata(client, + !query->data.metadata_info.initial_on_connect, + query->data.metadata_info.host); + return 0; +} + +static int alarms_metadata(mqtt_wss_client client, aclk_query_t query) +{ + aclk_send_alarm_metadata(client, + !query->data.metadata_info.initial_on_connect); + return 0; +} + +static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url) +{ + usec_t t; + + t = now_monotonic_high_precision_usec(); + w->response.code = web_client_api_request_v1(host, w, url); + t = now_monotonic_high_precision_usec() - t; + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_q_process_total += t; + aclk_metrics_per_sample.cloud_q_process_count++; + if (aclk_metrics_per_sample.cloud_q_process_max < t) + aclk_metrics_per_sample.cloud_q_process_max = t; + ACLK_STATS_UNLOCK; + } + + return t; +} + +static int http_api_v2(mqtt_wss_client client, aclk_query_t query) +{ + int retval = 0; + usec_t t; + BUFFER *local_buffer = NULL; + +#ifdef NETDATA_WITH_ZLIB + int z_ret; + BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *start, *end; +#endif + + struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); + w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() + w->cookie1[0] = 0; // Simulate web_client_create_on_fd() + w->cookie2[0] = 0; // Simulate web_client_create_on_fd() + w->acl = 0x1f; + + char *mysep = strchr(query->data.http_api_v2.query, '?'); + if (mysep) { + url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); + *mysep = '\0'; + } else + url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1); + + mysep = strrchr(query->data.http_api_v2.query, '/'); + + // execute the query + t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop"); + +#ifdef NETDATA_WITH_ZLIB + // check if gzip encoding can and should be used + if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) { + start += strlen(WEB_HDR_ACCEPT_ENC); + end = strstr(start, "\x0D\x0A"); + start = strstr(start, "gzip"); + + if (start && start < end) { + w->response.zstream.zalloc = Z_NULL; + w->response.zstream.zfree = Z_NULL; + w->response.zstream.opaque = Z_NULL; + if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) { + w->response.zinitialized = 1; + w->response.zoutput = 1; + } else + error("Failed to initialize zlib. Proceeding without compression."); + } + } + + if (w->response.data->len && w->response.zinitialized) { + w->response.zstream.next_in = (Bytef *)w->response.data->buffer; + w->response.zstream.avail_in = w->response.data->len; + do { + w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE; + w->response.zstream.next_out = w->response.zbuffer; + z_ret = deflate(&w->response.zstream, Z_FINISH); + if(z_ret < 0) { + if(w->response.zstream.msg) + error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); + else + error("Unknown error during zlib compression."); + retval = 1; + goto cleanup; + } + int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; + buffer_need_bytes(z_buffer, bytes_to_cpy); + memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy); + z_buffer->len += bytes_to_cpy; + } while(z_ret != Z_STREAM_END); + // so that web_client_build_http_header + // puts correct content lenght into header + buffer_free(w->response.data); + w->response.data = z_buffer; + z_buffer = NULL; + } +#endif + + now_realtime_timeval(&w->tv_ready); + w->response.data->date = w->tv_ready.tv_sec; + web_client_build_http_header(w); + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + buffer_strcat(local_buffer, w->response.header_output->buffer); + + if (w->response.data->len) { +#ifdef NETDATA_WITH_ZLIB + if (w->response.zinitialized) { + buffer_need_bytes(local_buffer, w->response.data->len); + memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); + local_buffer->len += w->response.data->len; + } else { +#endif + buffer_strcat(local_buffer, w->response.data->buffer); +#ifdef NETDATA_WITH_ZLIB + } +#endif + } + + aclk_http_msg_v2(client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); + +cleanup: +#ifdef NETDATA_WITH_ZLIB + if(w->response.zinitialized) + deflateEnd(&w->response.zstream); + buffer_free(z_buffer); +#endif + buffer_free(w->response.data); + buffer_free(w->response.header); + buffer_free(w->response.header_output); + freez(w); + buffer_free(local_buffer); + return retval; +} + +static int chart_query(mqtt_wss_client client, aclk_query_t query) +{ + aclk_chart_msg(client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name); + return 0; +} + +static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query) +{ + aclk_alarm_state_msg(client, query->data.alarm_update); + // aclk_alarm_state_msg frees the json object including the header it generates + query->data.alarm_update = NULL; + return 0; +} + +aclk_query_handler aclk_query_handlers[] = { + { .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 }, + { .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query }, + { .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata }, + { .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata }, + { .type = CHART_NEW, .name = "chart new", .fnc = chart_query }, + { .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata }, + { .type = UNKNOWN, .name = NULL, .fnc = NULL } +}; + + +static void aclk_query_process_msg(struct aclk_query_thread *info, aclk_query_t query) +{ + for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { + if (aclk_query_handlers[i].type == query->type) { + debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); + aclk_query_handlers[i].fnc(info->client, query); + aclk_query_free(query); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_dispatched++; + aclk_queries_per_thread[info->idx]++; + ACLK_STATS_UNLOCK; + } + return; + } + } + fatal("Unknown query in query queue. %u", query->type); +} + +/* Processes messages from queue. Compete for work with other threads + */ +int aclk_query_process_msgs(struct aclk_query_thread *info) +{ + aclk_query_t query; + while ((query = aclk_queue_pop())) + aclk_query_process_msg(info, query); + + return 0; +} + +/** + * Main query processing thread + */ +void *aclk_query_main_thread(void *ptr) +{ + struct aclk_query_thread *info = ptr; + while (!netdata_exit) { + ACLK_SHARED_STATE_LOCK; + if (unlikely(!aclk_shared_state.version_neg)) { + if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { + ACLK_SHARED_STATE_UNLOCK; + info("Waiting for ACLK Version Negotiation message from Cloud"); + sleep(1); + continue; + } + errno = 0; + error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds." + " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); + aclk_shared_state.version_neg = ACLK_VERSION_MIN; +// When ACLK v3 is implemented you will need this +// aclk_set_rx_handlers(aclk_shared_state.version_neg); + } + ACLK_SHARED_STATE_UNLOCK; + + aclk_query_process_msgs(info); + + QUERY_THREAD_LOCK; + + if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait))) + sleep_usec(USEC_PER_SEC * 1); + + QUERY_THREAD_UNLOCK; + } + return NULL; +} + +#define TASK_LEN_MAX 16 +void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client) +{ + info("Starting %d query threads.", query_threads->count); + + char thread_name[TASK_LEN_MAX]; + query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); + for (int i = 0; i < query_threads->count; i++) { + query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics + + if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0)) + error("snprintf encoding error"); + netdata_thread_create( + &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread, + &query_threads->thread_list[i]); + + query_threads->thread_list[i].client = client; + } +} + +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) +{ + if (query_threads && query_threads->thread_list) { + for (int i = 0; i < query_threads->count; i++) { + netdata_thread_join(query_threads->thread_list[i].thread, NULL); + } + freez(query_threads->thread_list); + } + aclk_queue_lock(); + aclk_queue_flush(); +} diff --git a/aclk/aclk_query.h b/aclk/aclk_query.h new file mode 100644 index 000000000..43741fb32 --- /dev/null +++ b/aclk/aclk_query.h @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_QUERY_H +#define NETDATA_ACLK_QUERY_H + +#include "libnetdata/libnetdata.h" + +#include "mqtt_wss_client.h" + +extern pthread_cond_t query_cond_wait; +extern pthread_mutex_t query_lock_wait; +#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) +#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait) + +// TODO +//extern volatile int aclk_connected; + +struct aclk_query_thread { + netdata_thread_t thread; + int idx; + mqtt_wss_client client; +}; + +struct aclk_query_threads { + struct aclk_query_thread *thread_list; + int count; +}; + +void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client); +void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); + +#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c new file mode 100644 index 000000000..c9461b233 --- /dev/null +++ b/aclk/aclk_query_queue.c @@ -0,0 +1,128 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_query_queue.h" +#include "aclk_query.h" +#include "aclk_stats.h" + +static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER; +#define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex) +#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex) + +static struct aclk_query_queue { + aclk_query_t head; + aclk_query_t tail; + int block_push; +} aclk_query_queue = { + .head = NULL, + .tail = NULL, + .block_push = 0 +}; + +static inline int _aclk_queue_query(aclk_query_t query) +{ + query->created = now_realtime_usec(); + ACLK_QUEUE_LOCK; + if (aclk_query_queue.block_push) { + ACLK_QUEUE_UNLOCK; + if(!netdata_exit) + error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); + aclk_query_free(query); + return 1; + } + if (!aclk_query_queue.head) { + aclk_query_queue.head = query; + aclk_query_queue.tail = query; + ACLK_QUEUE_UNLOCK; + return 0; + } + // TODO deduplication + aclk_query_queue.tail->next = query; + aclk_query_queue.tail = query; + ACLK_QUEUE_UNLOCK; + return 0; + +} + +int aclk_queue_query(aclk_query_t query) +{ + int ret = _aclk_queue_query(query); + if (!ret) { + QUERY_THREAD_WAKEUP; + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_queued++; + ACLK_STATS_UNLOCK; + } + } + return ret; +} + +aclk_query_t aclk_queue_pop(void) +{ + aclk_query_t ret; + + ACLK_QUEUE_LOCK; + if (aclk_query_queue.block_push) { + ACLK_QUEUE_UNLOCK; + if(!netdata_exit) + error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown."); + return NULL; + } + + ret = aclk_query_queue.head; + if (!ret) { + ACLK_QUEUE_UNLOCK; + return ret; + } + + aclk_query_queue.head = ret->next; + if (unlikely(!aclk_query_queue.head)) + aclk_query_queue.tail = aclk_query_queue.head; + ACLK_QUEUE_UNLOCK; + + ret->next = NULL; + return ret; +} + +void aclk_queue_flush(void) +{ + aclk_query_t query = aclk_queue_pop(); + while (query) { + aclk_query_free(query); + query = aclk_queue_pop(); + }; +} + +aclk_query_t aclk_query_new(aclk_query_type_t type) +{ + aclk_query_t query = callocz(1, sizeof(struct aclk_query)); + query->type = type; + return query; +} + +void aclk_query_free(aclk_query_t query) +{ + if (query->type == HTTP_API_V2) { + freez(query->data.http_api_v2.payload); + if (query->data.http_api_v2.query != query->dedup_id) + freez(query->data.http_api_v2.query); + } + + if (query->type == CHART_NEW) + freez(query->data.chart_add_del.chart_name); + + if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update) + json_object_put(query->data.alarm_update); + + freez(query->dedup_id); + freez(query->callback_topic); + freez(query->msg_id); + freez(query); +} + +void aclk_queue_lock(void) +{ + ACLK_QUEUE_LOCK; + aclk_query_queue.block_push = 1; + ACLK_QUEUE_UNLOCK; +} diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h new file mode 100644 index 000000000..c46513567 --- /dev/null +++ b/aclk/aclk_query_queue.h @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_QUERY_QUEUE_H +#define NETDATA_ACLK_QUERY_QUEUE_H + +#include "libnetdata/libnetdata.h" +#include "../daemon/common.h" + +typedef enum { + UNKNOWN, + METADATA_INFO, + METADATA_ALARMS, + HTTP_API_V2, + CHART_NEW, + CHART_DEL, + ALARM_STATE_UPDATE +} aclk_query_type_t; + +struct aclk_query_metadata { + RRDHOST *host; + int initial_on_connect; +}; + +struct aclk_query_chart_add_del { + RRDHOST *host; + char* chart_name; +}; + +struct aclk_query_http_api_v2 { + char *payload; + char *query; +}; + +typedef struct aclk_query *aclk_query_t; +struct aclk_query { + aclk_query_type_t type; + + // dedup_id is used to deduplicate queries in the list + // if type and dedup_id is the same message is deduplicated + // set dedup_id to NULL to never deduplicate the message + // set dedup_id to constant (e.g. empty string "") to make + // message of this type ever exist only once in the list + char *dedup_id; + char *callback_topic; + char *msg_id; + + usec_t created; + + aclk_query_t next; + + // TODO maybe remove? + int version; + union { + struct aclk_query_metadata metadata_info; + struct aclk_query_metadata metadata_alarms; + struct aclk_query_http_api_v2 http_api_v2; + struct aclk_query_chart_add_del chart_add_del; + json_object *alarm_update; + } data; +}; + +aclk_query_t aclk_query_new(aclk_query_type_t type); +void aclk_query_free(aclk_query_t query); + +int aclk_queue_query(aclk_query_t query); +aclk_query_t aclk_queue_pop(void); +void aclk_queue_flush(void); + +void aclk_queue_lock(void); + +#endif /* NETDATA_ACLK_QUERY_QUEUE_H */ diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c new file mode 100644 index 000000000..fcb8d9968 --- /dev/null +++ b/aclk/aclk_rx_msgs.c @@ -0,0 +1,343 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_rx_msgs.h" + +#include "aclk_stats.h" +#include "aclk_query_queue.h" + +#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" +#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/" + +struct aclk_request { + char *type_id; + char *msg_id; + char *callback_topic; + char *payload; + int version; + int min_version; + int max_version; +}; + +int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, "msg-id")) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "type")) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "callback-topic")) { + data->callback_topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "payload")) { + if (likely(e->data.string)) { + size_t len = strlen(e->data.string); + data->payload = mallocz(len+1); + if (!url_decode_r(data->payload, e->data.string, len + 1)) + strcpy(data->payload, e->data.string); + } + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, "version")) { + data->version = e->data.number; + break; + } + if (!strcmp(e->name, "min-version")) { + data->min_version = e->data.number; + break; + } + if (!strcmp(e->name, "max-version")) { + data->max_version = e->data.number; + break; + } + + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + +static inline int aclk_extract_v2_data(char *payload, char **data) +{ + char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR); + if(!ptr) + return 1; + ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR); + *data = strdupz(ptr); + return 0; +} + +static inline int aclk_v2_payload_get_query(const char *payload, char **query_url) +{ + const char *start, *end; + + if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) { + errno = 0; + error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); + return 1; + } + start = payload + 4; + + if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) { + errno = 0; + error("Doesn't look like HTTP GET request."); + return 1; + } + + *query_url = mallocz((end - start) + 1); + strncpyz(*query_url, start, end - start); + + return 0; +} + +#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\ + if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\ + debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ + ACLK_SHARED_STATE_UNLOCK;\ + return 1;\ + }\ + ACLK_SHARED_STATE_UNLOCK; + +static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + HTTP_CHECK_AGENT_INITIALIZED(); + + aclk_query_t query; + + errno = 0; + if (cloud_to_agent->version < ACLK_V_COMPRESSION) { + error( + "This handler cannot reply to request with version older than %d, received %d.", + ACLK_V_COMPRESSION, + cloud_to_agent->version); + return 1; + } + + query = aclk_query_new(HTTP_API_V2); + + if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) { + error("Error extracting payload expected after the JSON dictionary."); + goto error; + } + + if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) { + error("Could not extract payload from query"); + goto error; + } + + if (unlikely(!cloud_to_agent->callback_topic)) { + error("Missing callback_topic"); + goto error; + } + + if (unlikely(!cloud_to_agent->msg_id)) { + error("Missing msg_id"); + goto error; + } + + // aclk_queue_query takes ownership of data pointer + query->callback_topic = cloud_to_agent->callback_topic; + // for clarity and code readability as when we process the request + // it would be strange to get URL from `dedup_id` + query->data.http_api_v2.query = query->dedup_id; + query->msg_id = cloud_to_agent->msg_id; + aclk_queue_query(query); + return 0; + +error: + aclk_query_free(query); + return 1; +} + +// This handles `version` message from cloud used to negotiate +// protocol version we will use +static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload) +{ + UNUSED(raw_payload); + int version = -1; + errno = 0; + + if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { + error( + "Unsuported version of \"version\" message from cloud. Expected %d, Got %d", + ACLK_VERSION_NEG_VERSION, + cloud_to_agent->version); + return 1; + } + if (unlikely(!cloud_to_agent->min_version)) { + error("Min version missing or 0"); + return 1; + } + if (unlikely(!cloud_to_agent->max_version)) { + error("Max version missing or 0"); + return 1; + } + if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { + error( + "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, + cloud_to_agent->min_version); + return 1; + } + + if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { + error( + "Agent too old for this cloud. Minimum version required by cloud %d." + " Maximum version supported by this agent %d.", + cloud_to_agent->min_version, ACLK_VERSION_MAX); + aclk_kill_link = 1; + aclk_disable_runtime = 1; + return 1; + } + if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { + error( + "Cloud version is too old for this agent. Maximum version supported by cloud %d." + " Minimum (oldest) version supported by this agent %d.", + cloud_to_agent->max_version, ACLK_VERSION_MIN); + aclk_kill_link = 1; + return 1; + } + + version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); + + ACLK_SHARED_STATE_LOCK; + if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) { + errno = 0; + error("The \"version\" message came too late ignoring."); + goto err_cleanup; + } + if (unlikely(aclk_shared_state.version_neg)) { + errno = 0; + error("Version has already been set to %d", aclk_shared_state.version_neg); + goto err_cleanup; + } + aclk_shared_state.version_neg = version; + ACLK_SHARED_STATE_UNLOCK; + + info("Choosing version %d of ACLK", version); + + aclk_set_rx_handlers(version); + + return 0; + +err_cleanup: + ACLK_SHARED_STATE_UNLOCK; + return 1; +} + +typedef struct aclk_incoming_msg_type{ + char *name; + int(*fnc)(struct aclk_request *, char *); +}aclk_incoming_msg_type; + +aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = { + { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, + { .name = "version", .fnc = aclk_handle_version_response }, + { .name = NULL, .fnc = NULL } +}; + +struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression; + +void aclk_set_rx_handlers(int version) +{ +// ACLK_NG ACLK version support starts at 2 +// TODO ACLK v3 + UNUSED(version); + aclk_incoming_msg_types = aclk_incoming_msg_types_compression; +} + +int aclk_handle_cloud_message(char *payload) +{ + struct aclk_request cloud_to_agent; + memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_recvd++; + ACLK_STATS_UNLOCK; + } + + if (unlikely(!payload)) { + errno = 0; + error("ACLK incoming message is empty"); + goto err_cleanup_nojson; + } + + debug(D_ACLK, "ACLK incoming message (%s)", payload); + + int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse); + + if (unlikely(rc != JSON_OK)) { + errno = 0; + error("Malformed json request (%s)", payload); + goto err_cleanup; + } + + if (!cloud_to_agent.type_id) { + errno = 0; + error("Cloud message is missing compulsory key \"type\""); + goto err_cleanup; + } + + if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { + error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); + goto err_cleanup; + } + + for (int i = 0; aclk_incoming_msg_types[i].name; i++) { + if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) { + if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { + // in case of success handler is supposed to clean up after itself + // or as in the case of aclk_handle_cloud_request take + // ownership of the pointers (done to avoid copying) + // see what `aclk_queue_query` parameter `internal` does + + // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! + // msg handlers (namely aclk_handle_version_responce) + // can freely change what aclk_incoming_msg_types points to + // so either exit or restart this for loop + freez(cloud_to_agent.type_id); + return 0; + } + goto err_cleanup; + } + } + + errno = 0; + error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id); + +err_cleanup: + if (cloud_to_agent.payload) + freez(cloud_to_agent.payload); + if (cloud_to_agent.type_id) + freez(cloud_to_agent.type_id); + if (cloud_to_agent.msg_id) + freez(cloud_to_agent.msg_id); + if (cloud_to_agent.callback_topic) + freez(cloud_to_agent.callback_topic); + +err_cleanup_nojson: + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_err++; + ACLK_STATS_UNLOCK; + } + + return 1; +} diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h new file mode 100644 index 000000000..c9f0bd37a --- /dev/null +++ b/aclk/aclk_rx_msgs.h @@ -0,0 +1,14 @@ + + +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_RX_MSGS_H +#define ACLK_RX_MSGS_H + +#include "../daemon/common.h" +#include "libnetdata/libnetdata.h" + +int aclk_handle_cloud_message(char *payload); +void aclk_set_rx_handlers(int version); + +#endif /* ACLK_RX_MSGS_H */ diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c new file mode 100644 index 000000000..b61ac05f7 --- /dev/null +++ b/aclk/aclk_stats.c @@ -0,0 +1,274 @@ +#include "aclk_stats.h" + +netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; + +int aclk_stats_enabled; + +int query_thread_count; + +// data ACLK stats need per query thread +struct aclk_qt_data { + RRDDIM *dim; +} *aclk_qt_data = NULL; + +uint32_t *aclk_queries_per_thread = NULL; +uint32_t *aclk_queries_per_thread_sample = NULL; + +struct aclk_metrics aclk_metrics = { + .online = 0, +}; + +struct aclk_metrics_per_sample aclk_metrics_per_sample; + +static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent) +{ + static RRDSET *st_aclkstats = NULL; + static RRDDIM *rd_online_status = NULL; + + if (unlikely(!st_aclkstats)) { + st_aclkstats = rrdset_create_localhost( + "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status", + "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st_aclkstats); + + rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online); + + rrdset_done(st_aclkstats); +} + +static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st_query_thread = NULL; + static RRDDIM *rd_queued = NULL; + static RRDDIM *rd_dispatched = NULL; + + if (unlikely(!st_query_thread)) { + st_query_thread = rrdset_create_localhost( + "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s", + "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA); + + rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st_query_thread); + + rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued); + rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched); + + rrdset_done(st_query_thread); +} + +#ifdef NETDATA_INTERNAL_CHECKS +static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_avg = NULL; + static RRDDIM *rd_max = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms", + "netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + if(per_sample->latency_count) + rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count)); + else + rrddim_set_by_pointer(st, rd_avg, 0); + + rrddim_set_by_pointer(st, rd_max, per_sample->latency_max); + + rrdset_done(st); +} +#endif + +static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_rcvd = NULL; + static RRDDIM *rd_rq_err = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s", + "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err); + rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err); + + rrdset_done(st); +} + +#define MAX_DIM_NAME 16 +static void aclk_stats_query_threads(uint32_t *queries_per_thread) +{ + static RRDSET *st = NULL; + + char dim_name[MAX_DIM_NAME]; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", + "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < query_thread_count; i++) { + if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) + error("snprintf encoding error"); + aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + } else + rrdset_next(st); + + for (int i = 0; i < query_thread_count; i++) { + rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); + } + + rrdset_done(st); +} + +static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_avg = NULL; + static RRDDIM *rd_rq_max = NULL; + static RRDDIM *rd_rq_total = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us", + "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_LINE); + + rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + if(per_sample->cloud_q_process_count) + rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count)); + else + rrddim_set_by_pointer(st, rd_rq_avg, 0); + rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max); + rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total); + + rrdset_done(st); +} + +void aclk_stats_thread_cleanup() +{ + freez(aclk_qt_data); + freez(aclk_queries_per_thread); + freez(aclk_queries_per_thread_sample); +} + +void *aclk_stats_main_thread(void *ptr) +{ + struct aclk_stats_thread *args = ptr; + + query_thread_count = args->query_thread_count; + aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); + aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); + aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + + heartbeat_t hb; + heartbeat_init(&hb); + usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; + + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + + struct aclk_metrics_per_sample per_sample; + struct aclk_metrics permanent; + + while (!netdata_exit) { + netdata_thread_testcancel(); + // ------------------------------------------------------------------------ + // Wait for the next iteration point. + + heartbeat_next(&hb, step_ut); + if (netdata_exit) break; + + ACLK_STATS_LOCK; + // to not hold lock longer than necessary, especially not to hold it + // during database rrd* operations + memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); + memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + + memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count); + memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count); + ACLK_STATS_UNLOCK; + + aclk_stats_collect(&per_sample, &permanent); + aclk_stats_query_queue(&per_sample); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_latency(&per_sample); +#endif + + aclk_stats_cloud_req(&per_sample); + aclk_stats_query_threads(aclk_queries_per_thread_sample); + + aclk_stats_query_time(&per_sample); + } + + return 0; +} + +void aclk_stats_upd_online(int online) { + if(!aclk_stats_enabled) + return; + + ACLK_STATS_LOCK; + aclk_metrics.online = online; + + if(!online) + aclk_metrics_per_sample.offline_during_sample = 1; + ACLK_STATS_UNLOCK; +} + +#ifdef NETDATA_INTERNAL_CHECKS +static usec_t pub_time[UINT16_MAX]; +void aclk_stats_msg_published(uint16_t id) +{ + ACLK_STATS_LOCK; + pub_time[id] = now_boottime_usec(); + ACLK_STATS_UNLOCK; +} + +void aclk_stats_msg_puback(uint16_t id) +{ + ACLK_STATS_LOCK; + usec_t t; + + if (!aclk_stats_enabled) { + ACLK_STATS_UNLOCK; + return; + } + + if (unlikely(!pub_time[id])) { + ACLK_STATS_UNLOCK; + error("Received PUBACK for unknown message?!"); + return; + } + + t = now_boottime_usec() - pub_time[id]; + t /= USEC_PER_MS; + pub_time[id] = 0; + if (aclk_metrics_per_sample.latency_max < t) + aclk_metrics_per_sample.latency_max = t; + + aclk_metrics_per_sample.latency_total += t; + aclk_metrics_per_sample.latency_count++; + ACLK_STATS_UNLOCK; +} +#endif /* NETDATA_INTERNAL_CHECKS */ diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h new file mode 100644 index 000000000..33d016965 --- /dev/null +++ b/aclk/aclk_stats.h @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ACLK_STATS_H +#define NETDATA_ACLK_STATS_H + +#include "../daemon/common.h" +#include "libnetdata/libnetdata.h" + +#define ACLK_STATS_THREAD_NAME "ACLK_Stats" + +extern netdata_mutex_t aclk_stats_mutex; + +#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex) +#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex) + +extern int aclk_stats_enabled; + +struct aclk_stats_thread { + netdata_thread_t *thread; + int query_thread_count; +}; + +// preserve between samples +struct aclk_metrics { + volatile uint8_t online; +}; + +// reset to 0 on every sample +extern struct aclk_metrics_per_sample { + /* in the unlikely event of ACLK disconnecting + and reconnecting under 1 sampling rate + we want to make sure we record the disconnection + despite it being then seemingly longer in graph */ + volatile uint8_t offline_during_sample; + + volatile uint32_t queries_queued; + volatile uint32_t queries_dispatched; + +#ifdef NETDATA_INTERNAL_CHECKS + volatile uint32_t latency_max; + volatile uint32_t latency_total; + volatile uint32_t latency_count; +#endif + + volatile uint32_t cloud_req_recvd; + volatile uint32_t cloud_req_err; + + volatile uint32_t cloud_q_process_total; + volatile uint32_t cloud_q_process_count; + volatile uint32_t cloud_q_process_max; +} aclk_metrics_per_sample; + +extern uint32_t *aclk_queries_per_thread; + +void *aclk_stats_main_thread(void *ptr); +void aclk_stats_thread_cleanup(); +void aclk_stats_upd_online(int online); + +#ifdef NETDATA_INTERNAL_CHECKS +void aclk_stats_msg_published(uint16_t id); +void aclk_stats_msg_puback(uint16_t id); +#endif /* NETDATA_INTERNAL_CHECKS */ + +#endif /* NETDATA_ACLK_STATS_H */ diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c new file mode 100644 index 000000000..158fc4e26 --- /dev/null +++ b/aclk/aclk_tx_msgs.c @@ -0,0 +1,395 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "aclk_tx_msgs.h" +#include "../daemon/common.h" +#include "aclk_util.h" +#include "aclk_stats.h" + +#ifndef __GNUC__ +#pragma region aclk_tx_msgs helper functions +#endif + +static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) +{ + uint16_t packet_id; + const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + + mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_published(packet_id); +#endif +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 1024 + char filename[FN_MAX_LEN]; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); + json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); +#endif +} + +static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic) +{ + uint16_t packet_id; + const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + + mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_published(packet_id); +#endif +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 1024 + char filename[FN_MAX_LEN]; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); + json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); +#endif + return packet_id; +} + +/* UNUSED now but can be used soon MVP1? +static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic) +{ + if (unlikely(!topic || topic[0] != '/')) { + error ("Full topic required!"); + return; + } + + const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + + mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_published(); +#endif +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 1024 + char filename[FN_MAX_LEN]; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); + json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); +#endif +} +*/ + +#define TOPIC_MAX_LEN 512 +#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" +static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len) +{ + uint16_t packet_id; + const char *str; + char *full_msg; + int len; + + if (unlikely(!topic || topic[0] != '/')) { + error ("Full topic required!"); + return; + } + + str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN); + len = strlen(str); + + full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len); + + memcpy(full_msg, str, len); + memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR)); + len += strlen(V2_BIN_PAYLOAD_SEPARATOR); + memcpy(&full_msg[len], payload, payload_len); + len += payload_len; + +/* TODO +#ifdef ACLK_LOG_CONVERSATION_DIR +#define FN_MAX_LEN 1024 + char filename[FN_MAX_LEN]; + snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT()); + json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY); +#endif */ + + mqtt_wss_publish_pid(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_msg_published(packet_id); +#endif + freez(full_msg); +} + +/* + * Creates universal header common for all ACLK messages. User gets ownership of json object created. + * Usually this is freed by send function after message has been sent. + */ +static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version) +{ + uuid_t uuid; + char uuid_str[36 + 1]; + json_object *tmp; + json_object *obj = json_object_new_object(); + + tmp = json_object_new_string(type); + json_object_object_add(obj, "type", tmp); + + if (unlikely(!msg_id)) { + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + msg_id = uuid_str; + } + + if (ts_secs == 0) { + ts_us = now_realtime_usec(); + ts_secs = ts_us / USEC_PER_SEC; + ts_us = ts_us % USEC_PER_SEC; + } + + tmp = json_object_new_string(msg_id); + json_object_object_add(obj, "msg-id", tmp); + + tmp = json_object_new_int64(ts_secs); + json_object_object_add(obj, "timestamp", tmp); + +// TODO handle this somehow on older json-c +// tmp = json_object_new_uint64(ts_us); +// probably jso->_to_json_strinf -> custom function +// jso->o.c_uint64 -> map this with pointer to signed int +// commit that implements json_object_new_uint64 is 3c3b592 +// between 0.14 and 0.15 + tmp = json_object_new_int64(ts_us); + json_object_object_add(obj, "timestamp-offset-usec", tmp); + + tmp = json_object_new_int64(aclk_session_sec); + json_object_object_add(obj, "connect", tmp); + +// TODO handle this somehow see above +// tmp = json_object_new_uint64(0 /* TODO aclk_session_us */); + tmp = json_object_new_int64(aclk_session_us); + json_object_object_add(obj, "connect-offset-usec", tmp); + + tmp = json_object_new_int(version); + json_object_object_add(obj, "version", tmp); + + return obj; +} + +static char *create_uuid() +{ + uuid_t uuid; + char *uuid_str = mallocz(36 + 1); + + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + + return uuid_str; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +#ifndef __GNUC__ +#pragma region aclk_tx_msgs message generators +#endif + +/* + * This will send the /api/v1/info + */ +#define BUFFER_INITIAL_SIZE (1024 * 16) +void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host) +{ + BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE); + json_object *msg, *payload, *tmp; + + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + if (metadata_submitted) + msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg); + else + msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + + payload = json_object_new_object(); + json_object_object_add(msg, "payload", payload); + + web_client_api_request_v1_info_fill_buffer(host, local_buffer); + tmp = json_tokener_parse(local_buffer->buffer); + json_object_object_add(payload, "info", tmp); + + buffer_flush(local_buffer); + + charts2json(host, local_buffer, 1, 0); + tmp = json_tokener_parse(local_buffer->buffer); + json_object_object_add(payload, "charts", tmp); + + aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA); + + json_object_put(msg); + freez(msg_id); + buffer_free(local_buffer); +} + +// TODO should include header instead +void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); + +void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted) +{ + BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE); + json_object *msg, *payload, *tmp; + + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + + if (metadata_submitted) + msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + else + msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + + payload = json_object_new_object(); + json_object_object_add(msg, "payload", payload); + + health_alarms2json(localhost, local_buffer, 1); + tmp = json_tokener_parse(local_buffer->buffer); + json_object_object_add(payload, "configured-alarms", tmp); + + buffer_flush(local_buffer); + + health_active_log_alarms_2json(localhost, local_buffer); + tmp = json_tokener_parse(local_buffer->buffer); + json_object_object_add(payload, "alarms-active", tmp); + + aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS); + + json_object_put(msg); + freez(msg_id); + buffer_free(local_buffer); +} + +void aclk_hello_msg(mqtt_wss_client client) +{ + json_object *tmp, *msg; + + char *msg_id = create_uuid(); + + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; + ACLK_SHARED_STATE_UNLOCK; + + //Hello message is versioned separatelly from the rest of the protocol + msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); + + tmp = json_object_new_int(ACLK_VERSION_MIN); + json_object_object_add(msg, "min-version", tmp); + + tmp = json_object_new_int(ACLK_VERSION_MAX); + json_object_object_add(msg, "max-version", tmp); + +#ifdef ACLK_NG + tmp = json_object_new_string("Next Generation"); +#else + tmp = json_object_new_string("Legacy"); +#endif + json_object_object_add(msg, "aclk-implementation", tmp); + + aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA); + + json_object_put(msg); + freez(msg_id); +} + +void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) +{ + json_object *tmp, *msg; + + msg = create_hdr("http", msg_id, 0, 0, 2); + + tmp = json_object_new_int64(t_exec); + json_object_object_add(msg, "t-exec", tmp); + + tmp = json_object_new_int64(created); + json_object_object_add(msg, "t-rx", tmp); + + tmp = json_object_new_int(http_code); + json_object_object_add(msg, "http-code", tmp); + + aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); + json_object_put(msg); +} + +void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart) +{ + json_object *msg, *payload; + BUFFER *tmp_buffer; + RRDSET *st; + + st = rrdset_find(host, chart); + if (!st) + st = rrdset_find_byname(host, chart); + if (!st) { + info("FAILED to find chart %s", chart); + return; + } + + tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE); + rrdset2json(st, tmp_buffer, NULL, NULL, 1); + payload = json_tokener_parse(tmp_buffer->buffer); + if (!payload) { + error("Failed to parse JSON from rrdset2json"); + buffer_free(tmp_buffer); + return; + } + + msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg); + json_object_object_add(msg, "payload", payload); + + aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART); + + buffer_free(tmp_buffer); + json_object_put(msg); +} + +void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg) +{ + // we create header here on purpose (and not send message with it already as `msg` param) + // one is version_neg is guaranteed to be done here + // other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy + // send message with timestamps already to Query Queue they would be incorrect at time + // when query queue would get to send them) + json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg); + json_object_object_add(obj, "payload", msg); + + aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS); + json_object_put(obj); +} + +/* + * Will generate disconnect message. + * @param message if NULL it will generate LWT message (unexpected). + * Otherwise string pointed to by this parameter will be used as + * reason. + */ +json_object *aclk_generate_disconnect(const char *message) +{ + json_object *tmp, *msg; + + msg = create_hdr("disconnect", NULL, 0, 0, 2); + + tmp = json_object_new_string(message ? message : "unexpected"); + json_object_object_add(msg, "payload", tmp); + + return msg; +} + +int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message) +{ + int pid; + json_object *msg = aclk_generate_disconnect(message); + pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA); + json_object_put(msg); + return pid; +} + +#ifndef __GNUC__ +#pragma endregion +#endif diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h new file mode 100644 index 000000000..cb4d44c96 --- /dev/null +++ b/aclk/aclk_tx_msgs.h @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_TX_MSGS_H +#define ACLK_TX_MSGS_H + +#include <json-c/json.h> +#include "libnetdata/libnetdata.h" +#include "../daemon/common.h" +#include "mqtt_wss_client.h" + +void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host); +void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted); + +void aclk_hello_msg(mqtt_wss_client client); + +void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); + +void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart); + +void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg); + +json_object *aclk_generate_disconnect(const char *message); +int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message); + +#endif diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c new file mode 100644 index 000000000..a5347c466 --- /dev/null +++ b/aclk/aclk_util.c @@ -0,0 +1,347 @@ +#include "aclk_util.h" + +#include <stdio.h> + +#include "../daemon/common.h" + +// CentOS 7 has older version that doesn't define this +// same goes for MacOS +#ifndef UUID_STR_LEN +#define UUID_STR_LEN 37 +#endif + +#ifdef ACLK_LOG_CONVERSATION_DIR +volatile int aclk_conversation_log_counter = 0; +#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS) +netdata_mutex_t aclk_conversation_log_mutex = NETDATA_MUTEX_INITIALIZER; +int aclk_get_conv_log_next() +{ + int ret; + netdata_mutex_lock(&aclk_conversation_log_mutex); + ret = aclk_conversation_log_counter++; + netdata_mutex_unlock(&aclk_conversation_log_mutex); + return ret; +} +#endif +#endif + +#define ACLK_TOPIC_PREFIX "/agent/" + +struct aclk_topic { + const char *topic_suffix; + char *topic; +}; + +// This helps to cache finalized topics (assembled with claim_id) +// to not have to alloc or create buffer and construct topic every +// time message is sent as in old ACLK +static struct aclk_topic aclk_topic_cache[] = { + { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_CHART + { .topic_suffix = "outbound/alarms", .topic = NULL }, // ACLK_TOPICID_ALARMS + { .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_METADATA + { .topic_suffix = "inbound/cmd", .topic = NULL }, // ACLK_TOPICID_COMMAND + { .topic_suffix = NULL, .topic = NULL } +}; + +void free_topic_cache(void) +{ + struct aclk_topic *tc = aclk_topic_cache; + while (tc->topic_suffix) { + if (tc->topic) { + freez(tc->topic); + tc->topic = NULL; + } + tc++; + } +} + +static inline void generate_topic_cache(void) +{ + struct aclk_topic *tc = aclk_topic_cache; + char *ptr; + if (unlikely(!tc->topic)) { + rrdhost_aclk_state_lock(localhost); + while(tc->topic_suffix) { + tc->topic = mallocz(strlen(ACLK_TOPIC_PREFIX) + (UUID_STR_LEN - 1) + 2 /* '/' and \0 */ + strlen(tc->topic_suffix)); + ptr = tc->topic; + strcpy(ptr, ACLK_TOPIC_PREFIX); + ptr += strlen(ACLK_TOPIC_PREFIX); + strcpy(ptr, localhost->aclk_state.claimed_id); + ptr += (UUID_STR_LEN - 1); + *ptr++ = '/'; + strcpy(ptr, tc->topic_suffix); + tc++; + } + rrdhost_aclk_state_unlock(localhost); + } +} + +/* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ +const char *aclk_get_topic(enum aclk_topics topic) +{ + generate_topic_cache(); + + return aclk_topic_cache[topic].topic; +} + +int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port) +{ + int pos = 0; + if (!strncmp("https://", url, 8)) { + pos = 8; + } else if (!strncmp("http://", url, 7)) { + error("Cannot connect ACLK over %s -> unencrypted link is not supported", url); + return 1; + } + int host_end = pos; + while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':') + host_end++; + if (url[host_end] == 0) { + *aclk_hostname = strdupz(url + pos); + *aclk_port = 443; + info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); + return 0; + } + if (url[host_end] == ':') { + *aclk_hostname = callocz(host_end - pos + 1, 1); + strncpy(*aclk_hostname, url + pos, host_end - pos); + int port_end = host_end + 1; + while (url[port_end] >= '0' && url[port_end] <= '9') + port_end++; + if (port_end - host_end > 6) { + error("Port specified in %s is invalid", url); + freez(*aclk_hostname); + *aclk_hostname = NULL; + return 1; + } + *aclk_port = atoi(&url[host_end+1]); + } + if (url[host_end] == '/') { + *aclk_port = 443; + *aclk_hostname = callocz(1, host_end - pos + 1); + strncpy(*aclk_hostname, url+pos, host_end - pos); + } + info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); + return 0; +} + +/* + * TBEB with randomness + * + * @param mode 0 - to reset the delay, + * 1 - to advance a step and calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * @returns delay in ms + * + */ +#define ACLK_MAX_BACKOFF_DELAY 1024 +unsigned long int aclk_reconnect_delay(int mode) +{ + static int fail = -1; + unsigned long int delay; + + if (!mode || fail == -1) { + srandom(time(NULL)); + fail = mode - 1; + return 0; + } + + delay = (1 << fail); + + if (delay >= ACLK_MAX_BACKOFF_DELAY) { + delay = ACLK_MAX_BACKOFF_DELAY * 1000; + } else { + fail++; + delay *= 1000; + delay += (random() % (MAX(1000, delay/2))); + } + + return delay; +} + +#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://" +#define ACLK_PROXY_ENV "env" +#define ACLK_PROXY_CONFIG_VAR "proxy" + +struct { + ACLK_PROXY_TYPE type; + const char *url_str; +} supported_proxy_types[] = { + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR }, + { .type = PROXY_TYPE_UNKNOWN, .url_str = NULL }, +}; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type) +{ + switch (*type) { + case PROXY_DISABLED: + return "disabled"; + case PROXY_TYPE_HTTP: + return "HTTP"; + case PROXY_TYPE_SOCKS5: + return "SOCKS"; + default: + return "Unknown"; + } +} + +static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string) +{ + int i = 0; + while (supported_proxy_types[i].url_str) { + if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str))) + return supported_proxy_types[i].type; + i++; + } + return PROXY_TYPE_UNKNOWN; +} + +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string) +{ + if (!string) + return PROXY_TYPE_UNKNOWN; + + while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove) + string++; + + if (!*string) + return PROXY_TYPE_UNKNOWN; + + return aclk_find_proxy(string); +} + +// helper function to censor user&password +// for logging purposes +void safe_log_proxy_censor(char *proxy) +{ + size_t length = strlen(proxy); + char *auth = proxy + length - 1; + char *cur; + + while ((auth >= proxy) && (*auth != '@')) + auth--; + + //if not found or @ is first char do nothing + if (auth <= proxy) + return; + + cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR); + if (!cur) + cur = proxy; + else + cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); + + while (cur < auth) { + *cur = 'X'; + cur++; + } +} + +static inline void safe_log_proxy_error(char *str, const char *proxy) +{ + char *log = strdupz(proxy); + safe_log_proxy_censor(log); + error("%s Provided Value:\"%s\"", str, log); + freez(log); +} + +static inline int check_socks_enviroment(const char **proxy) +{ + char *tmp = getenv("socks_proxy"); + + if (!tmp) + return 1; + + if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) { + *proxy = tmp; + return 0; + } + + safe_log_proxy_error( + "Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", + tmp); + return 1; +} + +static inline int check_http_enviroment(const char **proxy) +{ + char *tmp = getenv("http_proxy"); + + if (!tmp) + return 1; + + if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) { + *proxy = tmp; + return 0; + } + + safe_log_proxy_error( + "Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".", + tmp); + return 1; +} + +const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type) +{ + const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV); + *type = PROXY_DISABLED; + + if (strcmp(proxy, "none") == 0) + return proxy; + + if (strcmp(proxy, ACLK_PROXY_ENV) == 0) { + if (check_socks_enviroment(&proxy) == 0) { +#ifdef LWS_WITH_SOCKS5 + *type = PROXY_TYPE_SOCKS5; + return proxy; +#else + safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy " + "but Libwebsockets used doesn't have SOCKS5 support built in. " + "Ignoring and checking for other options.", + proxy); +#endif + } + if (check_http_enviroment(&proxy) == 0) + *type = PROXY_TYPE_HTTP; + return proxy; + } + + *type = aclk_verify_proxy(proxy); +#ifndef LWS_WITH_SOCKS5 + if (*type == PROXY_TYPE_SOCKS5) { + safe_log_proxy_error( + "Config var \"" ACLK_PROXY_CONFIG_VAR + "\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.", + proxy); + } +#endif + if (*type == PROXY_TYPE_UNKNOWN) { + *type = PROXY_DISABLED; + safe_log_proxy_error( + "Config var \"" ACLK_PROXY_CONFIG_VAR + "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", + proxy); + } + + return proxy; +} + +// helper function to read settings only once (static) +// as claiming, challenge/response and ACLK +// read the same thing, no need to parse again +const char *aclk_get_proxy(ACLK_PROXY_TYPE *type) +{ + static const char *proxy = NULL; + static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET; + + if (proxy_type == PROXY_NOT_SET) + proxy = aclk_lws_wss_get_proxy_setting(&proxy_type); + + *type = proxy_type; + return proxy; +} diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h new file mode 100644 index 000000000..c72329791 --- /dev/null +++ b/aclk/aclk_util.h @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#ifndef ACLK_UTIL_H +#define ACLK_UTIL_H + +#include "libnetdata/libnetdata.h" + +// Helper stuff which should not have any further inside ACLK dependency +// and are supposed not to be needed outside of ACLK + +int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port); + +enum aclk_topics { + ACLK_TOPICID_CHART = 0, + ACLK_TOPICID_ALARMS = 1, + ACLK_TOPICID_METADATA = 2, + ACLK_TOPICID_COMMAND = 3, +}; + +const char *aclk_get_topic(enum aclk_topics topic); +void free_topic_cache(void); +// TODO +// aclk_topics_reload //when claim id changes + +#ifdef ACLK_LOG_CONVERSATION_DIR +extern volatile int aclk_conversation_log_counter; +#if defined(HAVE_C___ATOMIC) && !defined(NETDATA_NO_ATOMIC_INSTRUCTIONS) +#define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST) +#else +extern netdata_mutex_t aclk_conversation_log_mutex; +int aclk_get_conv_log_next(); +#define ACLK_GET_CONV_LOG_NEXT() aclk_get_conv_log_next() +#endif +#endif + +unsigned long int aclk_reconnect_delay(int mode); + +typedef enum aclk_proxy_type { + PROXY_TYPE_UNKNOWN = 0, + PROXY_TYPE_SOCKS5, + PROXY_TYPE_HTTP, + PROXY_DISABLED, + PROXY_NOT_SET, +} ACLK_PROXY_TYPE; + +const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type); +ACLK_PROXY_TYPE aclk_verify_proxy(const char *string); +const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type); +void safe_log_proxy_censor(char *proxy); +int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port); +const char *aclk_get_proxy(ACLK_PROXY_TYPE *type); + +#endif /* ACLK_UTIL_H */ diff --git a/aclk/https_client.c b/aclk/https_client.c new file mode 100644 index 000000000..1b9546d77 --- /dev/null +++ b/aclk/https_client.c @@ -0,0 +1,246 @@ +#include "libnetdata/libnetdata.h" + +#include "https_client.h" + +#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h" + +enum http_parse_state { + HTTP_PARSE_INITIAL = 0, + HTTP_PARSE_HEADERS, + HTTP_PARSE_CONTENT +}; + +typedef struct { + enum http_parse_state state; + int content_length; + int http_code; +} http_parse_ctx; + +#define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 } + +#define NEED_MORE_DATA 0 +#define PARSE_SUCCESS 1 +#define PARSE_ERROR -1 +#define HTTP_LINE_TERM "\x0D\x0A" +#define RESP_PROTO "HTTP/1.1 " +#define HTTP_KEYVAL_SEPARATOR ": " +#define HTTP_HDR_BUFFER_SIZE 256 +#define PORT_STR_MAX_BYTES 7 + +static void process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const char *val) +{ + // currently we care only about content-length + // but in future the way this is written + // it can be extended + if (!strcmp("content-length", key)) { + parse_ctx->content_length = atoi(val); + } +} + +static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx) +{ + int idx, idx_end; + char buf_key[HTTP_HDR_BUFFER_SIZE]; + char buf_val[HTTP_HDR_BUFFER_SIZE]; + char *ptr = buf_key; + if (!rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx_end)) { + error("CRLF expected"); + return 1; + } + + char *separator = rbuf_find_bytes(buf, HTTP_KEYVAL_SEPARATOR, strlen(HTTP_KEYVAL_SEPARATOR), &idx); + if (!separator) { + error("Missing Key/Value separator"); + return 1; + } + if (idx >= HTTP_HDR_BUFFER_SIZE) { + error("Key name is too long"); + return 1; + } + + rbuf_pop(buf, buf_key, idx); + buf_key[idx] = 0; + + rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR)); + idx_end -= strlen(HTTP_KEYVAL_SEPARATOR) + idx; + if (idx_end >= HTTP_HDR_BUFFER_SIZE) { + error("Value of key \"%s\" too long", buf_key); + return 1; + } + + rbuf_pop(buf, buf_val, idx_end); + buf_val[idx_end] = 0; + + rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR)); + + for (ptr = buf_key; *ptr; ptr++) + *ptr = tolower(*ptr); + + process_http_hdr(parse_ctx, buf_key, buf_val); + + return 0; +} + +static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) +{ + int idx; + char rc[4]; + + do { + if (parse_ctx->state != HTTP_PARSE_CONTENT && !rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx)) + return NEED_MORE_DATA; + switch (parse_ctx->state) { + case HTTP_PARSE_INITIAL: + if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) { + error("Expected response to start with \"%s\"", RESP_PROTO); + return PARSE_ERROR; + } + rbuf_bump_tail(buf, strlen(RESP_PROTO)); + if (rbuf_pop(buf, rc, 4) != 4) { + error("Expected HTTP status code"); + return PARSE_ERROR; + } + if (rc[3] != ' ') { + error("Expected space after HTTP return code"); + return PARSE_ERROR; + } + rc[3] = 0; + parse_ctx->http_code = atoi(rc); + if (parse_ctx->http_code < 100 || parse_ctx->http_code >= 600) { + error("HTTP code not in range 100 to 599"); + return PARSE_ERROR; + } + + rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx); + + rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM)); + + parse_ctx->state = HTTP_PARSE_HEADERS; + break; + case HTTP_PARSE_HEADERS: + if (!idx) { + parse_ctx->state = HTTP_PARSE_CONTENT; + rbuf_bump_tail(buf, strlen(HTTP_LINE_TERM)); + break; + } + if (parse_http_hdr(buf, parse_ctx)) + return PARSE_ERROR; + rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx); + rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM)); + break; + case HTTP_PARSE_CONTENT: + if (parse_ctx->content_length < 0) { + error("content-length missing and http headers ended"); + return PARSE_ERROR; + } + if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length) + return PARSE_SUCCESS; + return NEED_MORE_DATA; + } + } while(1); +} + +int https_request(http_req_type_t method, char *host, int port, char *url, char *b, size_t b_size, char *payload) +{ + struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 }; + char sport[PORT_STR_MAX_BYTES]; + size_t len = 0; + int rc = 1; + int ret; + char *ptr; + http_parse_ctx parse_ctx = HTTP_PARSE_CTX_INITIALIZER; + + rbuf_t buffer = rbuf_create(b_size); + if (!buffer) + return 1; + + snprintf(sport, PORT_STR_MAX_BYTES, "%d", port); + + if (payload != NULL) + len = strlen(payload); + + snprintf( + b, + b_size, + "%s %s HTTP/1.1\r\nHost: %s\r\nAccept: application/json\r\nContent-length: %zu\r\nAccept-Language: en-us\r\n" + "User-Agent: Netdata/rocks\r\n\r\n", + (method == HTTP_REQ_GET ? "GET" : "POST"), url, host, len); + + if (payload != NULL) + strncat(b, payload, b_size - len); + + len = strlen(b); + + debug(D_ACLK, "Sending HTTPS req (%zu bytes): '%s'", len, b); + int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, sport, &timeout); + + if (unlikely(sock == -1)) { + error("Handshake failed"); + goto exit_buf; + } + + SSL_CTX *ctx = security_initialize_openssl_client(); + if (ctx==NULL) { + error("Cannot allocate SSL context"); + goto exit_sock; + } + // Certificate chain: not updating the stores - do we need private CA roots? + // Calls to SSL_CTX_load_verify_locations would go here. + SSL *ssl = SSL_new(ctx); + if (ssl==NULL) { + error("Cannot allocate SSL"); + goto exit_CTX; + } + SSL_set_fd(ssl, sock); + ret = SSL_connect(ssl); + if (ret != 1) { + error("SSL_connect() failed with err=%d", ret); + goto exit_SSL; + } + + ret = SSL_write(ssl, b, len); + if (ret <= 0) + { + error("SSL_write() failed with err=%d", ret); + goto exit_SSL; + } + + b[0] = 0; + + do { + ptr = rbuf_get_linear_insert_range(buffer, &len); + ret = SSL_read(ssl, ptr, len - 1); + if (ret) + rbuf_bump_head(buffer, ret); + if (ret <= 0) + { + error("No response available - SSL_read()=%d", ret); + goto exit_FULL; + } + } while (!(ret = parse_http_response(buffer, &parse_ctx))); + + if (ret != PARSE_SUCCESS) { + error("Error parsing HTTP response"); + goto exit_FULL; + } + + if (parse_ctx.http_code < 200 || parse_ctx.http_code >= 300) { + error("HTTP Response not Success (got %d)", parse_ctx.http_code); + goto exit_FULL; + } + + len = rbuf_pop(buffer, b, b_size); + b[MIN(len, b_size-1)] = 0; + + rc = 0; +exit_FULL: +exit_SSL: + SSL_free(ssl); +exit_CTX: + SSL_CTX_free(ctx); +exit_sock: + close(sock); +exit_buf: + rbuf_free(buffer); + return rc; +} diff --git a/aclk/https_client.h b/aclk/https_client.h new file mode 100644 index 000000000..0d2e0dba7 --- /dev/null +++ b/aclk/https_client.h @@ -0,0 +1,11 @@ +#ifndef NETDATA_HTTPS_CLIENT_H +#define NETDATA_HTTPS_CLIENT_H + +typedef enum http_req_type { + HTTP_REQ_GET, + HTTP_REQ_POST +} http_req_type_t; + +int https_request(http_req_type_t method, char *host, int port, char *url, char *b, size_t b_size, char *payload); + +#endif /* NETDATA_HTTPS_CLIENT_H */ diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c index d7188b1f0..43455393a 100644 --- a/aclk/legacy/aclk_common.c +++ b/aclk/legacy/aclk_common.c @@ -252,6 +252,7 @@ struct label *add_aclk_host_labels(struct label *label) { proxy_str = "none"; break; } + label = add_label_to_list(label, "_aclk_impl", "Legacy", LABEL_SOURCE_AUTO); return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO); #else return label; diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c index c1856ed2c..f41a230db 100644 --- a/aclk/legacy/aclk_lws_https_client.c +++ b/aclk/legacy/aclk_lws_https_client.c @@ -3,7 +3,11 @@ #define ACLK_LWS_HTTPS_CLIENT_INTERNAL #include "aclk_lws_https_client.h" +#ifndef ACLK_NG #include "aclk_common.h" +#else +#include "../aclk.h" +#endif #include "aclk_lws_wss_client.h" diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c index f06df3f42..df221dd60 100644 --- a/aclk/legacy/aclk_lws_wss_client.c +++ b/aclk/legacy/aclk_lws_wss_client.c @@ -348,6 +348,7 @@ static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, return 1; } +#ifdef ACLK_TRP_DEBUG_VERBOSE static const char *aclk_lws_callback_name(enum lws_callback_reasons reason) { switch (reason) { @@ -377,12 +378,11 @@ static const char *aclk_lws_callback_name(enum lws_callback_reasons reason) return "LWS_CALLBACK_EVENT_WAIT_CANCELLED"; default: // Not using an internal buffer here for thread-safety with unknown calling context. -#ifdef ACLK_TRP_DEBUG_VERBOSE error("Unknown LWS callback %u", reason); -#endif return "unknown"; } } +#endif void aclk_lws_wss_fail_report() { diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c index 7ab534f16..27ad9ac16 100644 --- a/aclk/legacy/aclk_query.c +++ b/aclk/legacy/aclk_query.c @@ -22,6 +22,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; struct aclk_query { usec_t created; + struct timeval tv_in; usec_t created_boot_time; time_t run_after; // Delay run until after this time ACLK_CMD cmd; // What command is this @@ -30,6 +31,7 @@ struct aclk_query { char *msg_id; // msg_id generated by the cloud (NULL if internal) char *query; // The actual query u_char deleted; // Mark deleted for garbage collect + int idx; // index of query thread struct aclk_query *next; }; @@ -62,6 +64,7 @@ static void aclk_query_free(struct aclk_query *this_query) freez(this_query->query); if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) { struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data; + freez(del->query_endpoint); freez(del->data); freez(del); } @@ -236,7 +239,8 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run new_query->data = data; new_query->next = NULL; - new_query->created = now_realtime_usec(); + now_realtime_timeval(&new_query->tv_in); + new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec; new_query->created_boot_time = now_boottime_usec(); new_query->run_after = run_after; @@ -324,6 +328,7 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli #pragma region ACLK_QUERY #endif + static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) { usec_t t = now_boottime_usec(); @@ -359,8 +364,11 @@ static int aclk_execute_query(struct aclk_query *this_query) mysep = strrchr(this_query->query, '/'); // TODO: handle bad response perhaps in a different way. For now it does to the payload - aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + w->tv_in = this_query->tv_in; now_realtime_timeval(&w->tv_ready); + aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + size_t size = w->response.data->len; + size_t sent = size; w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -382,6 +390,24 @@ static int aclk_execute_query(struct aclk_query *this_query) aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); + struct timeval tv; + now_realtime_timeval(&tv); + + log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", + w->id + , gettid() + , this_query->idx + , "DATA" + , sent + , size + , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0) + , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 + , dt_usec(&tv, &w->tv_ready) / 1000.0 + , dt_usec(&tv, &w->tv_in) / 1000.0 + , w->response.code + , strip_control_characters(this_query->query) + ); + buffer_free(w->response.data); buffer_free(w->response.header); buffer_free(w->response.header_output); @@ -426,7 +452,11 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) mysep = strrchr(this_query->query, '/'); // execute the query + w->tv_in = this_query->tv_in; + now_realtime_timeval(&w->tv_ready); t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); + size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len; + size_t sent = size; #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used @@ -475,7 +505,6 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) } #endif - now_realtime_timeval(&w->tv_ready); w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -492,6 +521,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) buffer_need_bytes(local_buffer, w->response.data->len); memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); local_buffer->len += w->response.data->len; + sent = sent - size + w->response.data->len; } else { #endif buffer_strcat(local_buffer, w->response.data->buffer); @@ -502,6 +532,23 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id); + struct timeval tv; + now_realtime_timeval(&tv); + + log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", + w->id + , gettid() + , this_query->idx + , "DATA" + , sent + , size + , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0) + , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 + , dt_usec(&tv, &w->tv_ready) / 1000.0 + , dt_usec(&tv, &w->tv_in) / 1000.0 + , w->response.code + , strip_control_characters(this_query->query) + ); cleanup: #ifdef NETDATA_WITH_ZLIB if(w->response.zinitialized) @@ -550,6 +597,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info) query_count++; host = (RRDHOST*)this_query->data; + this_query->idx = t_info->idx; debug( D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic, @@ -629,6 +677,12 @@ static int aclk_process_query(struct aclk_query_thread *t_info) aclk_metrics_per_sample.queries_dispatched++; aclk_queries_per_thread[t_info->idx]++; ACLK_STATS_UNLOCK; + + if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) { + getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]); + getrusage_called_this_tick[t_info->idx]++; + } + } aclk_query_free(this_query); diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h index 53eef1392..026985c8d 100644 --- a/aclk/legacy/aclk_query.h +++ b/aclk/legacy/aclk_query.h @@ -8,8 +8,11 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable +#define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread. + extern pthread_cond_t query_cond_wait; extern pthread_mutex_t query_lock_wait; +extern uint8_t *getrusage_called_this_tick; #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait) #define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait) @@ -28,6 +31,7 @@ struct aclk_query_threads { struct aclk_cloud_req_v2 { char *data; RRDHOST *host; + char *query_endpoint; }; void *aclk_query_main_thread(void *ptr); diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c index 99fa9d987..2681445b4 100644 --- a/aclk/legacy/aclk_rx_msgs.c +++ b/aclk/legacy/aclk_rx_msgs.c @@ -25,7 +25,7 @@ static inline int aclk_extract_v2_data(char *payload, char **data) #define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref)) static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req) { - const char *start, *end, *ptr; + const char *start, *end, *ptr, *query_type; char uuid_str[UUID_STR_LEN]; uuid_t uuid; @@ -66,6 +66,8 @@ static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); return 1; } + ptr += strlen(ACLK_CLOUD_REQ_V2_PREFIX); + query_type = ptr; if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) { errno = 0; @@ -73,6 +75,11 @@ static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, return 1; } + if(!(ptr = strchr(ptr, '?')) || ptr > end) + ptr = end; + cloud_req->query_endpoint = mallocz((ptr - query_type) + 1); + strncpyz(cloud_req->query_endpoint, query_type, ptr - query_type); + req->payload = mallocz((end - start) + 1); strncpyz(req->payload, start, end - start); @@ -122,6 +129,13 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_v1++; + aclk_metrics_per_sample.cloud_req_ok++; + ACLK_STATS_UNLOCK; + } + return 0; } @@ -131,6 +145,7 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha struct aclk_cloud_req_v2 *cloud_req; char *data; + int stat_idx; errno = 0; if (cloud_to_agent->version < ACLK_V_COMPRESSION) { @@ -165,6 +180,10 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha goto cleanup; } + // we do this here due to cloud_req being taken over by query thread + // which if crazy quick can free it after aclk_queue_query + stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint); + // aclk_queue_query takes ownership of data pointer if (unlikely(aclk_queue_query( cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, @@ -173,8 +192,17 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha goto cleanup; } + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.cloud_req_v2++; + aclk_metrics_per_sample.cloud_req_ok++; + aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++; + ACLK_STATS_UNLOCK; + } + return 0; cleanup: + freez(cloud_req->query_endpoint); freez(cloud_req->data); freez(cloud_req); return 1; @@ -289,12 +317,6 @@ int aclk_handle_cloud_message(char *payload) struct aclk_request cloud_to_agent; memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_req_recvd++; - ACLK_STATS_UNLOCK; - } - if (unlikely(!payload)) { errno = 0; error("ACLK incoming message is empty"); diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c index 2a57cd6f0..7124380a2 100644 --- a/aclk/legacy/aclk_stats.c +++ b/aclk/legacy/aclk_stats.c @@ -11,8 +11,17 @@ struct aclk_qt_data { RRDDIM *dim; } *aclk_qt_data = NULL; +// ACLK per query thread cpu stats +struct aclk_cpu_data { + RRDDIM *user; + RRDDIM *system; + RRDSET *st; +} *aclk_cpu_data = NULL; + uint32_t *aclk_queries_per_thread = NULL; uint32_t *aclk_queries_per_thread_sample = NULL; +struct rusage *rusage_per_thread; +uint8_t *getrusage_called_this_tick = NULL; struct aclk_metrics aclk_metrics = { .online = 0, @@ -153,7 +162,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample) static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; - static RRDDIM *rd_rq_rcvd = NULL; + static RRDDIM *rd_rq_ok = NULL; static RRDDIM *rd_rq_err = NULL; if (unlikely(!st)) { @@ -161,17 +170,82 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample) "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s", "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_ok = rrddim_add(st, "accepted", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_err = rrddim_add(st, "rejected", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); } else rrdset_next(st); - rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err); + rrddim_set_by_pointer(st, rd_rq_ok, per_sample->cloud_req_ok); rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err); rrdset_done(st); } +static void aclk_stats_cloud_req_version(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st = NULL; + static RRDDIM *rd_rq_v1 = NULL; + static RRDDIM *rd_rq_v2 = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_cloud_req_version", NULL, "aclk", NULL, "Requests received from cloud by their version", "req/s", + "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + rd_rq_v1 = rrddim_add(st, "v1", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + rd_rq_v2 = rrddim_add(st, "v2+", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + rrddim_set_by_pointer(st, rd_rq_v1, per_sample->cloud_req_v1); + rrddim_set_by_pointer(st, rd_rq_v2, per_sample->cloud_req_v2); + + rrdset_done(st); +} + +static char *cloud_req_type_names[ACLK_STATS_CLOUD_REQ_TYPE_CNT] = { + "other", + "info", + "data", + "alarms", + "alarm_log", + "chart", + "charts" + // if you change update: + // #define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7 +}; + +int aclk_cloud_req_type_to_idx(const char *name) +{ + for (int i = 1; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++) + if (!strcmp(cloud_req_type_names[i], name)) + return i; + return 0; +} + +static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample) +{ + static RRDSET *st; + static int initialized = 0; + static RRDDIM *rd_rq_types[ACLK_STATS_CLOUD_REQ_TYPE_CNT]; + + if (unlikely(!initialized)) { + initialized = 1; + st = rrdset_create_localhost( + "netdata", "aclk_cloud_req_cmd", NULL, "aclk", NULL, "Requests received from cloud by their type (api endpoint queried)", "req/s", + "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++) + rd_rq_types[i] = rrddim_add(st, cloud_req_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(st); + + for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++) + rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_by_type[i]); + + rrdset_done(st); +} + #define MAX_DIM_NAME 16 static void aclk_stats_query_threads(uint32_t *queries_per_thread) { @@ -182,7 +256,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) if (unlikely(!st)) { st = rrdset_create_localhost( "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", - "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED); for (int i = 0; i < query_thread_count; i++) { if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) @@ -222,11 +296,42 @@ static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct rrdset_done(metric->st); } +static void aclk_stats_cpu_threads(void) +{ + char id[100 + 1]; + char title[100 + 1]; + + for (int i = 0; i < query_thread_count; i++) { + if (unlikely(!aclk_cpu_data[i].st)) { + + snprintfz(id, 100, "aclk_thread%d_cpu", i); + snprintfz(title, 100, "Cpu Usage For Thread No %d", i); + + aclk_cpu_data[i].st = rrdset_create_localhost( + "netdata", id, NULL, "aclk", NULL, title, "milliseconds/s", + "netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + aclk_cpu_data[i].user = rrddim_add(aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + aclk_cpu_data[i].system = rrddim_add(aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + + } else + rrdset_next(aclk_cpu_data[i].st); + } + + for (int i = 0; i < query_thread_count; i++) { + rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec); + rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec); + rrdset_done(aclk_cpu_data[i].st); + } +} + void aclk_stats_thread_cleanup() { freez(aclk_qt_data); freez(aclk_queries_per_thread); freez(aclk_queries_per_thread_sample); + freez(aclk_cpu_data); + freez(rusage_per_thread); } void *aclk_stats_main_thread(void *ptr) @@ -235,8 +340,11 @@ void *aclk_stats_main_thread(void *ptr) query_thread_count = args->query_thread_count; aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); + aclk_cpu_data = callocz(query_thread_count, sizeof(struct aclk_cpu_data)); aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + rusage_per_thread = callocz(query_thread_count, sizeof(struct rusage)); + getrusage_called_this_tick = callocz(query_thread_count, sizeof(uint8_t)); heartbeat_t hb; heartbeat_init(&hb); @@ -264,6 +372,7 @@ void *aclk_stats_main_thread(void *ptr) memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count); memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count); + memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * query_thread_count); ACLK_STATS_UNLOCK; aclk_stats_collect(&per_sample, &permanent); @@ -273,8 +382,14 @@ void *aclk_stats_main_thread(void *ptr) aclk_stats_read_q(&per_sample); aclk_stats_cloud_req(&per_sample); + aclk_stats_cloud_req_version(&per_sample); + + aclk_stats_cloud_req_cmd(&per_sample); + aclk_stats_query_threads(aclk_queries_per_thread_sample); + aclk_stats_cpu_threads(); + #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency); #endif diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h index 7e74fdf88..5e50a2272 100644 --- a/aclk/legacy/aclk_stats.h +++ b/aclk/legacy/aclk_stats.h @@ -55,6 +55,11 @@ extern struct aclk_mat_metrics { void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement); +#define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7 +// if you change update cloud_req_type_names + +int aclk_cloud_req_type_to_idx(const char *name); + // reset to 0 on every sample extern struct aclk_metrics_per_sample { /* in the unlikely event of ACLK disconnecting @@ -72,9 +77,14 @@ extern struct aclk_metrics_per_sample { volatile uint32_t read_q_added; volatile uint32_t read_q_consumed; - volatile uint32_t cloud_req_recvd; + volatile uint32_t cloud_req_ok; volatile uint32_t cloud_req_err; + volatile uint16_t cloud_req_v1; + volatile uint16_t cloud_req_v2; + + volatile uint16_t cloud_req_by_type[ACLK_STATS_CLOUD_REQ_TYPE_CNT]; + #ifdef NETDATA_INTERNAL_CHECKS struct aclk_metric_mat_data latency; #endif @@ -83,6 +93,7 @@ extern struct aclk_metrics_per_sample { } aclk_metrics_per_sample; extern uint32_t *aclk_queries_per_thread; +extern struct rusage *rusage_per_thread; void *aclk_stats_main_thread(void *ptr); void aclk_stats_thread_cleanup(); diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c index e51a01308..5767df3a7 100644 --- a/aclk/legacy/agent_cloud_link.c +++ b/aclk/legacy/agent_cloud_link.c @@ -189,7 +189,8 @@ unsigned long int aclk_reconnect_delay(int mode) delay = ACLK_MAX_BACKOFF_DELAY * 1000; } else { fail++; - delay = (delay * 1000) + (random() % 1000); + delay *= 1000; + delay += (random() % (MAX(1000, delay/2))); } return delay; |