diff options
Diffstat (limited to 'aclk/legacy')
-rw-r--r-- | aclk/legacy/Makefile.am | 19 | ||||
-rw-r--r-- | aclk/legacy/aclk_common.c | 53 | ||||
-rw-r--r-- | aclk/legacy/aclk_common.h | 51 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_https_client.c | 244 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_https_client.h | 18 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.c | 622 | ||||
-rw-r--r-- | aclk/legacy/aclk_lws_wss_client.h | 92 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.c | 843 | ||||
-rw-r--r-- | aclk/legacy/aclk_query.h | 41 | ||||
-rw-r--r-- | aclk/legacy/aclk_rx_msgs.c | 388 | ||||
-rw-r--r-- | aclk/legacy/aclk_rx_msgs.h | 13 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.c | 411 | ||||
-rw-r--r-- | aclk/legacy/aclk_stats.h | 100 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.c | 1502 | ||||
-rw-r--r-- | aclk/legacy/agent_cloud_link.h | 85 | ||||
-rw-r--r-- | aclk/legacy/mqtt.c | 370 | ||||
-rw-r--r-- | aclk/legacy/mqtt.h | 25 | ||||
-rw-r--r-- | aclk/legacy/tests/fake-charts.d.plugin | 24 | ||||
-rw-r--r-- | aclk/legacy/tests/install-fake-charts.d.sh.in | 6 | ||||
-rwxr-xr-x | aclk/legacy/tests/launch-paho.sh | 4 | ||||
-rw-r--r-- | aclk/legacy/tests/paho-inspection.py | 59 | ||||
-rw-r--r-- | aclk/legacy/tests/paho.Dockerfile | 14 |
22 files changed, 0 insertions, 4984 deletions
diff --git a/aclk/legacy/Makefile.am b/aclk/legacy/Makefile.am deleted file mode 100644 index 1cd876b4..00000000 --- a/aclk/legacy/Makefile.am +++ /dev/null @@ -1,19 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in - -CLEANFILES = \ - tests/install-fake-charts.d.sh \ - $(NULL) - -include $(top_srcdir)/build/subst.inc -SUFFIXES = .in - -#sbin_SCRIPTS = \ -# tests/install-fake-charts.d.sh \ -# $(NULL) - -dist_noinst_SCRIPTS = tests/install-fake-charts.d.sh -dist_noinst_DATA = tests/install-fake-charts.d.sh.in - diff --git a/aclk/legacy/aclk_common.c b/aclk/legacy/aclk_common.c deleted file mode 100644 index 7f8368e4..00000000 --- a/aclk/legacy/aclk_common.c +++ /dev/null @@ -1,53 +0,0 @@ -#include "aclk_common.h" - -#include "daemon/common.h" - -#ifdef ENABLE_ACLK -#include <libwebsockets.h> -#endif - -netdata_mutex_t legacy_aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER; - -struct legacy_aclk_shared_state legacy_aclk_shared_state = { - .version_neg = 0, - .version_neg_wait_till = 0 -}; - -int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port) -{ - int pos = 0; - if (!strncmp("https://", url, 8)) { - pos = 8; - } else if (!strncmp("http://", url, 7)) { - error("Cannot connect ACLK over %s -> unencrypted link is not supported", url); - return 1; - } - int host_end = pos; - while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':') - host_end++; - if (url[host_end] == 0) { - *aclk_hostname = strdupz(url + pos); - *aclk_port = 443; - info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); - return 0; - } - if (url[host_end] == ':') { - *aclk_hostname = callocz(host_end - pos + 1, 1); - strncpy(*aclk_hostname, url + pos, host_end - pos); - int port_end = host_end + 1; - while (url[port_end] >= '0' && url[port_end] <= '9') - port_end++; - if (port_end - host_end > 6) { - error("Port specified in %s is invalid", url); - return 0; - } - *aclk_port = atoi(&url[host_end+1]); - } - if (url[host_end] == '/') { - *aclk_port = 443; - *aclk_hostname = callocz(1, host_end - pos + 1); - strncpy(*aclk_hostname, url+pos, host_end - pos); - } - info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url); - return 0; -} diff --git a/aclk/legacy/aclk_common.h b/aclk/legacy/aclk_common.h deleted file mode 100644 index 080680ff..00000000 --- a/aclk/legacy/aclk_common.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef ACLK_COMMON_H -#define ACLK_COMMON_H - -#include "../aclk_rrdhost_state.h" -#include "daemon/common.h" - -extern netdata_mutex_t legacy_aclk_shared_state_mutex; -#define legacy_aclk_shared_state_LOCK netdata_mutex_lock(&legacy_aclk_shared_state_mutex) -#define legacy_aclk_shared_state_UNLOCK netdata_mutex_unlock(&legacy_aclk_shared_state_mutex) - -// minimum and maximum supported version of ACLK -// in this version of agent -#define ACLK_VERSION_MIN 2 -#define ACLK_VERSION_MAX 3 - -// Version negotiation messages have they own versioning -// this is also used for LWT message as we set that up -// before version negotiation -#define ACLK_VERSION_NEG_VERSION 1 - -// Maximum time to wait for version negotiation before aborting -// and defaulting to oldest supported version -#define VERSION_NEG_TIMEOUT 3 - -#if ACLK_VERSION_MIN > ACLK_VERSION_MAX -#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN" -#endif - -// Define ACLK Feature Version Boundaries Here -#define ACLK_V_COMPRESSION 2 -#define ACLK_V_CHILDRENSTATE 3 - -#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING) -#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update) - -extern struct legacy_aclk_shared_state { - // optimization to avoid looping through hosts - // every time Query Thread wakes up - RRDHOST *next_popcorn_host; - - // read only while ACLK connected - // protect by lock otherwise - int version_neg; - usec_t version_neg_wait_till; -} legacy_aclk_shared_state; - -const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type); - -int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port); - -#endif //ACLK_COMMON_H diff --git a/aclk/legacy/aclk_lws_https_client.c b/aclk/legacy/aclk_lws_https_client.c deleted file mode 100644 index 8a490c6f..00000000 --- a/aclk/legacy/aclk_lws_https_client.c +++ /dev/null @@ -1,244 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#define ACLK_LWS_HTTPS_CLIENT_INTERNAL -#include "aclk_lws_https_client.h" -#include "aclk_common.h" -#include "aclk_lws_wss_client.h" - -#define SMALL_BUFFER 16 - -struct simple_hcc_data { - char *data; - size_t data_size; - size_t written; - char lws_work_buffer[1024 + LWS_PRE]; - char *payload; - int response_code; - int done; -}; - -static int simple_https_client_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) -{ - UNUSED(user); - int n; - char *ptr; - char buffer[SMALL_BUFFER]; - struct simple_hcc_data *perconn_data = lws_get_opaque_user_data(wsi); - - switch (reason) { - case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ: - debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ"); - if (perconn_data->data_size - 1 - perconn_data->written < len) - return 1; - memcpy(&perconn_data->data[perconn_data->written], in, len); - perconn_data->written += len; - return 0; - case LWS_CALLBACK_RECEIVE_CLIENT_HTTP: - debug(D_ACLK, "LWS_CALLBACK_RECEIVE_CLIENT_HTTP"); - if(!perconn_data) { - error("Missing Per Connect Data"); - return -1; - } - n = sizeof(perconn_data->lws_work_buffer) - LWS_PRE; - ptr = perconn_data->lws_work_buffer + LWS_PRE; - if (lws_http_client_read(wsi, &ptr, &n) < 0) - return -1; - perconn_data->data[perconn_data->written] = '\0'; - return 0; - case LWS_CALLBACK_WSI_DESTROY: - debug(D_ACLK, "LWS_CALLBACK_WSI_DESTROY"); - if(perconn_data) - perconn_data->done = 1; - return 0; - case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: - debug(D_ACLK, "LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP"); - if(perconn_data) - perconn_data->response_code = lws_http_client_http_response(wsi); - return 0; - case LWS_CALLBACK_CLOSED_CLIENT_HTTP: - debug(D_ACLK, "LWS_CALLBACK_CLOSED_CLIENT_HTTP"); - return 0; - case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: - debug(D_ACLK, "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS"); - return 0; - case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: - debug(D_ACLK, "LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER"); - if(perconn_data && perconn_data->payload) { - unsigned char **p = (unsigned char **)in, *end = (*p) + len; - snprintfz(buffer, SMALL_BUFFER, "%zu", strlen(perconn_data->payload)); - if (lws_add_http_header_by_token(wsi, - WSI_TOKEN_HTTP_CONTENT_LENGTH, - (unsigned char *)buffer, strlen(buffer), p, end)) - return -1; - if (lws_add_http_header_by_token(wsi, - WSI_TOKEN_HTTP_CONTENT_TYPE, - (unsigned char *)ACLK_CONTENT_TYPE_JSON, - strlen(ACLK_CONTENT_TYPE_JSON), p, end)) - return -1; - lws_client_http_body_pending(wsi, 1); - lws_callback_on_writable(wsi); - } - return 0; - case LWS_CALLBACK_CLIENT_HTTP_WRITEABLE: - debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_WRITEABLE"); - if(perconn_data && perconn_data->payload) { - n = strlen(perconn_data->payload); - if(perconn_data->data_size < (size_t)LWS_PRE + n + 1) { - error("Buffer given is not big enough"); - return 1; - } - - memcpy(&perconn_data->data[LWS_PRE], perconn_data->payload, n); - if(n != lws_write(wsi, (unsigned char*)&perconn_data->data[LWS_PRE], n, LWS_WRITE_HTTP)) { - error("lws_write error"); - perconn_data->data[0] = 0; - return 1; - } - lws_client_http_body_pending(wsi, 0); - // clean for subsequent reply read - perconn_data->data[0] = 0; - } - return 0; - case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: - debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL"); - return 0; - case LWS_CALLBACK_WSI_CREATE: - debug(D_ACLK, "LWS_CALLBACK_WSI_CREATE"); - return 0; - case LWS_CALLBACK_PROTOCOL_INIT: - debug(D_ACLK, "LWS_CALLBACK_PROTOCOL_INIT"); - return 0; - case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: - debug(D_ACLK, "LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL"); - return 0; - case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: - debug(D_ACLK, "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED"); - return 0; - case LWS_CALLBACK_GET_THREAD_ID: - debug(D_ACLK, "LWS_CALLBACK_GET_THREAD_ID"); - return 0; - case LWS_CALLBACK_EVENT_WAIT_CANCELLED: - debug(D_ACLK, "LWS_CALLBACK_EVENT_WAIT_CANCELLED"); - return 0; - case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: - debug(D_ACLK, "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION"); - return 0; - case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: - debug(D_ACLK, "LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH"); - return 0; - default: - debug(D_ACLK, "Unknown callback %d", (int)reason); - return 0; - } -} - -static const struct lws_protocols protocols[] = { - { - "http", - simple_https_client_callback, - 0, - 0, - 0, - 0, - 0 - }, - { NULL, NULL, 0, 0, 0, 0, 0 } -}; - -static void simple_hcc_log_divert(int level, const char *line) -{ - UNUSED(level); - error("Libwebsockets: %s", line); -} - -int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload) -{ - info("%s %s", __func__, method); - - struct lws_context_creation_info info; - struct lws_client_connect_info i; - struct lws_context *context; - - struct simple_hcc_data *data = callocz(1, sizeof(struct simple_hcc_data)); - data->data = b; - data->data[0] = 0; - data->data_size = b_size; - data->payload = payload; - - int n = 0; - time_t timestamp; - - struct lws_vhost *vhost; - - memset(&info, 0, sizeof info); - - info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; - info.port = CONTEXT_PORT_NO_LISTEN; - info.protocols = protocols; - - - context = lws_create_context(&info); - if (!context) { - error("Error creating LWS context"); - freez(data); - return 1; - } - - lws_set_log_level(LLL_ERR | LLL_WARN, simple_hcc_log_divert); - - lws_service(context, 0); - - memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */ - i.context = context; - -#ifdef ACLK_SSL_ALLOW_SELF_SIGNED - i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE; - info("Disabling SSL certificate checks"); -#else - i.ssl_connection = LCCSCF_USE_SSL; -#endif -#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0 -#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM. - i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; -#endif - - i.port = port; - i.address = host; - i.path = url; - - i.host = i.address; - i.origin = i.address; - i.method = method; - i.opaque_user_data = data; - i.alpn = "http/1.1"; - - i.protocol = protocols[0].name; - - vhost = lws_get_vhost_by_name(context, "default"); - if(!vhost) - fatal("Could not find the default LWS vhost."); - - //set up proxy - aclk_wss_set_proxy(vhost); - - lws_client_connect_via_info(&i); - - // libwebsockets handle connection timeouts already - // this adds additional safety in case of bug in LWS - timestamp = now_monotonic_sec(); - while( n >= 0 && !data->done && !netdata_exit) { - n = lws_service(context, 0); - if( now_monotonic_sec() - timestamp > SEND_HTTPS_REQUEST_TIMEOUT ) { - data->data[0] = 0; - data->done = 1; - error("Servicing LWS took too long."); - } - } - - lws_context_destroy(context); - - n = data->response_code; - - freez(data); - return (n < 200 || n >= 300); -} diff --git a/aclk/legacy/aclk_lws_https_client.h b/aclk/legacy/aclk_lws_https_client.h deleted file mode 100644 index 5f30a37f..00000000 --- a/aclk/legacy/aclk_lws_https_client.h +++ /dev/null @@ -1,18 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_LWS_HTTPS_CLIENT_H -#define NETDATA_LWS_HTTPS_CLIENT_H - -#include "daemon/common.h" -#include "libnetdata/libnetdata.h" - -#define DATAMAXLEN 1024*16 - -#ifdef ACLK_LWS_HTTPS_CLIENT_INTERNAL -#define ACLK_CONTENT_TYPE_JSON "application/json" -#define SEND_HTTPS_REQUEST_TIMEOUT 30 -#endif - -int aclk_send_https_request(char *method, char *host, int port, char *url, char *b, size_t b_size, char *payload); - -#endif /* NETDATA_LWS_HTTPS_CLIENT_H */ diff --git a/aclk/legacy/aclk_lws_wss_client.c b/aclk/legacy/aclk_lws_wss_client.c deleted file mode 100644 index 012f2a8c..00000000 --- a/aclk/legacy/aclk_lws_wss_client.c +++ /dev/null @@ -1,622 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "aclk_lws_wss_client.h" - -#include "libnetdata/libnetdata.h" -#include "daemon/common.h" -#include "aclk_common.h" -#include "aclk_stats.h" -#include "../aclk_proxy.h" - -extern int aclk_shutting_down; - -static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); - -struct aclk_lws_wss_perconnect_data { - int todo; -}; - -static struct aclk_lws_wss_engine_instance *engine_instance = NULL; - -void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len) -{ - if (write_len != NULL && write_len_bytes != NULL) - { - *write_len = 0; - *write_len_bytes = 0; - if (engine_instance != NULL) - { - aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); - - struct lws_wss_packet_buffer *write_b; - size_t w,wb; - for(w=0, wb=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) - { - w++; - wb += write_b->data_size - write_b->written; - } - *write_len = w; - *write_len_bytes = wb; - aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); - } - } - else if (write_len != NULL) - { - *write_len = 0; - if (engine_instance != NULL) - { - aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); - - struct lws_wss_packet_buffer *write_b; - size_t w; - for(w=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next) - w++; - *write_len = w; - aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); - } - } - if (read_len != NULL) - { - *read_len = 0; - if (engine_instance != NULL) - { - aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); - *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); - aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); - } - } -} - -static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size) -{ - struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer)); - if (data) { - new->data = mallocz(LWS_PRE + size); - memcpy(new->data + LWS_PRE, data, size); - new->data_size = size; - new->written = 0; - } - return new; -} - -static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item) -{ - struct lws_wss_packet_buffer *tail = *list; - if (!*list) { - *list = item; - return; - } - while (tail->next) { - tail = tail->next; - } - tail->next = item; -} - -static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list) -{ - struct lws_wss_packet_buffer *ret = *list; - if (ret != NULL) - *list = ret->next; - - return ret; -} - -static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item) -{ - freez(item->data); - freez(item); -} - -static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer) -{ - size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL); - if (elems > 0) - lws_ring_consume(ringbuffer, NULL, NULL, elems); -} - -static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list) -{ - struct lws_wss_packet_buffer *i; - while ((i = lws_wss_packet_buffer_pop(list)) != NULL) { - lws_wss_packet_buffer_free(i); - } - *list = NULL; -} - -static inline void aclk_lws_wss_clear_io_buffers() -{ - aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); - _aclk_lws_wss_read_buffer_clear(engine_instance->read_ringbuffer); - aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); - aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); - _aclk_lws_wss_write_buffer_clear(&engine_instance->write_buffer_head); - aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); -} - -static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback, - sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 }, - { NULL, NULL, 0, 0, 0, 0, 0 } }; - -static void aclk_lws_wss_log_divert(int level, const char *line) -{ - switch (level) { - case LLL_ERR: - error("Libwebsockets Error: %s", line); - break; - case LLL_WARN: - debug(D_ACLK, "Libwebsockets Warn: %s", line); - break; - default: - error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line); - } -} - -static int aclk_lws_wss_client_init( char *target_hostname, int target_port) -{ - static int lws_logging_initialized = 0; - - if (unlikely(!lws_logging_initialized)) { - lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert); - lws_logging_initialized = 1; - } - - if (!target_hostname) - return 1; - - engine_instance = callocz(1, sizeof(struct aclk_lws_wss_engine_instance)); - - engine_instance->host = target_hostname; - engine_instance->port = target_port; - - - aclk_lws_mutex_init(&engine_instance->write_buf_mutex); - aclk_lws_mutex_init(&engine_instance->read_buf_mutex); - - engine_instance->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL); - if (!engine_instance->read_ringbuffer) - goto failure_cleanup; - - return 0; - -failure_cleanup: - freez(engine_instance); - return 1; -} - -void aclk_lws_wss_destroy_context() -{ - if (!engine_instance) - return; - if (!engine_instance->lws_context) - return; - lws_context_destroy(engine_instance->lws_context); - engine_instance->lws_context = NULL; -} - - -void aclk_lws_wss_client_destroy() -{ - if (engine_instance == NULL) - return; - - aclk_lws_wss_destroy_context(); - engine_instance->lws_wsi = NULL; - - aclk_lws_wss_clear_io_buffers(); - -#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED - pthread_mutex_destroy(&engine_instance->write_buf_mutex); - pthread_mutex_destroy(&engine_instance->read_buf_mutex); -#endif -} - -#ifdef LWS_WITH_SOCKS5 -static int aclk_wss_set_socks(struct lws_vhost *vhost, const char *socks) -{ - char *proxy = strstr(socks, ACLK_PROXY_PROTO_ADDR_SEPARATOR); - - if (!proxy) - return -1; - - proxy += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR); - - if (!*proxy) - return -1; - - return lws_set_socks(vhost, proxy); -} -#endif - -void aclk_wss_set_proxy(struct lws_vhost *vhost) -{ - const char *proxy; - ACLK_PROXY_TYPE proxy_type; - char *log; - - proxy = aclk_get_proxy(&proxy_type); - -#ifdef LWS_WITH_SOCKS5 - lws_set_socks(vhost, ":"); -#endif - lws_set_proxy(vhost, ":"); - - if (proxy_type == PROXY_TYPE_UNKNOWN) { - error("Unknown proxy type"); - return; - } - - if (proxy_type == PROXY_TYPE_SOCKS5 || proxy_type == PROXY_TYPE_HTTP) { - log = strdupz(proxy); - safe_log_proxy_censor(log); - info("Connecting using %s proxy:\"%s\"", aclk_proxy_type_to_s(&proxy_type), log); - freez(log); - } - if (proxy_type == PROXY_TYPE_SOCKS5) { -#ifdef LWS_WITH_SOCKS5 - if (aclk_wss_set_socks(vhost, proxy)) - error("LWS failed to accept socks proxy."); - return; -#else - fatal("We have no SOCKS5 support but we made it here. Programming error!"); -#endif - } - if (proxy_type == PROXY_TYPE_HTTP) { - if (lws_set_proxy(vhost, proxy)) - error("LWS failed to accept http proxy."); - return; - } - if (proxy_type != PROXY_DISABLED) - error("Unknown proxy type"); -} - -// Return code indicates if connection attempt has started async. -int aclk_lws_wss_connect(char *host, int port) -{ - struct lws_client_connect_info i; - struct lws_vhost *vhost; - int n; - - if (!engine_instance) { - if (aclk_lws_wss_client_init(host, port)) - return 1; // Propagate failure - } - - if (!engine_instance->lws_context) - { - // First time through (on this connection), create the context - struct lws_context_creation_info info; - memset(&info, 0, sizeof(struct lws_context_creation_info)); - info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; - info.port = CONTEXT_PORT_NO_LISTEN; - info.protocols = protocols; - engine_instance->lws_context = lws_create_context(&info); - if (!engine_instance->lws_context) - { - error("Failed to create lws_context, ACLK will not function"); - return 1; - } - return 0; - // PROTOCOL_INIT callback will call again. - } - - for (n = 0; n < ACLK_LWS_CALLBACK_HISTORY; n++) - engine_instance->lws_callback_history[n] = 0; - - if (engine_instance->lws_wsi) { - error("Already Connected. Only one connection supported at a time."); - return 0; - } - - memset(&i, 0, sizeof(i)); - i.context = engine_instance->lws_context; - i.port = engine_instance->port; - i.address = engine_instance->host; - i.path = "/mqtt"; - i.host = engine_instance->host; - i.protocol = "mqtt"; - - // from LWS docu: - // If option LWS_SERVER_OPTION_EXPLICIT_VHOSTS is given, no vhost is - // created; you're expected to create your own vhosts afterwards using - // lws_create_vhost(). Otherwise a vhost named "default" is also created - // using the information in the vhost-related members, for compatibility. - vhost = lws_get_vhost_by_name(engine_instance->lws_context, "default"); - if(!vhost) - fatal("Could not find the default LWS vhost."); - - aclk_wss_set_proxy(vhost); - -#ifdef ACLK_SSL_ALLOW_SELF_SIGNED - i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE; - info("Disabling SSL certificate checks"); -#else - i.ssl_connection = LCCSCF_USE_SSL; -#endif -#if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0 -#warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM. - i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; -#endif - lws_client_connect_via_info(&i); - return 0; -} - -static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, size_t len) -{ - if (lws_ring_insert(buffer, data, len) != len) { - error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding."); - return 0; - } - return 1; -} - -#ifdef ACLK_TRP_DEBUG_VERBOSE -static const char *aclk_lws_callback_name(enum lws_callback_reasons reason) -{ - switch (reason) { - case LWS_CALLBACK_CLIENT_WRITEABLE: - return "LWS_CALLBACK_CLIENT_WRITEABLE"; - case LWS_CALLBACK_CLIENT_RECEIVE: - return "LWS_CALLBACK_CLIENT_RECEIVE"; - case LWS_CALLBACK_PROTOCOL_INIT: - return "LWS_CALLBACK_PROTOCOL_INIT"; - case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: - return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED"; - case LWS_CALLBACK_USER: - return "LWS_CALLBACK_USER"; - case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR"; - case LWS_CALLBACK_CLIENT_CLOSED: - return "LWS_CALLBACK_CLIENT_CLOSED"; - case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: - return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE"; - case LWS_CALLBACK_WSI_DESTROY: - return "LWS_CALLBACK_WSI_DESTROY"; - case LWS_CALLBACK_CLIENT_ESTABLISHED: - return "LWS_CALLBACK_CLIENT_ESTABLISHED"; - case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: - return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION"; - case LWS_CALLBACK_EVENT_WAIT_CANCELLED: - return "LWS_CALLBACK_EVENT_WAIT_CANCELLED"; - default: - // Not using an internal buffer here for thread-safety with unknown calling context. - error("Unknown LWS callback %u", reason); - return "unknown"; - } -} -#endif - -void aclk_lws_wss_fail_report() -{ - int i; - int anything_to_send = 0; - BUFFER *buf; - - if (netdata_anonymous_statistics_enabled <= 0) - return; - - // guess - most of the callback will be 1-99 + ',' + \0 - buf = buffer_create((ACLK_LWS_CALLBACK_HISTORY * 2) + 10); - - for (i = 0; i < ACLK_LWS_CALLBACK_HISTORY; i++) - if (engine_instance->lws_callback_history[i]) { - buffer_sprintf(buf, "%s%d", (i ? "," : ""), engine_instance->lws_callback_history[i]); - anything_to_send = 1; - } - - if (anything_to_send) - send_statistics("ACLK_CONN_FAIL", "FAIL", buffer_tostring(buf)); - - buffer_free(buf); -} - -static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) -{ - UNUSED(user); - struct lws_wss_packet_buffer *data; - int retval = 0; - static int lws_shutting_down = 0; - int i; - - for (i = ACLK_LWS_CALLBACK_HISTORY - 1; i > 0; i--) - engine_instance->lws_callback_history[i] = engine_instance->lws_callback_history[i - 1]; - engine_instance->lws_callback_history[0] = (int)reason; - - if (unlikely(aclk_shutting_down && !lws_shutting_down)) { - lws_shutting_down = 1; - retval = -1; - engine_instance->upstream_reconnect_request = 0; - } - - // Callback servicing is forced when we are closed from above. - if (engine_instance->upstream_reconnect_request) { - error("Closing lws connection due to libmosquitto error."); - char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; - lws_close_reason( - wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error, - strlen(upstream_connection_error)); - retval = -1; - engine_instance->upstream_reconnect_request = 0; - } - - // Don't log to info - volume is proportional to message flow on ACLK. - switch (reason) { - case LWS_CALLBACK_CLIENT_WRITEABLE: - aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); - data = engine_instance->write_buffer_head; - if (likely(data)) { - size_t bytes_left = data->data_size - data->written; - if ( bytes_left > FRAGMENT_SIZE) - bytes_left = FRAGMENT_SIZE; - int n = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY); - if (n>=0) { - data->written += n; - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.write_q_consumed += n; - LEGACY_ACLK_STATS_UNLOCK; - } - } - //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc); - if (data->written == data->data_size) - { - lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head); - lws_wss_packet_buffer_free(data); - } - if (engine_instance->write_buffer_head) - lws_callback_on_writable(engine_instance->lws_wsi); - } - aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); - return retval; - - case LWS_CALLBACK_CLIENT_RECEIVE: - aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); - if (!received_data_to_ringbuff(engine_instance->read_ringbuffer, in, len)) - retval = 1; - aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.read_q_added += len; - LEGACY_ACLK_STATS_UNLOCK; - } - - // to future myself -> do not call this while read lock is active as it will eventually - // want to acquire same lock later in aclk_lws_wss_client_read() function - aclk_lws_connection_data_received(); - return retval; - - case LWS_CALLBACK_WSI_CREATE: - case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: - case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: - case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: - case LWS_CALLBACK_GET_THREAD_ID: // ? - case LWS_CALLBACK_EVENT_WAIT_CANCELLED: - case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: - // Expected and safe to ignore. -#ifdef ACLK_TRP_DEBUG_VERBOSE - debug(D_ACLK, "Ignoring expected callback from LWS: %s", aclk_lws_callback_name(reason)); -#endif - return retval; - - default: - // Pass to next switch, this case removes compiler warnings. - break; - } - // Log to info - volume is proportional to connection attempts. -#ifdef ACLK_TRP_DEBUG_VERBOSE - info("Processing callback %s", aclk_lws_callback_name(reason)); -#endif - switch (reason) { - case LWS_CALLBACK_PROTOCOL_INIT: - aclk_lws_wss_connect(engine_instance->host, engine_instance->port); // Makes the outgoing connection - break; - case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: - if (engine_instance->lws_wsi != NULL && engine_instance->lws_wsi != wsi) - error("Multiple connections on same WSI? %p vs %p", engine_instance->lws_wsi, wsi); - engine_instance->lws_wsi = wsi; - break; - case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - error( - "Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", engine_instance->host, - engine_instance->port, (in ? (char *)in : "not given")); - // Fall-through - case LWS_CALLBACK_CLIENT_CLOSED: - case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: - engine_instance->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback - aclk_lws_connection_closed(); - return -1; // the callback response is ignored, hope the above remains true - case LWS_CALLBACK_WSI_DESTROY: - aclk_lws_wss_clear_io_buffers(); - if (!engine_instance->websocket_connection_up) - aclk_lws_wss_fail_report(); - engine_instance->lws_wsi = NULL; - engine_instance->websocket_connection_up = 0; - aclk_lws_connection_closed(); - break; - case LWS_CALLBACK_CLIENT_ESTABLISHED: - engine_instance->websocket_connection_up = 1; - aclk_lws_connection_established(engine_instance->host, engine_instance->port); - break; - - default: -#ifdef ACLK_TRP_DEBUG_VERBOSE - error("Unexpected callback from libwebsockets %s", aclk_lws_callback_name(reason)); -#endif - break; - } - return retval; //0-OK, other connection should be closed! -} - -int aclk_lws_wss_client_write(void *buf, size_t count) -{ - if (engine_instance && engine_instance->lws_wsi && engine_instance->websocket_connection_up) { - aclk_lws_mutex_lock(&engine_instance->write_buf_mutex); - lws_wss_packet_buffer_append(&engine_instance->write_buffer_head, lws_wss_packet_buffer_new(buf, count)); - aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex); - - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.write_q_added += count; - LEGACY_ACLK_STATS_UNLOCK; - } - - lws_callback_on_writable(engine_instance->lws_wsi); - return count; - } - return 0; -} - -int aclk_lws_wss_client_read(void *buf, size_t count) -{ - size_t data_to_be_read = count; - - aclk_lws_mutex_lock(&engine_instance->read_buf_mutex); - size_t readable_byte_count = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL); - if (unlikely(readable_byte_count == 0)) { - errno = EAGAIN; - data_to_be_read = -1; - goto abort; - } - - if (readable_byte_count < data_to_be_read) - data_to_be_read = readable_byte_count; - - data_to_be_read = lws_ring_consume(engine_instance->read_ringbuffer, NULL, buf, data_to_be_read); - if (data_to_be_read == readable_byte_count) - engine_instance->data_to_read = 0; - - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.read_q_consumed += data_to_be_read; - LEGACY_ACLK_STATS_UNLOCK; - } - -abort: - aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex); - return data_to_be_read; -} - -void aclk_lws_wss_service_loop() -{ - if (engine_instance) - { - /*if (engine_instance->lws_wsi) { - lws_cancel_service(engine_instance->lws_context); - lws_callback_on_writable(engine_instance->lws_wsi); - }*/ - lws_service(engine_instance->lws_context, 0); - } -} - -// in case the MQTT connection disconnect while lws transport is still operational -// we should drop connection and reconnect -// this function should be called when that happens to notify lws of that situation -void aclk_lws_wss_mqtt_layer_disconnect_notif() -{ - if (!engine_instance) - return; - if (engine_instance->lws_wsi && engine_instance->websocket_connection_up) { - engine_instance->upstream_reconnect_request = 1; - lws_callback_on_writable( - engine_instance->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written. - } -} diff --git a/aclk/legacy/aclk_lws_wss_client.h b/aclk/legacy/aclk_lws_wss_client.h deleted file mode 100644 index c68649cf..00000000 --- a/aclk/legacy/aclk_lws_wss_client.h +++ /dev/null @@ -1,92 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ACLK_LWS_WSS_CLIENT_H -#define ACLK_LWS_WSS_CLIENT_H - -#include <libwebsockets.h> - -#include "libnetdata/libnetdata.h" - -// This is as define because ideally the ACLK at high level -// can do mosquitto writes and reads only from one thread -// which is cleaner implementation IMHO -// in such case this mutexes are not necessary and life -// is simpler -#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1 - -#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES (128 * 1024) - -#define ACLK_LWS_CALLBACK_HISTORY 10 - -#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED -#define aclk_lws_mutex_init(x) netdata_mutex_init(x) -#define aclk_lws_mutex_lock(x) netdata_mutex_lock(x) -#define aclk_lws_mutex_unlock(x) netdata_mutex_unlock(x) -#else -#define aclk_lws_mutex_init(x) -#define aclk_lws_mutex_lock(x) -#define aclk_lws_mutex_unlock(x) -#endif - -struct aclk_lws_wss_engine_callbacks { - void (*connection_established_callback)(); - void (*data_rcvd_callback)(); - void (*data_writable_callback)(); - void (*connection_closed)(); -}; - -struct lws_wss_packet_buffer { - unsigned char *data; - size_t data_size, written; - struct lws_wss_packet_buffer *next; -}; - -struct aclk_lws_wss_engine_instance { - //target host/port for connection - char *host; - int port; - - //internal data - struct lws_context *lws_context; - struct lws *lws_wsi; - -#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED - netdata_mutex_t write_buf_mutex; - netdata_mutex_t read_buf_mutex; -#endif - - struct lws_wss_packet_buffer *write_buffer_head; - struct lws_ring *read_ringbuffer; - - //flags to be read by engine user - int websocket_connection_up; - - // currently this is by default disabled - - int data_to_read; - int upstream_reconnect_request; - - int lws_callback_history[ACLK_LWS_CALLBACK_HISTORY]; -}; - -void aclk_lws_wss_client_destroy(); -void aclk_lws_wss_destroy_context(); - -int aclk_lws_wss_connect(char *host, int port); - -int aclk_lws_wss_client_write(void *buf, size_t count); -int aclk_lws_wss_client_read(void *buf, size_t count); -void aclk_lws_wss_service_loop(); - -void aclk_lws_wss_mqtt_layer_disconnect_notif(); - -// Notifications inside the layer above -void aclk_lws_connection_established(); -void aclk_lws_connection_data_received(); -void aclk_lws_connection_closed(); -void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); - -void aclk_wss_set_proxy(struct lws_vhost *vhost); - -#define FRAGMENT_SIZE 4096 -#endif diff --git a/aclk/legacy/aclk_query.c b/aclk/legacy/aclk_query.c deleted file mode 100644 index 21eae11f..00000000 --- a/aclk/legacy/aclk_query.c +++ /dev/null @@ -1,843 +0,0 @@ -#include "aclk_common.h" -#include "aclk_query.h" -#include "aclk_stats.h" -#include "aclk_rx_msgs.h" -#include "agent_cloud_link.h" - -#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" - -#define ACLK_QUERY_THREAD_NAME "ACLK_Query" - -pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER; -pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER; -#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait) -#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait) - -#ifndef __GNUC__ -#pragma region ACLK_QUEUE -#endif - -static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; -#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex) -#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex) - -struct aclk_query { - usec_t created; - struct timeval tv_in; - usec_t created_boot_time; - time_t run_after; // Delay run until after this time - ACLK_CMD cmd; // What command is this - char *topic; // Topic to respond to - char *data; // Internal data (NULL if request from the cloud) - char *msg_id; // msg_id generated by the cloud (NULL if internal) - char *query; // The actual query - u_char deleted; // Mark deleted for garbage collect - int idx; // index of query thread - struct aclk_query *next; -}; - -struct aclk_query_queue { - struct aclk_query *aclk_query_head; - struct aclk_query *aclk_query_tail; - unsigned int count; -} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 }; - - -unsigned int aclk_query_size() -{ - int r; - ACLK_QUEUE_LOCK; - r = aclk_queue.count; - ACLK_QUEUE_UNLOCK; - return r; -} - -/* - * Free a query structure when done - */ -static void aclk_query_free(struct aclk_query *this_query) -{ - if (unlikely(!this_query)) - return; - - freez(this_query->topic); - if (likely(this_query->query)) - freez(this_query->query); - if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) { - struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data; - freez(del->query_endpoint); - freez(del->data); - freez(del); - } - if (likely(this_query->msg_id)) - freez(this_query->msg_id); - freez(this_query); -} - -/* - * Get the next query to process - NULL if nothing there - * The caller needs to free memory by calling aclk_query_free() - * - * topic - * query - * The structure itself - * - */ -static struct aclk_query *aclk_queue_pop() -{ - struct aclk_query *this_query; - - ACLK_QUEUE_LOCK; - - if (likely(!aclk_queue.aclk_query_head)) { - ACLK_QUEUE_UNLOCK; - return NULL; - } - - this_query = aclk_queue.aclk_query_head; - - // Get rid of the deleted entries - while (this_query && this_query->deleted) { - aclk_queue.count--; - - aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; - - if (likely(!aclk_queue.aclk_query_head)) { - aclk_queue.aclk_query_tail = NULL; - } - - aclk_query_free(this_query); - - this_query = aclk_queue.aclk_query_head; - } - - if (likely(!this_query)) { - ACLK_QUEUE_UNLOCK; - return NULL; - } - - if (!this_query->deleted && this_query->run_after > now_realtime_sec()) { - info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec()); - ACLK_QUEUE_UNLOCK; - return NULL; - } - - aclk_queue.count--; - aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next; - - if (likely(!aclk_queue.aclk_query_head)) { - aclk_queue.aclk_query_tail = NULL; - } - - ACLK_QUEUE_UNLOCK; - return this_query; -} - -// Returns the entry after which we need to create a new entry to run at the specified time -// If NULL is returned we need to add to HEAD -// Need to have a QUERY lock before calling this - -static struct aclk_query *aclk_query_find_position(time_t time_to_run) -{ - struct aclk_query *tmp_query, *last_query; - - // Quick check if we will add to the end - if (likely(aclk_queue.aclk_query_tail)) { - if (aclk_queue.aclk_query_tail->run_after <= time_to_run) - return aclk_queue.aclk_query_tail; - } - - last_query = NULL; - tmp_query = aclk_queue.aclk_query_head; - - while (tmp_query) { - if (tmp_query->run_after > time_to_run) - return last_query; - last_query = tmp_query; - tmp_query = tmp_query->next; - } - return last_query; -} - -// Need to have a QUERY lock before calling this -static struct aclk_query * -aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query) -{ - struct aclk_query *tmp_query, *prev_query; - UNUSED(cmd); - - tmp_query = aclk_queue.aclk_query_head; - prev_query = NULL; - while (tmp_query) { - if (likely(!tmp_query->deleted)) { - if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) { - if ((!data || data == tmp_query->data) && - (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) { - if (likely(last_query)) - *last_query = prev_query; - return tmp_query; - } - } - } - prev_query = tmp_query; - tmp_query = tmp_query->next; - } - return NULL; -} - -/* - * Add a query to execute, the result will be send to the specified topic - */ - -int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd) -{ - struct aclk_query *new_query, *tmp_query; - - // Ignore all commands while we wait for the agent to initialize - if (unlikely(!aclk_connected)) - return 1; - - run_after = now_realtime_sec() + run_after; - - ACLK_QUEUE_LOCK; - struct aclk_query *last_query = NULL; - - tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query); - if (unlikely(tmp_query)) { - if (tmp_query->run_after == run_after) { - ACLK_QUEUE_UNLOCK; - LEGACY_QUERY_THREAD_WAKEUP; - return 0; - } - - if (last_query) - last_query->next = tmp_query->next; - else - aclk_queue.aclk_query_head = tmp_query->next; - - debug(D_ACLK, "Removing double entry"); - aclk_query_free(tmp_query); - aclk_queue.count--; - } - - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.queries_queued++; - LEGACY_ACLK_STATS_UNLOCK; - } - - new_query = callocz(1, sizeof(struct aclk_query)); - new_query->cmd = aclk_cmd; - if (internal) { - new_query->topic = strdupz(topic); - if (likely(query)) - new_query->query = strdupz(query); - } else { - new_query->topic = topic; - new_query->query = query; - new_query->msg_id = msg_id; - } - - new_query->data = data; - new_query->next = NULL; - now_realtime_timeval(&new_query->tv_in); - new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec; - new_query->created_boot_time = now_boottime_usec(); - new_query->run_after = run_after; - - debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : ""); - - tmp_query = aclk_query_find_position(run_after); - - if (tmp_query) { - new_query->next = tmp_query->next; - tmp_query->next = new_query; - if (tmp_query == aclk_queue.aclk_query_tail) - aclk_queue.aclk_query_tail = new_query; - aclk_queue.count++; - ACLK_QUEUE_UNLOCK; - LEGACY_QUERY_THREAD_WAKEUP; - return 0; - } - - new_query->next = aclk_queue.aclk_query_head; - aclk_queue.aclk_query_head = new_query; - aclk_queue.count++; - - ACLK_QUEUE_UNLOCK; - LEGACY_QUERY_THREAD_WAKEUP; - return 0; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - -#ifndef __GNUC__ -#pragma region Helper Functions -#endif - -/* - * Take a buffer, encode it and rewrite it - * - */ - -static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines) -{ - char *tmp_buffer = mallocz(content_size * 2); - char *dst = tmp_buffer; - while (content_size > 0) { - switch (*src) { - case '\n': - if (keep_newlines) - { - *dst++ = '\\'; - *dst++ = 'n'; - } - break; - case '\t': - break; - case 0x01 ... 0x08: - case 0x0b ... 0x1F: - *dst++ = '\\'; - *dst++ = 'u'; - *dst++ = '0'; - *dst++ = '0'; - *dst++ = (*src < 0x0F) ? '0' : '1'; - *dst++ = to_hex(*src); - break; - case '\"': - *dst++ = '\\'; - *dst++ = *src; - break; - default: - *dst++ = *src; - } - src++; - content_size--; - } - *dst = '\0'; - - return tmp_buffer; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - -#ifndef __GNUC__ -#pragma region ACLK_QUERY -#endif - - -static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) -{ - usec_t t = now_boottime_usec(); - legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); - - w->response.code = web_client_api_request_v1(host, w, url); - t = now_boottime_usec() - t; - - legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t); - - return t; -} - -static int aclk_execute_query(struct aclk_query *this_query) -{ - if (strncmp(this_query->query, "/api/v1/", 8) == 0) { - struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); - w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() - w->cookie1[0] = 0; // Simulate web_client_create_on_fd() - w->cookie2[0] = 0; // Simulate web_client_create_on_fd() - w->acl = 0x1f; - - char *mysep = strchr(this_query->query, '?'); - if (mysep) { - strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE); - *mysep = '\0'; - } else - strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE); - - mysep = strrchr(this_query->query, '/'); - - // TODO: handle bad response perhaps in a different way. For now it does to the payload - w->tv_in = this_query->tv_in; - now_realtime_timeval(&w->tv_ready); - aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); - size_t size = w->response.data->len; - size_t sent = size; - w->response.data->date = w->tv_ready.tv_sec; - web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready - BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0); - char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1); - - buffer_sprintf( - local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}", - w->response.code, encoded_response, encoded_header); - - buffer_sprintf(local_buffer, "\n}"); - - debug(D_ACLK, "Response:%s", encoded_header); - - aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id); - - struct timeval tv; - now_realtime_timeval(&tv); - - log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", - w->id - , gettid() - , this_query->idx - , "DATA" - , sent - , size - , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0) - , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 - , dt_usec(&tv, &w->tv_ready) / 1000.0 - , dt_usec(&tv, &w->tv_in) / 1000.0 - , w->response.code - , strip_control_characters(this_query->query) - ); - - buffer_free(w->response.data); - buffer_free(w->response.header); - buffer_free(w->response.header_output); - freez(w); - buffer_free(local_buffer); - freez(encoded_response); - freez(encoded_header); - return 0; - } - return 1; -} - -static int aclk_execute_query_v2(struct aclk_query *this_query) -{ - int retval = 0; - usec_t t; - BUFFER *local_buffer = NULL; - struct aclk_cloud_req_v2 *cloud_req = (struct aclk_cloud_req_v2 *)this_query->data; - -#ifdef NETDATA_WITH_ZLIB - int z_ret; - BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - char *start, *end; -#endif - - struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client)); - w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - strcpy(w->origin, "*"); // Simulate web_client_create_on_fd() - w->cookie1[0] = 0; // Simulate web_client_create_on_fd() - w->cookie2[0] = 0; // Simulate web_client_create_on_fd() - w->acl = 0x1f; - - char *mysep = strchr(this_query->query, '?'); - if (mysep) { - url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); - *mysep = '\0'; - } else - url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1); - - mysep = strrchr(this_query->query, '/'); - - // execute the query - w->tv_in = this_query->tv_in; - now_realtime_timeval(&w->tv_ready); - t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); - size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len; - size_t sent = size; - -#ifdef NETDATA_WITH_ZLIB - // check if gzip encoding can and should be used - if ((start = strstr(cloud_req->data, WEB_HDR_ACCEPT_ENC))) { - start += strlen(WEB_HDR_ACCEPT_ENC); - end = strstr(start, "\x0D\x0A"); - start = strstr(start, "gzip"); - - if (start && start < end) { - w->response.zstream.zalloc = Z_NULL; - w->response.zstream.zfree = Z_NULL; - w->response.zstream.opaque = Z_NULL; - if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) { - w->response.zinitialized = 1; - w->response.zoutput = 1; - } else - error("Failed to initialize zlib. Proceeding without compression."); - } - } - - if (w->response.data->len && w->response.zinitialized) { - w->response.zstream.next_in = (Bytef *)w->response.data->buffer; - w->response.zstream.avail_in = w->response.data->len; - do { - w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE; - w->response.zstream.next_out = w->response.zbuffer; - z_ret = deflate(&w->response.zstream, Z_FINISH); - if(z_ret < 0) { - if(w->response.zstream.msg) - error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg); - else - error("Unknown error during zlib compression."); - retval = 1; - goto cleanup; - } - int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out; - buffer_need_bytes(z_buffer, bytes_to_cpy); - memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy); - z_buffer->len += bytes_to_cpy; - } while(z_ret != Z_STREAM_END); - // so that web_client_build_http_header - // puts correct content length into header - buffer_free(w->response.data); - w->response.data = z_buffer; - z_buffer = NULL; - } -#endif - - w->response.data->date = w->tv_ready.tv_sec; - web_client_build_http_header(w); - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - local_buffer->contenttype = CT_APPLICATION_JSON; - - aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code); - buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A"); - buffer_strcat(local_buffer, w->response.header_output->buffer); - - if (w->response.data->len) { -#ifdef NETDATA_WITH_ZLIB - if (w->response.zinitialized) { - buffer_need_bytes(local_buffer, w->response.data->len); - memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len); - local_buffer->len += w->response.data->len; - sent = sent - size + w->response.data->len; - } else { -#endif - buffer_strcat(local_buffer, w->response.data->buffer); -#ifdef NETDATA_WITH_ZLIB - } -#endif - } - - aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id); - - struct timeval tv; - now_realtime_timeval(&tv); - - log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", - w->id - , gettid() - , this_query->idx - , "DATA" - , sent - , size - , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0) - , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0 - , dt_usec(&tv, &w->tv_ready) / 1000.0 - , dt_usec(&tv, &w->tv_in) / 1000.0 - , w->response.code - , strip_control_characters(this_query->query) - ); -cleanup: -#ifdef NETDATA_WITH_ZLIB - if(w->response.zinitialized) - deflateEnd(&w->response.zstream); - buffer_free(z_buffer); -#endif - buffer_free(w->response.data); - buffer_free(w->response.header); - buffer_free(w->response.header_output); - freez(w); - buffer_free(local_buffer); - return retval; -} - -#define ACLK_HOST_PTR_COMPULSORY(x) \ - if (unlikely(!host)) { \ - errno = 0; \ - error(x " needs host pointer"); \ - break; \ - } - -/* - * This function will fetch the next pending command and process it - * - */ -static int aclk_process_query(struct aclk_query_thread *t_info) -{ - struct aclk_query *this_query; - static long int query_count = 0; - ACLK_METADATA_STATE meta_state; - RRDHOST *host; - - if (!aclk_connected) - return 0; - - this_query = aclk_queue_pop(); - if (likely(!this_query)) { - return 0; - } - - if (unlikely(this_query->deleted)) { - debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query); - aclk_query_free(this_query); - return 1; - } - query_count++; - - host = (RRDHOST*)this_query->data; - this_query->idx = t_info->idx; - - debug( - D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic, - this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS); - - switch (this_query->cmd) { - case ACLK_CMD_ONCONNECT: - ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT"); -#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE - if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { - error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE); - break; - } -#else -#warning "This check became unnecessary. Remove" -#endif - - debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"", - host->hostname, - host->machine_guid); - - rrdhost_aclk_state_lock(host); - meta_state = host->aclk_state.metadata; - host->aclk_state.metadata = ACLK_METADATA_SENT; - rrdhost_aclk_state_unlock(host); - aclk_send_metadata(meta_state, host); - break; - - case ACLK_CMD_CHART: - ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART"); - - debug(D_ACLK, "EXECUTING a chart update command"); - aclk_send_single_chart(host, this_query->query); - break; - - case ACLK_CMD_CHARTDEL: - ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL"); - - debug(D_ACLK, "EXECUTING a chart delete command"); - //TODO: This send the info metadata for now - legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host); - break; - - case ACLK_CMD_ALARM: - debug(D_ACLK, "EXECUTING an alarm update command"); - aclk_send_message(this_query->topic, this_query->query, this_query->msg_id); - break; - - case ACLK_CMD_CLOUD: - debug(D_ACLK, "EXECUTING a cloud command"); - aclk_execute_query(this_query); - break; - case ACLK_CMD_CLOUD_QUERY_2: - debug(D_ACLK, "EXECUTING Cloud Query v2"); - aclk_execute_query_v2(this_query); - break; - - case ACLK_CMD_CHILD_CONNECT: - case ACLK_CMD_CHILD_DISCONNECT: - ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT"); - - debug( - D_ACLK, "Execution Child %s command", - this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect"); - aclk_send_info_child_connection(host, this_query->cmd); - break; - - default: - errno = 0; - error("Unknown ACLK Query Command"); - break; - } - debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); - - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.queries_dispatched++; - legacy_aclk_queries_per_thread[t_info->idx]++; - LEGACY_ACLK_STATS_UNLOCK; - - if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) { - getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]); - getrusage_called_this_tick[t_info->idx]++; - } - - } - - aclk_query_free(this_query); - - return 1; -} - -void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads) -{ - if (query_threads && query_threads->thread_list) { - for (int i = 0; i < query_threads->count; i++) { - netdata_thread_join(query_threads->thread_list[i].thread, NULL); - } - freez(query_threads->thread_list); - } - - struct aclk_query *this_query; - - do { - this_query = aclk_queue_pop(); - aclk_query_free(this_query); - } while (this_query); -} - -#define TASK_LEN_MAX 22 -void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads) -{ - info("Starting %d query threads.", query_threads->count); - - char thread_name[TASK_LEN_MAX]; - query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread)); - for (int i = 0; i < query_threads->count; i++) { - query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics - - if(unlikely(snprintfz(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0)) - error("snprintf encoding error"); - netdata_thread_create( - &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread, - &query_threads->thread_list[i]); - } -} - -/** - * Checks and updates popcorning state of rrdhost - * returns actual/updated popcorning state - */ - -ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host) -{ - rrdhost_aclk_state_lock(host); - ACLK_AGENT_STATE ret = host->aclk_state.state; - if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ - rrdhost_aclk_state_unlock(host); - return ret; - } - - if (!host->aclk_state.t_last_popcorn_update){ - rrdhost_aclk_state_unlock(host); - return ret; - } - - time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update; - - if (t_diff >= ACLK_STABLE_TIMEOUT) { - host->aclk_state.state = ACLK_HOST_STABLE; - host->aclk_state.t_last_popcorn_update = 0; - rrdhost_aclk_state_unlock(host); - info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff); - return ACLK_HOST_STABLE; - } - - rrdhost_aclk_state_unlock(host); - return ret; -} - -/** - * Main query processing thread - * - * On startup wait for the agent collectors to initialize - * Expect at least a time of ACLK_STABLE_TIMEOUT seconds - * of no new collectors coming in in order to mark the agent - * as stable (set agent_state = AGENT_STABLE) - */ -void *legacy_aclk_query_main_thread(void *ptr) -{ - struct aclk_query_thread *info = ptr; - - while (!netdata_exit) { - if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) { -#ifdef ACLK_DEBUG - _dump_collector_list(); -#endif - break; - } - sleep_usec(USEC_PER_SEC * 1); - } - - while (!netdata_exit) { - if(aclk_disable_runtime) { - sleep(1); - continue; - } - legacy_aclk_shared_state_LOCK; - if (unlikely(!legacy_aclk_shared_state.version_neg)) { - if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) { - legacy_aclk_shared_state_UNLOCK; - info("Waiting for ACLK Version Negotiation message from Cloud"); - sleep(1); - continue; - } - info("ACLK version negotiation failed (This is expected). No reply to \"hello\" with \"version\" from cloud in time of %ds." - " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN); - legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN; - aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg); - } - legacy_aclk_shared_state_UNLOCK; - - rrdhost_aclk_state_lock(localhost); - if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) { - if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { - rrdhost_aclk_state_unlock(localhost); - errno = 0; - error("ACLK failed to queue on_connect command"); - sleep(1); - continue; - } - localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED; - } - rrdhost_aclk_state_unlock(localhost); - - legacy_aclk_shared_state_LOCK; - if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { - legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); - legacy_aclk_shared_state.next_popcorn_host = NULL; - aclk_update_next_child_to_popcorn(); - } - legacy_aclk_shared_state_UNLOCK; - - while (aclk_process_query(info)) { - // Process all commands - }; - - LEGACY_QUERY_THREAD_LOCK; - - // TODO: Need to check if there are queries awaiting already - if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait))) - sleep_usec(USEC_PER_SEC * 1); - - LEGACY_QUERY_THREAD_UNLOCK; - } - - return NULL; -} - -#ifndef __GNUC__ -#pragma endregion -#endif diff --git a/aclk/legacy/aclk_query.h b/aclk/legacy/aclk_query.h deleted file mode 100644 index 622b66e2..00000000 --- a/aclk/legacy/aclk_query.h +++ /dev/null @@ -1,41 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_ACLK_QUERY_H -#define NETDATA_ACLK_QUERY_H - -#include "libnetdata/libnetdata.h" -#include "web/server/web_client.h" - -#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable - -#define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread. - -extern pthread_cond_t legacy_query_cond_wait; -extern pthread_mutex_t legacy_query_lock_wait; -extern uint8_t *getrusage_called_this_tick; -#define LEGACY_QUERY_THREAD_WAKEUP pthread_cond_signal(&legacy_query_cond_wait) -#define LEGACY_QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&legacy_query_cond_wait) -struct aclk_query_thread { - netdata_thread_t thread; - int idx; -}; - -struct aclk_query_threads { - struct aclk_query_thread *thread_list; - int count; -}; - -struct aclk_cloud_req_v2 { - char *data; - RRDHOST *host; - char *query_endpoint; -}; - -void *legacy_aclk_query_main_thread(void *ptr); -int legacy_aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); - -void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads); -void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads); -unsigned int aclk_query_size(); - -#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/aclk_rx_msgs.c b/aclk/legacy/aclk_rx_msgs.c deleted file mode 100644 index d4778bbc..00000000 --- a/aclk/legacy/aclk_rx_msgs.c +++ /dev/null @@ -1,388 +0,0 @@ - -#include "aclk_rx_msgs.h" - -#include "aclk_common.h" -#include "aclk_stats.h" -#include "aclk_query.h" -#include "agent_cloud_link.h" - -#ifndef UUID_STR_LEN -#define UUID_STR_LEN 37 -#endif - -static inline int aclk_extract_v2_data(char *payload, char **data) -{ - char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR); - if(!ptr) - return 1; - ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR); - *data = strdupz(ptr); - return 0; -} - -#define ACLK_GET_REQ "GET " -#define ACLK_CHILD_REQ "/host/" -#define ACLK_CLOUD_REQ_V2_PREFIX "/api/v1/" -#define STRNCMP_CONSTANT_PREFIX(str, const_pref) strncmp(str, const_pref, strlen(const_pref)) -static inline int aclk_v2_payload_get_query(struct aclk_cloud_req_v2 *cloud_req, struct aclk_request *req) -{ - const char *start, *end, *ptr, *query_type; - char uuid_str[UUID_STR_LEN]; - uuid_t uuid; - - errno = 0; - - if(STRNCMP_CONSTANT_PREFIX(cloud_req->data, ACLK_GET_REQ)) { - error("Only accepting GET HTTP requests from CLOUD"); - return 1; - } - start = ptr = cloud_req->data + strlen(ACLK_GET_REQ); - - if(!STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CHILD_REQ)) { - ptr += strlen(ACLK_CHILD_REQ); - if(strlen(ptr) < UUID_STR_LEN) { - error("the child id in URL too short \"%s\"", start); - return 1; - } - - strncpyz(uuid_str, ptr, UUID_STR_LEN - 1); - - for(int i = 0; i < UUID_STR_LEN && uuid_str[i]; i++) - uuid_str[i] = tolower(uuid_str[i]); - - if(ptr[0] && uuid_parse(uuid_str, uuid)) { - error("Got Child query (/host/XXX/...) host id \"%s\" doesn't look like valid GUID", uuid_str); - return 1; - } - ptr += UUID_STR_LEN - 1; - - cloud_req->host = rrdhost_find_by_guid(uuid_str, 0); - if(!cloud_req->host) { - error("Cannot find host with GUID \"%s\"", uuid_str); - return 1; - } - } - - if(STRNCMP_CONSTANT_PREFIX(ptr, ACLK_CLOUD_REQ_V2_PREFIX)) { - error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX); - return 1; - } - ptr += strlen(ACLK_CLOUD_REQ_V2_PREFIX); - query_type = ptr; - - if(!(end = strstr(ptr, " HTTP/1.1\x0D\x0A"))) { - errno = 0; - error("Doesn't look like HTTP GET request."); - return 1; - } - - if(!(ptr = strchr(ptr, '?')) || ptr > end) - ptr = end; - cloud_req->query_endpoint = mallocz((ptr - query_type) + 1); - strncpyz(cloud_req->query_endpoint, query_type, ptr - query_type); - - req->payload = mallocz((end - start) + 1); - strncpyz(req->payload, start, end - start); - - return 0; -} - -#define HTTP_CHECK_AGENT_INITIALIZED() rrdhost_aclk_state_lock(localhost);\ - if (unlikely(localhost->aclk_state.state == ACLK_HOST_INITIALIZING)) {\ - debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ - rrdhost_aclk_state_unlock(localhost);\ - return 1;\ - }\ - rrdhost_aclk_state_unlock(localhost); - -/* - * Parse the incoming payload and queue a command if valid - */ -static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, char *raw_payload) -{ - UNUSED(raw_payload); - HTTP_CHECK_AGENT_INITIALIZED(); - - errno = 0; - if (unlikely(cloud_to_agent->version != 1)) { - error( - "Received \"http\" message from Cloud with version %d, but ACLK version %d is used", - cloud_to_agent->version, - legacy_aclk_shared_state.version_neg); - return 1; - } - - if (unlikely(!cloud_to_agent->payload)) { - error("payload missing"); - return 1; - } - - if (unlikely(!cloud_to_agent->callback_topic)) { - error("callback_topic missing"); - return 1; - } - - if (unlikely(!cloud_to_agent->msg_id)) { - error("msg_id missing"); - return 1; - } - - if (unlikely(legacy_aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD))) - debug(D_ACLK, "ACLK failed to queue incoming \"http\" message"); - - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.cloud_req_v1++; - legacy_aclk_metrics_per_sample.cloud_req_ok++; - LEGACY_ACLK_STATS_UNLOCK; - } - - return 0; -} - -static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload) -{ - HTTP_CHECK_AGENT_INITIALIZED(); - - struct aclk_cloud_req_v2 *cloud_req; - char *data; - int stat_idx; - - errno = 0; - if (cloud_to_agent->version < ACLK_V_COMPRESSION) { - error( - "This handler cannot reply to request with version older than %d, received %d.", - ACLK_V_COMPRESSION, - cloud_to_agent->version); - return 1; - } - - if (unlikely(aclk_extract_v2_data(raw_payload, &data))) { - error("Error extracting payload expected after the JSON dictionary."); - return 1; - } - - cloud_req = mallocz(sizeof(struct aclk_cloud_req_v2)); - cloud_req->data = data; - cloud_req->host = localhost; - - if (unlikely(aclk_v2_payload_get_query(cloud_req, cloud_to_agent))) { - error("Could not extract payload from query"); - goto cleanup; - } - - if (unlikely(!cloud_to_agent->callback_topic)) { - error("Missing callback_topic"); - goto cleanup; - } - - if (unlikely(!cloud_to_agent->msg_id)) { - error("Missing msg_id"); - goto cleanup; - } - - // we do this here due to cloud_req being taken over by query thread - // which if crazy quick can free it after legacy_aclk_queue_query - stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint); - - // legacy_aclk_queue_query takes ownership of data pointer - if (unlikely(legacy_aclk_queue_query( - cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, - ACLK_CMD_CLOUD_QUERY_2))) { - error("ACLK failed to queue incoming \"http\" v2 message"); - goto cleanup; - } - - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.cloud_req_v2++; - legacy_aclk_metrics_per_sample.cloud_req_ok++; - legacy_aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++; - LEGACY_ACLK_STATS_UNLOCK; - } - - return 0; -cleanup: - freez(cloud_req->query_endpoint); - freez(cloud_req->data); - freez(cloud_req); - return 1; -} - -// This handles `version` message from cloud used to negotiate -// protocol version we will use -static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload) -{ - UNUSED(raw_payload); - int version = -1; - errno = 0; - - if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) { - error( - "Unsupported version of \"version\" message from cloud. Expected %d, Got %d", - ACLK_VERSION_NEG_VERSION, - cloud_to_agent->version); - return 1; - } - if (unlikely(!cloud_to_agent->min_version)) { - error("Min version missing or 0"); - return 1; - } - if (unlikely(!cloud_to_agent->max_version)) { - error("Max version missing or 0"); - return 1; - } - if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) { - error( - "Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version, - cloud_to_agent->min_version); - return 1; - } - - if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) { - error( - "Agent too old for this cloud. Minimum version required by cloud %d." - " Maximum version supported by this agent %d.", - cloud_to_agent->min_version, ACLK_VERSION_MAX); - aclk_kill_link = 1; - aclk_disable_runtime = 1; - return 1; - } - if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) { - error( - "Cloud version is too old for this agent. Maximum version supported by cloud %d." - " Minimum (oldest) version supported by this agent %d.", - cloud_to_agent->max_version, ACLK_VERSION_MIN); - aclk_kill_link = 1; - return 1; - } - - version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX); - - legacy_aclk_shared_state_LOCK; - if (unlikely(now_monotonic_usec() > legacy_aclk_shared_state.version_neg_wait_till)) { - errno = 0; - error("The \"version\" message came too late ignoring."); - goto err_cleanup; - } - if (unlikely(legacy_aclk_shared_state.version_neg)) { - errno = 0; - error("Version has already been set to %d", legacy_aclk_shared_state.version_neg); - goto err_cleanup; - } - legacy_aclk_shared_state.version_neg = version; - legacy_aclk_shared_state_UNLOCK; - - info("Choosing version %d of ACLK", version); - - aclk_set_rx_handlers(version); - - return 0; - -err_cleanup: - legacy_aclk_shared_state_UNLOCK; - return 1; -} - -typedef struct aclk_incoming_msg_type{ - char *name; - int(*fnc)(struct aclk_request *, char *); -}aclk_incoming_msg_type; - -aclk_incoming_msg_type legacy_aclk_incoming_msg_types_v1[] = { - { .name = "http", .fnc = aclk_handle_cloud_request_v1 }, - { .name = "version", .fnc = aclk_handle_version_response }, - { .name = NULL, .fnc = NULL } -}; - -aclk_incoming_msg_type legacy_aclk_incoming_msg_types_compression[] = { - { .name = "http", .fnc = aclk_handle_cloud_request_v2 }, - { .name = "version", .fnc = aclk_handle_version_response }, - { .name = NULL, .fnc = NULL } -}; - -struct aclk_incoming_msg_type *legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1; - -void aclk_set_rx_handlers(int version) -{ - if(version >= ACLK_V_COMPRESSION) { - legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_compression; - return; - } - - legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1; -} - -int legacy_aclk_handle_cloud_message(char *payload) -{ - struct aclk_request cloud_to_agent; - memset(&cloud_to_agent, 0, sizeof(struct aclk_request)); - - if (unlikely(!payload)) { - errno = 0; - error("ACLK incoming message is empty"); - goto err_cleanup_nojson; - } - - debug(D_ACLK, "ACLK incoming message (%s)", payload); - - int rc = json_parse(payload, &cloud_to_agent, legacy_cloud_to_agent_parse); - - if (unlikely(rc != JSON_OK)) { - errno = 0; - error("Malformed json request (%s)", payload); - goto err_cleanup; - } - - if (!cloud_to_agent.type_id) { - errno = 0; - error("Cloud message is missing compulsory key \"type\""); - goto err_cleanup; - } - - if (!legacy_aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) { - error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring"); - goto err_cleanup; - } - - for (int i = 0; legacy_aclk_incoming_msg_types[i].name; i++) { - if (strcmp(cloud_to_agent.type_id, legacy_aclk_incoming_msg_types[i].name) == 0) { - if (likely(!legacy_aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) { - // in case of success handler is supposed to clean up after itself - // or as in the case of aclk_handle_cloud_request take - // ownership of the pointers (done to avoid copying) - // see what `legacy_aclk_queue_query` parameter `internal` does - - // NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!! - // msg handlers (namely aclk_handle_version_response) - // can freely change what legacy_aclk_incoming_msg_types points to - // so either exit or restart this for loop - freez(cloud_to_agent.type_id); - return 0; - } - goto err_cleanup; - } - } - - errno = 0; - error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id); - -err_cleanup: - if (cloud_to_agent.payload) - freez(cloud_to_agent.payload); - if (cloud_to_agent.type_id) - freez(cloud_to_agent.type_id); - if (cloud_to_agent.msg_id) - freez(cloud_to_agent.msg_id); - if (cloud_to_agent.callback_topic) - freez(cloud_to_agent.callback_topic); - -err_cleanup_nojson: - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics_per_sample.cloud_req_err++; - LEGACY_ACLK_STATS_UNLOCK; - } - - return 1; -} diff --git a/aclk/legacy/aclk_rx_msgs.h b/aclk/legacy/aclk_rx_msgs.h deleted file mode 100644 index f1f99114..00000000 --- a/aclk/legacy/aclk_rx_msgs.h +++ /dev/null @@ -1,13 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_ACLK_RX_MSGS_H -#define NETDATA_ACLK_RX_MSGS_H - -#include "daemon/common.h" -#include "libnetdata/libnetdata.h" - -int legacy_aclk_handle_cloud_message(char *payload); -void aclk_set_rx_handlers(int version); - - -#endif /* NETDATA_ACLK_RX_MSGS_H */ diff --git a/aclk/legacy/aclk_stats.c b/aclk/legacy/aclk_stats.c deleted file mode 100644 index fbbb322a..00000000 --- a/aclk/legacy/aclk_stats.c +++ /dev/null @@ -1,411 +0,0 @@ -#include "aclk_stats.h" - -netdata_mutex_t legacy_aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; - -int legacy_query_thread_count; - -// data ACLK stats need per query thread -struct legacy_aclk_qt_data { - RRDDIM *dim; -} *legacy_aclk_qt_data = NULL; - -// ACLK per query thread cpu stats -struct legacy_aclk_cpu_data { - RRDDIM *user; - RRDDIM *system; - RRDSET *st; -} *legacy_aclk_cpu_data = NULL; - -uint32_t *legacy_aclk_queries_per_thread = NULL; -uint32_t *legacy_aclk_queries_per_thread_sample = NULL; -struct rusage *rusage_per_thread; -uint8_t *getrusage_called_this_tick = NULL; - -static struct legacy_aclk_metrics legacy_aclk_metrics = { - .online = 0, -}; - -struct legacy_aclk_metrics_per_sample legacy_aclk_metrics_per_sample; - -struct aclk_mat_metrics aclk_mat_metrics = { -#ifdef NETDATA_INTERNAL_CHECKS - .latency = { .name = "aclk_latency_mqtt", - .prio = 200002, - .st = NULL, - .rd_avg = NULL, - .rd_max = NULL, - .rd_total = NULL, - .unit = "ms", - .title = "ACLK Message Publish Latency" }, -#endif - - .cloud_q_db_query_time = { .name = "aclk_db_query_time", - .prio = 200006, - .st = NULL, - .rd_avg = NULL, - .rd_max = NULL, - .rd_total = NULL, - .unit = "us", - .title = "Time it took to process cloud requested DB queries" }, - - .cloud_q_recvd_to_processed = { .name = "aclk_cloud_q_recvd_to_processed", - .prio = 200007, - .st = NULL, - .rd_avg = NULL, - .rd_max = NULL, - .rd_total = NULL, - .unit = "us", - .title = "Time from receiving the Cloud Query until it was picked up " - "by query thread (just before passing to the database)." } -}; - -void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement) -{ - if (aclk_stats_enabled) { - LEGACY_ACLK_STATS_LOCK; - if (metric->max < measurement) - metric->max = measurement; - - metric->total += measurement; - metric->count++; - LEGACY_ACLK_STATS_UNLOCK; - } -} - -static void aclk_stats_collect(struct legacy_aclk_metrics_per_sample *per_sample, struct legacy_aclk_metrics *permanent) -{ - static RRDSET *st_aclkstats = NULL; - static RRDDIM *rd_online_status = NULL; - - if (unlikely(!st_aclkstats)) { - st_aclkstats = rrdset_create_localhost( - "netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status", - "connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE); - - rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st_aclkstats); - - rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online); - - rrdset_done(st_aclkstats); -} - -static void aclk_stats_query_queue(struct legacy_aclk_metrics_per_sample *per_sample) -{ - static RRDSET *st_query_thread = NULL; - static RRDDIM *rd_queued = NULL; - static RRDDIM *rd_dispatched = NULL; - - if (unlikely(!st_query_thread)) { - st_query_thread = rrdset_create_localhost( - "netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s", - "netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA); - - rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st_query_thread); - - rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued); - rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched); - - rrdset_done(st_query_thread); -} - -static void aclk_stats_write_q(struct legacy_aclk_metrics_per_sample *per_sample) -{ - static RRDSET *st = NULL; - static RRDDIM *rd_wq_add = NULL; - static RRDDIM *rd_wq_consumed = NULL; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata", "aclk_write_q", NULL, "aclk", NULL, "Write Queue Mosq->Libwebsockets", "KiB/s", - "netdata", "stats", 200003, localhost->rrd_update_every, RRDSET_TYPE_AREA); - - rd_wq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_wq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); - - rrddim_set_by_pointer(st, rd_wq_add, per_sample->write_q_added); - rrddim_set_by_pointer(st, rd_wq_consumed, per_sample->write_q_consumed); - - rrdset_done(st); -} - -static void aclk_stats_read_q(struct legacy_aclk_metrics_per_sample *per_sample) -{ - static RRDSET *st = NULL; - static RRDDIM *rd_rq_add = NULL; - static RRDDIM *rd_rq_consumed = NULL; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata", "aclk_read_q", NULL, "aclk", NULL, "Read Queue Libwebsockets->Mosq", "KiB/s", - "netdata", "stats", 200004, localhost->rrd_update_every, RRDSET_TYPE_AREA); - - rd_rq_add = rrddim_add(st, "added", NULL, 1, 1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_rq_consumed = rrddim_add(st, "consumed", NULL, 1, -1024 * localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); - - rrddim_set_by_pointer(st, rd_rq_add, per_sample->read_q_added); - rrddim_set_by_pointer(st, rd_rq_consumed, per_sample->read_q_consumed); - - rrdset_done(st); -} - -static void aclk_stats_cloud_req(struct legacy_aclk_metrics_per_sample *per_sample) -{ - static RRDSET *st = NULL; - static RRDDIM *rd_rq_ok = NULL; - static RRDDIM *rd_rq_err = NULL; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s", - "netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - - rd_rq_ok = rrddim_add(st, "accepted", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_rq_err = rrddim_add(st, "rejected", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); - - rrddim_set_by_pointer(st, rd_rq_ok, per_sample->cloud_req_ok); - rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err); - - rrdset_done(st); -} - -static void aclk_stats_cloud_req_version(struct legacy_aclk_metrics_per_sample *per_sample) -{ - static RRDSET *st = NULL; - static RRDDIM *rd_rq_v1 = NULL; - static RRDDIM *rd_rq_v2 = NULL; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata", "aclk_cloud_req_version", NULL, "aclk", NULL, "Requests received from cloud by their version", "req/s", - "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - - rd_rq_v1 = rrddim_add(st, "v1", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_rq_v2 = rrddim_add(st, "v2+", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); - - rrddim_set_by_pointer(st, rd_rq_v1, per_sample->cloud_req_v1); - rrddim_set_by_pointer(st, rd_rq_v2, per_sample->cloud_req_v2); - - rrdset_done(st); -} - -static char *cloud_req_type_names[ACLK_STATS_CLOUD_REQ_TYPE_CNT] = { - "other", - "info", - "data", - "alarms", - "alarm_log", - "chart", - "charts" - // if you change update: - // #define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7 -}; - -int aclk_cloud_req_type_to_idx(const char *name) -{ - for (int i = 1; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++) - if (!strcmp(cloud_req_type_names[i], name)) - return i; - return 0; -} - -static void aclk_stats_cloud_req_cmd(struct legacy_aclk_metrics_per_sample *per_sample) -{ - static RRDSET *st; - static int initialized = 0; - static RRDDIM *rd_rq_types[ACLK_STATS_CLOUD_REQ_TYPE_CNT]; - - if (unlikely(!initialized)) { - initialized = 1; - st = rrdset_create_localhost( - "netdata", "aclk_cloud_req_cmd", NULL, "aclk", NULL, "Requests received from cloud by their type (api endpoint queried)", "req/s", - "netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - - for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++) - rd_rq_types[i] = rrddim_add(st, cloud_req_type_names[i], NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); - - for (int i = 0; i < ACLK_STATS_CLOUD_REQ_TYPE_CNT; i++) - rrddim_set_by_pointer(st, rd_rq_types[i], per_sample->cloud_req_by_type[i]); - - rrdset_done(st); -} - -#define MAX_DIM_NAME 22 -static void aclk_stats_query_threads(uint32_t *queries_per_thread) -{ - static RRDSET *st = NULL; - - char dim_name[MAX_DIM_NAME]; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", - "netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - - for (int i = 0; i < legacy_query_thread_count; i++) { - if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) - error("snprintf encoding error"); - legacy_aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } - } else - rrdset_next(st); - - for (int i = 0; i < legacy_query_thread_count; i++) { - rrddim_set_by_pointer(st, legacy_aclk_qt_data[i].dim, queries_per_thread[i]); - } - - rrdset_done(st); -} - -static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct aclk_metric_mat_data *data) -{ - if(unlikely(!metric->st)) { - metric->st = rrdset_create_localhost( - "netdata", metric->name, NULL, "aclk", NULL, metric->title, metric->unit, "netdata", "stats", metric->prio, - localhost->rrd_update_every, RRDSET_TYPE_LINE); - - metric->rd_avg = rrddim_add(metric->st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - metric->rd_max = rrddim_add(metric->st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - metric->rd_total = rrddim_add(metric->st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(metric->st); - - if(data->count) - rrddim_set_by_pointer(metric->st, metric->rd_avg, roundf((float)data->total / data->count)); - else - rrddim_set_by_pointer(metric->st, metric->rd_avg, 0); - rrddim_set_by_pointer(metric->st, metric->rd_max, data->max); - rrddim_set_by_pointer(metric->st, metric->rd_total, data->total); - - rrdset_done(metric->st); -} - -static void aclk_stats_cpu_threads(void) -{ - char id[100 + 1]; - char title[100 + 1]; - - for (int i = 0; i < legacy_query_thread_count; i++) { - if (unlikely(!legacy_aclk_cpu_data[i].st)) { - - snprintfz(id, 100, "aclk_thread%d_cpu", i); - snprintfz(title, 100, "Cpu Usage For Thread No %d", i); - - legacy_aclk_cpu_data[i].st = rrdset_create_localhost( - "netdata", id, NULL, "aclk", NULL, title, "milliseconds/s", - "netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - - legacy_aclk_cpu_data[i].user = rrddim_add(legacy_aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - legacy_aclk_cpu_data[i].system = rrddim_add(legacy_aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - - } else - rrdset_next(legacy_aclk_cpu_data[i].st); - } - - for (int i = 0; i < legacy_query_thread_count; i++) { - rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec); - rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec); - rrdset_done(legacy_aclk_cpu_data[i].st); - } -} - -void legacy_aclk_stats_thread_cleanup() -{ - freez(legacy_aclk_qt_data); - freez(legacy_aclk_queries_per_thread); - freez(legacy_aclk_queries_per_thread_sample); - freez(legacy_aclk_cpu_data); - freez(rusage_per_thread); -} - -void *legacy_aclk_stats_main_thread(void *ptr) -{ - struct aclk_stats_thread *args = ptr; - - legacy_query_thread_count = args->query_thread_count; - legacy_aclk_qt_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_qt_data)); - legacy_aclk_cpu_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_cpu_data)); - legacy_aclk_queries_per_thread = callocz(legacy_query_thread_count, sizeof(uint32_t)); - legacy_aclk_queries_per_thread_sample = callocz(legacy_query_thread_count, sizeof(uint32_t)); - rusage_per_thread = callocz(legacy_query_thread_count, sizeof(struct rusage)); - getrusage_called_this_tick = callocz(legacy_query_thread_count, sizeof(uint8_t)); - - heartbeat_t hb; - heartbeat_init(&hb); - usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; - - memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample)); - - struct legacy_aclk_metrics_per_sample per_sample; - struct legacy_aclk_metrics permanent; - - while (!netdata_exit) { - netdata_thread_testcancel(); - // ------------------------------------------------------------------------ - // Wait for the next iteration point. - - heartbeat_next(&hb, step_ut); - if (netdata_exit) break; - - LEGACY_ACLK_STATS_LOCK; - // to not hold lock longer than necessary, especially not to hold it - // during database rrd* operations - memcpy(&per_sample, &legacy_aclk_metrics_per_sample, sizeof(struct legacy_aclk_metrics_per_sample)); - memcpy(&permanent, &legacy_aclk_metrics, sizeof(struct legacy_aclk_metrics)); - memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample)); - - memcpy(legacy_aclk_queries_per_thread_sample, legacy_aclk_queries_per_thread, sizeof(uint32_t) * legacy_query_thread_count); - memset(legacy_aclk_queries_per_thread, 0, sizeof(uint32_t) * legacy_query_thread_count); - memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * legacy_query_thread_count); - LEGACY_ACLK_STATS_UNLOCK; - - aclk_stats_collect(&per_sample, &permanent); - aclk_stats_query_queue(&per_sample); - - aclk_stats_write_q(&per_sample); - aclk_stats_read_q(&per_sample); - - aclk_stats_cloud_req(&per_sample); - aclk_stats_cloud_req_version(&per_sample); - - aclk_stats_cloud_req_cmd(&per_sample); - - aclk_stats_query_threads(legacy_aclk_queries_per_thread_sample); - - aclk_stats_cpu_threads(); - -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency); -#endif - aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_db_query_time, &per_sample.cloud_q_db_query_time); - aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_recvd_to_processed, &per_sample.cloud_q_recvd_to_processed); - } - - return 0; -} - -void legacy_aclk_stats_upd_online(int online) { - if(!aclk_stats_enabled) - return; - - LEGACY_ACLK_STATS_LOCK; - legacy_aclk_metrics.online = online; - - if(!online) - legacy_aclk_metrics_per_sample.offline_during_sample = 1; - LEGACY_ACLK_STATS_UNLOCK; -} diff --git a/aclk/legacy/aclk_stats.h b/aclk/legacy/aclk_stats.h deleted file mode 100644 index 560de3b5..00000000 --- a/aclk/legacy/aclk_stats.h +++ /dev/null @@ -1,100 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_ACLK_STATS_H -#define NETDATA_ACLK_STATS_H - -#include "daemon/common.h" -#include "libnetdata/libnetdata.h" -#include "aclk_common.h" - -#define ACLK_STATS_THREAD_NAME "ACLK_Stats" - -extern netdata_mutex_t legacy_aclk_stats_mutex; - -#define LEGACY_ACLK_STATS_LOCK netdata_mutex_lock(&legacy_aclk_stats_mutex) -#define LEGACY_ACLK_STATS_UNLOCK netdata_mutex_unlock(&legacy_aclk_stats_mutex) - -struct aclk_stats_thread { - netdata_thread_t *thread; - int query_thread_count; -}; - -// preserve between samples -struct legacy_aclk_metrics { - volatile uint8_t online; -}; - -//mat = max average total -struct aclk_metric_mat_data { - volatile uint32_t total; - volatile uint32_t count; - volatile uint32_t max; -}; - -//mat = max average total -struct aclk_metric_mat { - char *name; - char *title; - RRDSET *st; - RRDDIM *rd_avg; - RRDDIM *rd_max; - RRDDIM *rd_total; - long prio; - char *unit; -}; - -extern struct aclk_mat_metrics { -#ifdef NETDATA_INTERNAL_CHECKS - struct aclk_metric_mat latency; -#endif - struct aclk_metric_mat cloud_q_db_query_time; - struct aclk_metric_mat cloud_q_recvd_to_processed; -} aclk_mat_metrics; - -void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement); - -#define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7 -// if you change update cloud_req_type_names - -int aclk_cloud_req_type_to_idx(const char *name); - -// reset to 0 on every sample -extern struct legacy_aclk_metrics_per_sample { - /* in the unlikely event of ACLK disconnecting - and reconnecting under 1 sampling rate - we want to make sure we record the disconnection - despite it being then seemingly longer in graph */ - volatile uint8_t offline_during_sample; - - volatile uint32_t queries_queued; - volatile uint32_t queries_dispatched; - - volatile uint32_t write_q_added; - volatile uint32_t write_q_consumed; - - volatile uint32_t read_q_added; - volatile uint32_t read_q_consumed; - - volatile uint32_t cloud_req_ok; - volatile uint32_t cloud_req_err; - - volatile uint16_t cloud_req_v1; - volatile uint16_t cloud_req_v2; - - volatile uint16_t cloud_req_by_type[ACLK_STATS_CLOUD_REQ_TYPE_CNT]; - -#ifdef NETDATA_INTERNAL_CHECKS - struct aclk_metric_mat_data latency; -#endif - struct aclk_metric_mat_data cloud_q_db_query_time; - struct aclk_metric_mat_data cloud_q_recvd_to_processed; -} legacy_aclk_metrics_per_sample; - -extern uint32_t *legacy_aclk_queries_per_thread; -extern struct rusage *rusage_per_thread; - -void *legacy_aclk_stats_main_thread(void *ptr); -void legacy_aclk_stats_thread_cleanup(); -void legacy_aclk_stats_upd_online(int online); - -#endif /* NETDATA_ACLK_STATS_H */ diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c deleted file mode 100644 index 80ca2397..00000000 --- a/aclk/legacy/agent_cloud_link.c +++ /dev/null @@ -1,1502 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "libnetdata/libnetdata.h" -#include "agent_cloud_link.h" -#include "aclk_lws_https_client.h" -#include "aclk_query.h" -#include "aclk_common.h" -#include "aclk_stats.h" -#include "../aclk_collector_list.h" - -#ifdef ENABLE_ACLK -#include <libwebsockets.h> -#endif - -int aclk_shutting_down = 0; - -// Other global state -static int aclk_subscribed = 0; -static char *aclk_username = NULL; -static char *aclk_password = NULL; - -static char *global_base_topic = NULL; -static int aclk_connecting = 0; -int aclk_force_reconnect = 0; // Indication from lower layers - -static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; - -#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) -#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) - -void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); -void aclk_lws_wss_destroy_context(); - -char *create_uuid() -{ - uuid_t uuid; - char *uuid_str = mallocz(36 + 1); - - uuid_generate(uuid); - uuid_unparse(uuid, uuid_str); - - return uuid_str; -} - -int legacy_cloud_to_agent_parse(JSON_ENTRY *e) -{ - struct aclk_request *data = e->callback_data; - - switch (e->type) { - case JSON_OBJECT: - case JSON_ARRAY: - break; - case JSON_STRING: - if (!strcmp(e->name, "msg-id")) { - data->msg_id = strdupz(e->data.string); - break; - } - if (!strcmp(e->name, "type")) { - data->type_id = strdupz(e->data.string); - break; - } - if (!strcmp(e->name, "callback-topic")) { - data->callback_topic = strdupz(e->data.string); - break; - } - if (!strcmp(e->name, "payload")) { - if (likely(e->data.string)) { - size_t len = strlen(e->data.string); - data->payload = mallocz(len+1); - if (!url_decode_r(data->payload, e->data.string, len + 1)) - strcpy(data->payload, e->data.string); - } - break; - } - break; - case JSON_NUMBER: - if (!strcmp(e->name, "version")) { - data->version = e->data.number; - break; - } - if (!strcmp(e->name, "min-version")) { - data->min_version = e->data.number; - break; - } - if (!strcmp(e->name, "max-version")) { - data->max_version = e->data.number; - break; - } - - break; - - case JSON_BOOLEAN: - break; - - case JSON_NULL: - break; - } - return 0; -} - - -static RSA *aclk_private_key = NULL; -static int create_private_key() -{ - if (aclk_private_key != NULL) - RSA_free(aclk_private_key); - aclk_private_key = NULL; - char filename[FILENAME_MAX + 1]; - snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); - - long bytes_read; - char *private_key = read_by_filename(filename, &bytes_read); - if (!private_key) { - error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); - return 1; - } - debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); - - BIO *key_bio = BIO_new_mem_buf(private_key, -1); - if (key_bio==NULL) { - error("Claimed agent cannot establish ACLK - failed to create BIO for key"); - goto biofailed; - } - - aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); - BIO_free(key_bio); - if (aclk_private_key!=NULL) - { - freez(private_key); - return 0; - } - char err[512]; - ERR_error_string_n(ERR_get_error(), err, sizeof(err)); - error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); - -biofailed: - freez(private_key); - return 1; -} - -/* - * After a connection failure -- delay in milliseconds - * When a connection is established, the delay function - * should be called with - * - * mode 0 to reset the delay - * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms - * - */ -unsigned long int aclk_reconnect_delay(int mode) -{ - static int fail = -1; - unsigned long int delay; - - if (!mode || fail == -1) { - srandom(time(NULL)); - fail = mode - 1; - return 0; - } - - delay = (1 << fail); - - if (delay >= ACLK_MAX_BACKOFF_DELAY) { - delay = ACLK_MAX_BACKOFF_DELAY * 1000; - } else { - fail++; - delay *= 1000; - delay += (random() % (MAX(1000, delay/2))); - } - - return delay; -} - -// This will give the base topic that the agent will publish messages. -// subtopics will be sent under the base topic e.g. base_topic/subtopic -// This is called during the connection, we delete any previous topic -// in-case the user has changed the agent id and reclaimed. - -char *create_publish_base_topic() -{ - char *agent_id = is_agent_claimed(); - if (unlikely(!agent_id)) - return NULL; - - ACLK_LOCK; - - if (global_base_topic) - freez(global_base_topic); - char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; - - snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id); - tmp = strchr(tmp_topic, '\n'); - if (unlikely(tmp)) - *tmp = '\0'; - global_base_topic = strdupz(tmp_topic); - - ACLK_UNLOCK; - freez(agent_id); - return global_base_topic; -} - -/* - * Build a topic based on sub_topic and final_topic - * if the sub topic starts with / assume that is an absolute topic - * - */ - -char *get_topic(char *sub_topic, char *final_topic, int max_size) -{ - int rc; - - if (likely(sub_topic && sub_topic[0] == '/')) - return sub_topic; - - if (unlikely(!global_base_topic)) - return sub_topic; - - rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); - if (unlikely(rc >= max_size)) - debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic); - - return final_topic; -} - -/* Avoids the need to scan through all RRDHOSTS - * every time any Query Thread Wakes Up - * (every time we need to check child popcorn expiry) - * call with legacy_aclk_shared_state_LOCK held - */ -void aclk_update_next_child_to_popcorn(void) -{ - RRDHOST *host; - int any = 0; - - rrd_rdlock(); - rrdhost_foreach_read(host) { - if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) - continue; - - rrdhost_aclk_state_lock(host); - if (!ACLK_IS_HOST_POPCORNING(host)) { - rrdhost_aclk_state_unlock(host); - continue; - } - - any = 1; - - if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) { - legacy_aclk_shared_state.next_popcorn_host = host; - rrdhost_aclk_state_unlock(host); - continue; - } - - if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) - legacy_aclk_shared_state.next_popcorn_host = host; - - rrdhost_aclk_state_unlock(host); - } - if(!any) - legacy_aclk_shared_state.next_popcorn_host = NULL; - - rrd_unlock(); -} - -/* If popcorning bump timer. - * If popcorning or initializing (host not stable) return 1 - * Otherwise return 0 - */ -static int aclk_popcorn_check_bump(RRDHOST *host) -{ - time_t now = now_monotonic_sec(); - int updated = 0, ret; - legacy_aclk_shared_state_LOCK; - rrdhost_aclk_state_lock(host); - - ret = ACLK_IS_HOST_INITIALIZING(host); - if (unlikely(ACLK_IS_HOST_POPCORNING(host))) { - if(now != host->aclk_state.t_last_popcorn_update) { - updated = 1; - info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); - } - host->aclk_state.t_last_popcorn_update = now; - rrdhost_aclk_state_unlock(host); - - if (host != localhost && updated) - aclk_update_next_child_to_popcorn(); - - legacy_aclk_shared_state_UNLOCK; - return ret; - } - - rrdhost_aclk_state_unlock(host); - legacy_aclk_shared_state_UNLOCK; - return ret; -} - -inline static int aclk_host_initializing(RRDHOST *host) -{ - rrdhost_aclk_state_lock(host); - int ret = ACLK_IS_HOST_INITIALIZING(host); - rrdhost_aclk_state_unlock(host); - return ret; -} - -static void aclk_start_host_popcorning(RRDHOST *host) -{ - usec_t now = now_monotonic_sec(); - info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); - legacy_aclk_shared_state_LOCK; - rrdhost_aclk_state_lock(host); - if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) { - errno = 0; - error("Localhost is allowed to do popcorning only once after startup!"); - rrdhost_aclk_state_unlock(host); - legacy_aclk_shared_state_UNLOCK; - return; - } - - host->aclk_state.state = ACLK_HOST_INITIALIZING; - host->aclk_state.metadata = ACLK_METADATA_REQUIRED; - host->aclk_state.t_last_popcorn_update = now; - rrdhost_aclk_state_unlock(host); - if (host != localhost) - aclk_update_next_child_to_popcorn(); - legacy_aclk_shared_state_UNLOCK; -} - -static void aclk_stop_host_popcorning(RRDHOST *host) -{ - legacy_aclk_shared_state_LOCK; - rrdhost_aclk_state_lock(host); - if (!ACLK_IS_HOST_POPCORNING(host)) { - rrdhost_aclk_state_unlock(host); - legacy_aclk_shared_state_UNLOCK; - return; - } - - info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid); - host->aclk_state.t_last_popcorn_update = 0; - host->aclk_state.metadata = ACLK_METADATA_REQUIRED; - rrdhost_aclk_state_unlock(host); - - if(host == legacy_aclk_shared_state.next_popcorn_host) { - legacy_aclk_shared_state.next_popcorn_host = NULL; - aclk_update_next_child_to_popcorn(); - } - legacy_aclk_shared_state_UNLOCK; -} - -/* - * Add a new collector to the list - * If it exists, update the chart count - */ -void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(tmp_collector->count != 1)) { - COLLECTOR_UNLOCK; - return; - } - - COLLECTOR_UNLOCK; - - if(aclk_popcorn_check_bump(host)) - return; - - if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) - debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); -} - -/* - * Delete a collector from the list - * If the chart count reaches zero the collector will be removed - * from the list by calling del_collector. - * - * This function will release the memory used and schedule - * a cloud update - */ -void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector; - if (unlikely(!netdata_ready)) { - return; - } - - COLLECTOR_LOCK; - - tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); - - if (unlikely(!tmp_collector || tmp_collector->count)) { - COLLECTOR_UNLOCK; - return; - } - - debug( - D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", - tmp_collector->count); - - COLLECTOR_UNLOCK; - - _free_collector(tmp_collector); - - if (aclk_popcorn_check_bump(host)) - return; - - if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) - debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); -} - -static void aclk_graceful_disconnect() -{ - size_t write_q, write_q_bytes, read_q; - time_t event_loop_timeout; - - // Send a graceful disconnect message - BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg); - buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}"); - aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); - buffer_free(b); - - event_loop_timeout = now_realtime_sec() + 5; - write_q = 1; - while (write_q && event_loop_timeout > now_realtime_sec()) { - _link_event_loop(); - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); - } - - aclk_shutting_down = 1; - _link_shutdown(); - aclk_lws_wss_mqtt_layer_disconnect_notif(); - - write_q = 1; - event_loop_timeout = now_realtime_sec() + 5; - while (write_q && event_loop_timeout > now_realtime_sec()) { - _link_event_loop(); - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); - } - aclk_shutting_down = 0; -} - -#ifndef __GNUC__ -#pragma region Incoming Msg Parsing -#endif - -struct dictionary_singleton { - char *key; - char *result; -}; - -int json_extract_singleton(JSON_ENTRY *e) -{ - struct dictionary_singleton *data = e->callback_data; - - switch (e->type) { - case JSON_OBJECT: - case JSON_ARRAY: - break; - case JSON_STRING: - if (!strcmp(e->name, data->key)) { - data->result = strdupz(e->data.string); - break; - } - break; - case JSON_NUMBER: - case JSON_BOOLEAN: - case JSON_NULL: - break; - } - return 0; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - - -#ifndef __GNUC__ -#pragma region Challenge Response -#endif - -// Base-64 decoder. -// Note: This is non-validating, invalid input will be decoded without an error. -// Challenges are packed into json strings so we don't skip newlines. -// Size errors (i.e. invalid input size or insufficient output space) are caught. -size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size) -{ - static char lookup[256]; - static int first_time=1; - if (first_time) - { - first_time = 0; - for(int i=0; i<256; i++) - lookup[i] = -1; - for(int i='A'; i<='Z'; i++) - lookup[i] = i-'A'; - for(int i='a'; i<='z'; i++) - lookup[i] = i-'a' + 26; - for(int i='0'; i<='9'; i++) - lookup[i] = i-'0' + 52; - lookup['+'] = 62; - lookup['/'] = 63; - } - if ((input_size & 3) != 0) - { - error("Can't decode base-64 input length %zu", input_size); - return 0; - } - size_t unpadded_size = (input_size/4) * 3; - if ( unpadded_size > output_size ) - { - error("Output buffer size %zu is too small to decode %zu into", output_size, input_size); - return 0; - } - // Don't check padding within full quantums - for (size_t i = 0 ; i < input_size-4 ; i+=4 ) - { - uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]]; - output[0] = value >> 16; - output[1] = value >> 8; - output[2] = value; - //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); - output += 3; - input += 4; - } - // Handle padding only in last quantum - if (input[2] == '=') { - uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]]; - output[0] = value >> 4; - //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]); - return unpadded_size-2; - } - else if (input[3] == '=') { - uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]]; - output[0] = value >> 10; - output[1] = value >> 2; - //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]); - return unpadded_size-1; - } - else - { - uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3]; - output[0] = value >> 16; - output[1] = value >> 8; - output[2] = value; - //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); - return unpadded_size; - } -} - -size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size) -{ - uint32_t value; - static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz" - "0123456789+/"; - if ((input_size/3+1)*4 >= output_size) - { - error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size); - return 0; - } - size_t count = 0; - while (input_size>3) - { - value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff; - output[0] = lookup[value >> 18]; - output[1] = lookup[(value >> 12) & 0x3f]; - output[2] = lookup[(value >> 6) & 0x3f]; - output[3] = lookup[value & 0x3f]; - //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); - output += 4; - input += 3; - input_size -= 3; - count += 4; - } - switch (input_size) - { - case 2: - value = (input[0] << 10) + (input[1] << 2); - output[0] = lookup[(value >> 12) & 0x3f]; - output[1] = lookup[(value >> 6) & 0x3f]; - output[2] = lookup[value & 0x3f]; - output[3] = '='; - //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]); - count += 4; - break; - case 1: - value = input[0] << 4; - output[0] = lookup[(value >> 6) & 0x3f]; - output[1] = lookup[value & 0x3f]; - output[2] = '='; - output[3] = '='; - //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); - count += 4; - break; - case 0: - break; - } - return count; -} - - - -int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted) -{ - int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING); - if (result == -1) { - char err[512]; - ERR_error_string_n(ERR_get_error(), err, sizeof(err)); - error("Decryption of the challenge failed: %s", err); - } - return result; -} - -void aclk_get_challenge(char *aclk_hostname, int port) -{ - char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - debug(D_ACLK, "Performing challenge-response sequence"); - if (aclk_password != NULL) - { - freez(aclk_password); - aclk_password = NULL; - } - // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge - // TODO - target host? - char *agent_id = is_agent_claimed(); - if (agent_id == NULL) - { - error("Agent was not claimed - cannot perform challenge/response"); - goto CLEANUP; - } - char url[1024]; - sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id); - info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url); - if(aclk_send_https_request("GET", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL)) - { - error("Challenge failed: %s", data_buffer); - goto CLEANUP; - } - struct dictionary_singleton challenge = { .key = "challenge", .result = NULL }; - - debug(D_ACLK, "Challenge response from cloud: %s", data_buffer); - if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK) - { - freez(challenge.result); - error("Could not parse the json response with the challenge: %s", data_buffer); - goto CLEANUP; - } - if (challenge.result == NULL ) { - error("Could not retrieve challenge from auth response: %s", data_buffer); - goto CLEANUP; - } - - - size_t challenge_len = strlen(challenge.result); - unsigned char decoded[512]; - size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded)); - - unsigned char plaintext[4096]={}; - int decrypted_length = private_decrypt(decoded, decoded_len, plaintext); - freez(challenge.result); - char encoded[512]; - size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded)); - encoded[encoded_len] = 0; - debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded); - - char response_json[4096]={}; - sprintf(response_json, "{\"response\":\"%s\"}", encoded); - debug(D_ACLK, "Password phase: %s",response_json); - // TODO - host - sprintf(url, "/api/v1/auth/node/%s/password", agent_id); - if(aclk_send_https_request("POST", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json)) - { - error("Challenge-response failed: %s", data_buffer); - goto CLEANUP; - } - - debug(D_ACLK, "Password response from cloud: %s", data_buffer); - - struct dictionary_singleton password = { .key = "password", .result = NULL }; - if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK) - { - freez(password.result); - error("Could not parse the json response with the password: %s", data_buffer); - goto CLEANUP; - } - - if (password.result == NULL ) { - error("Could not retrieve password from auth response"); - goto CLEANUP; - } - if (aclk_password != NULL ) - freez(aclk_password); - aclk_password = password.result; - if (aclk_username != NULL) - freez(aclk_username); - aclk_username = agent_id; - agent_id = NULL; - -CLEANUP: - if (agent_id != NULL) - freez(agent_id); - freez(data_buffer); - return; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - -static void aclk_try_to_connect(char *hostname, int port) -{ - int rc; - -// this is useful for developers working on ACLK -// allows connecting agent to any MQTT broker -// for debugging, development and testing purposes -#ifndef ACLK_DISABLE_CHALLENGE - if (!aclk_private_key) { - error("Cannot try to establish the agent cloud link - no private key available!"); - return; - } -#endif - - info("Attempting to establish the agent cloud link"); -#ifdef ACLK_DISABLE_CHALLENGE - error("Agent built with ACLK_DISABLE_CHALLENGE. This is for testing " - "and development purposes only. Warranty void. Won't be able " - "to connect to Netdata Cloud."); - if (aclk_password == NULL) - aclk_password = strdupz("anon"); -#else - aclk_get_challenge(hostname, port); - if (aclk_password == NULL) - return; -#endif - - aclk_connecting = 1; - create_publish_base_topic(); - - legacy_aclk_shared_state_LOCK; - legacy_aclk_shared_state.version_neg = 0; - legacy_aclk_shared_state.version_neg_wait_till = 0; - legacy_aclk_shared_state_UNLOCK; - - rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password); - if (unlikely(rc)) { - error("Failed to initialize the agent cloud link library"); - } -} - -// Sends "hello" message to negotiate ACLK version with cloud -static inline void aclk_hello_msg() -{ - BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); - - char *msg_id = create_uuid(); - - legacy_aclk_shared_state_LOCK; - legacy_aclk_shared_state.version_neg = 0; - legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; - legacy_aclk_shared_state_UNLOCK; - - //Hello message is versioned separately from the rest of the protocol - aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); - buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX); - aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id); - freez(msg_id); - buffer_free(buf); -} - -/** - * Main agent cloud link thread - * - * This thread will simply call the main event loop that handles - * pending requests - both inbound and outbound - * - * @param ptr is a pointer to the netdata_static_thread structure. - * - * @return It always returns NULL - */ -void *legacy_aclk_main(void *ptr) -{ - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - struct aclk_query_threads query_threads; - struct aclk_stats_thread *stats_thread = NULL; - time_t last_periodic_query_wakeup = 0; - - query_threads.thread_list = NULL; - - // This thread is unusual in that it cannot be cancelled by cancel_main_threads() - // as it must notify the far end that it shutdown gracefully and avoid the LWT. - netdata_thread_disable_cancelability(); - -#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK) - info("Killing ACLK thread -> cloud functionality has been disabled"); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; - return NULL; -#endif - -#ifndef LWS_WITH_SOCKS5 - ACLK_PROXY_TYPE proxy_type; - aclk_get_proxy(&proxy_type); - if(proxy_type == PROXY_TYPE_SOCKS5) { - error("Disabling ACLK due to requested SOCKS5 proxy."); - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; - return NULL; - } -#endif - - info("Waiting for netdata to be ready"); - while (!netdata_ready) { - sleep_usec(USEC_PER_MS * 300); - } - - info("Waiting for Cloud to be enabled"); - while (!netdata_cloud_setting) { - sleep_usec(USEC_PER_SEC * 1); - if (netdata_exit) { - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; - return NULL; - } - } - - query_threads.count = MIN(processors/2, 6); - query_threads.count = MAX(query_threads.count, 2); - query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); - if(query_threads.count < 1) { - error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count); - query_threads.count = 1; - config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); - } - - //start localhost popcorning - aclk_start_host_popcorning(localhost); - - aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); - if (aclk_stats_enabled) { - stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); - stats_thread->thread = mallocz(sizeof(netdata_thread_t)); - stats_thread->query_thread_count = query_threads.count; - netdata_thread_create( - stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread, - stats_thread); - } - - char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering. - int port_num = 0; - info("Waiting for netdata to be claimed"); - while(1) { - char *agent_id = is_agent_claimed(); - while (likely(!agent_id)) { - sleep_usec(USEC_PER_SEC * 1); - if (netdata_exit) - goto exited; - agent_id = is_agent_claimed(); - } - freez(agent_id); - // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. - // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. - char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); - if (cloud_base_url == NULL) { - error("Do not move the cloud base url out of post_conf_load!!"); - goto exited; - } - if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &port_num)) - error("Agent is claimed but the configuration is invalid, please fix"); - else if (!create_private_key() && !_mqtt_lib_init()) - break; - - for (int i=0; i<60; i++) { - if (netdata_exit) - goto exited; - - sleep_usec(USEC_PER_SEC * 1); - } - } - - usec_t reconnect_expiry = 0; // In usecs - - while (!netdata_exit) { - static int first_init = 0; - /* size_t write_q, write_q_bytes, read_q; - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/ - - if (aclk_disable_runtime && !aclk_connected) { - sleep(1); - continue; - } - - if (aclk_kill_link) { // User has reloaded the claiming state - aclk_kill_link = 0; - aclk_graceful_disconnect(); - create_private_key(); - continue; - } - - if (aclk_force_reconnect) { - aclk_lws_wss_destroy_context(); - aclk_force_reconnect = 0; - } - if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) { - if (unlikely(!first_init)) { - aclk_try_to_connect(aclk_hostname, port_num); - first_init = 1; - } else { - if (aclk_connecting == 0) { - if (reconnect_expiry == 0) { - unsigned long int delay = aclk_reconnect_delay(1); - reconnect_expiry = now_realtime_usec() + delay * 1000; - info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0); - } - if (now_realtime_usec() >= reconnect_expiry) { - reconnect_expiry = 0; - aclk_try_to_connect(aclk_hostname, port_num); - } - sleep_usec(USEC_PER_MS * 100); - } - } - if (aclk_connecting) { - _link_event_loop(); - sleep_usec(USEC_PER_MS * 100); - } - continue; - } - - _link_event_loop(); - if (unlikely(!aclk_connected || aclk_force_reconnect)) - continue; - /*static int stress_counter = 0; - if (write_q_bytes==0 && stress_counter ++ >5) - { - aclk_send_stress_test(8000000); - stress_counter = 0; - }*/ - - if (unlikely(!aclk_subscribed)) { - aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1); - aclk_hello_msg(); - } - - if (unlikely(!query_threads.thread_list)) { - legacy_aclk_query_threads_start(&query_threads); - } - - time_t now = now_monotonic_sec(); - if(aclk_connected && last_periodic_query_wakeup < now) { - // to make `legacy_aclk_queue_query()` param `run_after` work - // also makes per child popcorning work - last_periodic_query_wakeup = now; - LEGACY_QUERY_THREAD_WAKEUP; - } - } // forever -exited: - // Wakeup query thread to cleanup - LEGACY_QUERY_THREAD_WAKEUP_ALL; - - freez(aclk_username); - freez(aclk_password); - freez(aclk_hostname); - if (aclk_private_key != NULL) - RSA_free(aclk_private_key); - - static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - - char *agent_id = is_agent_claimed(); - if (agent_id && aclk_connected) { - freez(agent_id); - // Wakeup thread to cleanup - LEGACY_QUERY_THREAD_WAKEUP; - aclk_graceful_disconnect(); - } - - legacy_aclk_query_threads_cleanup(&query_threads); - - _reset_collector_list(); - freez(collector_list); - - if(aclk_stats_enabled) { - netdata_thread_join(*stats_thread->thread, NULL); - legacy_aclk_stats_thread_cleanup(); - freez(stats_thread->thread); - freez(stats_thread); - } - - /* - * this must be last -> if all static threads signal - * THREAD_EXITED rrdengine will dealloc the RRDSETs - * and RRDDIMs that are used by still running stat thread. - * see netdata_cleanup_and_exit() for reference - */ - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; - return NULL; -} - -/* - * Send a message to the cloud, using a base topic and sib_topic - * The final topic will be in the form <base_topic>/<sub_topic> - * If base_topic is missing then the global_base_topic will be used (if available) - * - */ -int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id) -{ - int rc; - int mid; - char topic[ACLK_MAX_TOPIC + 1]; - char *final_topic; - - UNUSED(msg_id); - - if (!aclk_connected) - return 0; - - if (unlikely(!message)) - return 0; - - final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); - - if (unlikely(!final_topic)) { - errno = 0; - error("Unable to build outgoing topic; truncated?"); - return 1; - } - - ACLK_LOCK; - rc = _link_send_message(final_topic, message, len, &mid); - // TODO: link the msg_id with the mid so we can trace it - ACLK_UNLOCK; - - if (unlikely(rc)) { - errno = 0; - error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); - } - - return rc; -} - -int aclk_send_message(char *sub_topic, char *message, char *msg_id) -{ - return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id); -} - -/* - * Subscribe to a topic in the cloud - * The final subscription will be in the form - * /agent/claim_id/<sub_topic> - */ -int aclk_subscribe(char *sub_topic, int qos) -{ - int rc; - char topic[ACLK_MAX_TOPIC + 1]; - char *final_topic; - - final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); - if (unlikely(!final_topic)) { - errno = 0; - error("Unable to build outgoing topic; truncated?"); - return 1; - } - - if (!aclk_connected) { - error("Cannot subscribe to %s - not connected!", topic); - return 1; - } - - ACLK_LOCK; - rc = _link_subscribe(final_topic, qos); - ACLK_UNLOCK; - - // TODO: Add better handling -- error will flood the logfile here - if (unlikely(rc)) { - errno = 0; - error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc)); - } - - return rc; -} - -// This is called from a callback when the link goes up -void aclk_connect() -{ - info("Connection detected (%u queued queries)", aclk_query_size()); - - legacy_aclk_stats_upd_online(1); - - aclk_connected = 1; - aclk_reconnect_delay(0); - - LEGACY_QUERY_THREAD_WAKEUP; - return; -} - -// This is called from a callback when the link goes down -void aclk_disconnect() -{ - if (likely(aclk_connected)) - info("Disconnect detected (%u queued queries)", aclk_query_size()); - - legacy_aclk_stats_upd_online(0); - - aclk_subscribed = 0; - rrdhost_aclk_state_lock(localhost); - localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED; - rrdhost_aclk_state_unlock(localhost); - aclk_connected = 0; - aclk_connecting = 0; - aclk_force_reconnect = 1; -} - -inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version) -{ - uuid_t uuid; - char uuid_str[36 + 1]; - - if (unlikely(!msg_id)) { - uuid_generate(uuid); - uuid_unparse(uuid, uuid_str); - msg_id = uuid_str; - } - - if (ts_secs == 0) { - ts_us = now_realtime_usec(); - ts_secs = ts_us / USEC_PER_SEC; - ts_us = ts_us % USEC_PER_SEC; - } - - buffer_sprintf( - dest, - "{\t\"type\": \"%s\",\n" - "\t\"msg-id\": \"%s\",\n" - "\t\"timestamp\": %ld,\n" - "\t\"timestamp-offset-usec\": %llu,\n" - "\t\"connect\": %ld,\n" - "\t\"connect-offset-usec\": %llu,\n" - "\t\"version\": %d", - type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version); - - debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs); -} - - -/* - * This will send alarm information which includes - * configured alarms - * alarm_log - * active alarms - */ -void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); - -void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) -{ - BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - debug(D_ACLK, "Metadata alarms start"); - - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - - if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - else - aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - - - buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); - health_alarms2json(localhost, local_buffer, 1); - debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len); - // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : "); - // health_alarm_log2json(localhost, local_buffer, 0); - // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len); - buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : "); - health_active_log_alarms_2json(localhost, local_buffer); - //debug(D_ACLK, "Metadata message %s", local_buffer->buffer); - - - - buffer_sprintf(local_buffer, "\n}\n}"); - aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id); - - freez(msg_id); - buffer_free(local_buffer); -} - -/* - * This will send the agent metadata - * /api/v1/info - * charts - */ -int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) -{ - BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - - debug(D_ACLK, "Metadata /info start"); - - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - // on_connect messages are sent on a health reload, if the on_connect message is real then we - // use the session time as the fake timestamp to indicate that it starts the session. If it is - // a fake on_connect message then use the real timestamp to indicate it is within the existing - // session. - if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - else - aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - - buffer_sprintf(local_buffer, "{\n\t \"info\" : "); - web_client_api_request_v1_info_fill_buffer(host, local_buffer); - debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); - - buffer_sprintf(local_buffer, ", \n\t \"charts\" : "); - charts2json(host, local_buffer, 1, 0); - buffer_sprintf(local_buffer, "\n}\n}"); - debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len); - - aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); - - freez(msg_id); - buffer_free(local_buffer); - return 0; -} - -int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) -{ - BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - local_buffer->contenttype = CT_APPLICATION_JSON; - - if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) - fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg); - - debug(D_ACLK, "Sending Child Disconnect"); - - char *msg_id = create_uuid(); - - aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - - buffer_strcat(local_buffer, ",\"payload\":"); - - buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid); - rrdhost_aclk_state_lock(host); - if(host->aclk_state.claimed_id) - buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id); - else - buffer_strcat(local_buffer, "null}}"); - - rrdhost_aclk_state_unlock(host); - - aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); - - freez(msg_id); - buffer_free(local_buffer); - return 0; -} - -void legacy_aclk_host_state_update(RRDHOST *host, int connect) -{ -#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE - if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) - return; -#else -#warning "This check became unnecessary. Remove" -#endif - - if (unlikely(aclk_host_initializing(localhost))) - return; - - if (connect) { - debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); - aclk_start_host_popcorning(host); - legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); - } else { - debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); - aclk_stop_host_popcorning(host); - legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); - } -} - -void aclk_send_stress_test(size_t size) -{ - char *buffer = mallocz(size); - if (buffer != NULL) - { - for(size_t i=0; i<size; i++) - buffer[i] = 'x'; - buffer[size-1] = 0; - time_t time_created = now_realtime_sec(); - sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created); - buffer[strlen(buffer)] = '"'; - buffer[size-2] = '}'; - buffer[size-3] = '"'; - aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL); - error("Sending stress of size %zu at time %ld", size, time_created); - } - free(buffer); -} - -// Send info metadata message to the cloud if the link is established -// or on request -int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host) -{ - legacy_aclk_send_info_metadata(state, host); - - if(host == localhost) - legacy_aclk_send_alarm_metadata(state); - - return 0; -} - -// Triggered by a health reload, sends the alarm metadata -void legacy_aclk_alarm_reload() -{ - if (unlikely(aclk_host_initializing(localhost))) - return; - - if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { - if (likely(aclk_connected)) { - errno = 0; - error("ACLK failed to queue on_connect command on alarm reload"); - } - } -} -//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf) - -int aclk_send_single_chart(RRDHOST *host, char *chart) -{ - RRDSET *st = rrdset_find(host, chart); - if (!st) - st = rrdset_find_byname(host, chart); - if (!st) { - info("FAILED to find chart %s", chart); - return 1; - } - - BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - char *msg_id = create_uuid(); - buffer_flush(local_buffer); - local_buffer->contenttype = CT_APPLICATION_JSON; - - aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - - rrdset2json(st, local_buffer, NULL, NULL, 1); - buffer_sprintf(local_buffer, "\t\n}"); - - aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id); - - freez(msg_id); - buffer_free(local_buffer); - return 0; -} - -int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create) -{ -#ifndef ENABLE_ACLK - UNUSED(host); - UNUSED(chart_name); - return 0; -#else - if (unlikely(!netdata_ready)) - return 0; - - if (!netdata_cloud_setting) - return 0; - - if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) - return 0; - - if (aclk_host_initializing(localhost)) - return 0; - - if (unlikely(aclk_disable_single_updates)) - return 0; - - if (aclk_popcorn_check_bump(host)) - return 0; - - if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) { - if (likely(aclk_connected)) { - errno = 0; - error("ACLK failed to queue chart_update command"); - } - } - - return 0; -#endif -} - -int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) -{ - BUFFER *local_buffer = NULL; - - if (unlikely(!netdata_ready)) - return 0; - - if (host != localhost) - return 0; - - if(unlikely(aclk_host_initializing(localhost))) - return 0; - - /* - * Check if individual updates have been disabled - * This will be the case when we do health reload - * and all the alarms will be dropped and recreated. - * At the end of the health reload the complete alarm metadata - * info will be sent - */ - if (unlikely(aclk_disable_single_updates)) - return 0; - - local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); - char *msg_id = create_uuid(); - - buffer_flush(local_buffer); - aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); - buffer_strcat(local_buffer, ",\n\t\"payload\": "); - - netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - health_alarm_entry2json_nolock(local_buffer, ae, host); - netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - - buffer_sprintf(local_buffer, "\n}"); - - if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { - if (likely(aclk_connected)) { - errno = 0; - error("ACLK failed to queue alarm_command on alarm_update"); - } - } - - freez(msg_id); - buffer_free(local_buffer); - - return 0; -} - -char *legacy_aclk_state(void) -{ - BUFFER *wb = buffer_create(1024); - char *ret; - - buffer_strcat(wb, - "ACLK Available: Yes\n" - "ACLK Implementation: Legacy\n" - "Claimed: " - ); - - char *agent_id = is_agent_claimed(); - if (agent_id == NULL) - buffer_strcat(wb, "No\n"); - else { - buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); - freez(agent_id); - } - - buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No"); - - ret = strdupz(buffer_tostring(wb)); - buffer_free(wb); - return ret; -} - -char *legacy_aclk_state_json(void) -{ - BUFFER *wb = buffer_create(1024); - char *agent_id = is_agent_claimed(); - - buffer_sprintf(wb, - "{\"aclk-available\":true," - "\"aclk-implementation\":\"Legacy\"," - "\"agent-claimed\":%s," - "\"claimed-id\":", - agent_id ? "true" : "false" - ); - - if (agent_id) { - buffer_sprintf(wb, "\"%s\"", agent_id); - freez(agent_id); - } else - buffer_strcat(wb, "null"); - - buffer_sprintf(wb, ",\"online\":%s}", aclk_connected ? "true" : "false"); - - return strdupz(buffer_tostring(wb)); -} diff --git a/aclk/legacy/agent_cloud_link.h b/aclk/legacy/agent_cloud_link.h deleted file mode 100644 index 8954a337..00000000 --- a/aclk/legacy/agent_cloud_link.h +++ /dev/null @@ -1,85 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_AGENT_CLOUD_LINK_H -#define NETDATA_AGENT_CLOUD_LINK_H - -#include "daemon/common.h" -#include "mqtt.h" -#include "aclk_common.h" - -#define ACLK_CHART_TOPIC "outbound/meta" -#define ACLK_ALARMS_TOPIC "outbound/alarms" -#define ACLK_METADATA_TOPIC "outbound/meta" -#define ACLK_COMMAND_TOPIC "inbound/cmd" -#define ACLK_TOPIC_STRUCTURE "/agent/%s" - -#define ACLK_MAX_BACKOFF_DELAY 1024 // maximum backoff delay in seconds - -#define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg) -#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds -#define ACLK_PING_INTERVAL 60 -#define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop - -#define ACLK_MAX_TOPIC 255 - -#define ACLK_RECONNECT_DELAY 1 // reconnect delay -- with backoff strategy for now -#define ACLK_DEFAULT_PORT 9002 -#define ACLK_DEFAULT_HOST "localhost" - -#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A" - -struct aclk_request { - char *type_id; - char *msg_id; - char *callback_topic; - char *payload; - int version; - int min_version; - int max_version; -}; - -typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION; - -void *legacy_aclk_main(void *ptr); - -extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); -extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id); - -extern char *is_agent_claimed(void); -extern void aclk_lws_wss_mqtt_layer_disconnect_notif(); -char *create_uuid(); - -// callbacks for agent cloud link -int aclk_subscribe(char *topic, int qos); -int legacy_cloud_to_agent_parse(JSON_ENTRY *e); -void aclk_disconnect(); -void aclk_connect(); - -#ifdef ENABLE_ACLK -int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host); -int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host); -void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted); - -int aclk_wait_for_initialization(); -char *create_publish_base_topic(); - -int aclk_send_single_chart(RRDHOST *host, char *chart); -int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create); -int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); -void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version); -int legacy_aclk_handle_cloud_message(char *payload); -void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); -void legacy_aclk_alarm_reload(void); -unsigned long int aclk_reconnect_delay(int mode); -extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); - -void legacy_aclk_host_state_update(RRDHOST *host, int connect); -int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd); -void aclk_update_next_child_to_popcorn(void); - -char *legacy_aclk_state(void); -char *legacy_aclk_state_json(void); -#endif - -#endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/aclk/legacy/mqtt.c b/aclk/legacy/mqtt.c deleted file mode 100644 index 0e4bb2ec..00000000 --- a/aclk/legacy/mqtt.c +++ /dev/null @@ -1,370 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include <libnetdata/json/json.h> -#include "daemon/common.h" -#include "mqtt.h" -#include "aclk_lws_wss_client.h" -#include "aclk_stats.h" -#include "aclk_rx_msgs.h" - -#include "agent_cloud_link.h" - -#define ACLK_QOS 1 - -extern usec_t aclk_session_us; -extern time_t aclk_session_sec; - -inline const char *_link_strerror(int rc) -{ - return mosquitto_strerror(rc); -} - -#ifdef NETDATA_INTERNAL_CHECKS -static struct timeval sendTimes[1024]; -#endif - -static struct mosquitto *mosq = NULL; - - -void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) -{ - UNUSED(mosq); - UNUSED(obj); - - legacy_aclk_handle_cloud_message(msg->payload); -} - -void publish_callback(struct mosquitto *mosq, void *obj, int rc) -{ - UNUSED(mosq); - UNUSED(obj); - UNUSED(rc); -#ifdef NETDATA_INTERNAL_CHECKS - struct timeval now, *orig; - now_realtime_timeval(&now); - orig = &sendTimes[ rc & 0x3ff ]; - int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec); - diff /= 1000; - - info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff); - - legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.latency, diff); -#endif - return; -} - -void connect_callback(struct mosquitto *mosq, void *obj, int rc) -{ - UNUSED(mosq); - UNUSED(obj); - UNUSED(rc); - - info("Connection to cloud established"); - aclk_connect(); - - return; -} - -void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) -{ - UNUSED(mosq); - UNUSED(obj); - UNUSED(rc); - - if (netdata_exit) - info("Connection to cloud terminated due to agent shutdown"); - else { - errno = 0; - error("Connection to cloud failed"); - } - aclk_disconnect(); - - aclk_lws_wss_mqtt_layer_disconnect_notif(); - - return; -} - -void _show_mqtt_info() -{ - int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; - libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); - - info( - "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, - libmosq_revision); -} - -size_t _mqtt_external_write_hook(void *buf, size_t count) -{ - return aclk_lws_wss_client_write(buf, count); -} - -size_t _mqtt_external_read_hook(void *buf, size_t count) -{ - return aclk_lws_wss_client_read(buf, count); -} - -int _mqtt_lib_init() -{ - int rc; - //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; - /* Commenting out now as it is unused - do not delete, this is needed for the on-prem version. - char *ca_crt; - char *server_crt; - char *server_key; - - // show library info so can have it in the logfile - //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); - ca_crt = config_get(CONFIG_SECTION_CLOUD, "link cert", "*"); - server_crt = config_get(CONFIG_SECTION_CLOUD, "link server cert", "*"); - server_key = config_get(CONFIG_SECTION_CLOUD, "link server key", "*"); - - if (ca_crt[0] == '*') { - freez(ca_crt); - ca_crt = NULL; - } - - if (server_crt[0] == '*') { - freez(server_crt); - server_crt = NULL; - } - - if (server_key[0] == '*') { - freez(server_key); - server_key = NULL; - } - */ - - // info( - // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, - // libmosq_revision); - - rc = mosquitto_lib_init(); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) { - error("Failed to initialize MQTT (libmosquitto library)"); - return 1; - } - return 0; -} - -static int _mqtt_create_connection(char *username, char *password) -{ - if (mosq != NULL) - mosquitto_destroy(mosq); - mosq = mosquitto_new(username, true, NULL); - if (unlikely(!mosq)) { - mosquitto_lib_cleanup(); - error("MQTT new structure -- %s", mosquitto_strerror(errno)); - return MOSQ_ERR_UNKNOWN; - } - - // Record the session start time to allow a nominal LWT timestamp - usec_t now = now_realtime_usec(); - aclk_session_sec = now / USEC_PER_SEC; - aclk_session_us = now % USEC_PER_SEC; - - _link_set_lwt("outbound/meta", 2); - - mosquitto_connect_callback_set(mosq, connect_callback); - mosquitto_disconnect_callback_set(mosq, disconnect_callback); - mosquitto_publish_callback_set(mosq, publish_callback); - - info("Using challenge-response: %s / %s", username, password); - mosquitto_username_pw_set(mosq, username, password); - - int rc = mosquitto_threaded_set(mosq, 1); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - error("Failed to tune the thread model for libmosquitto (%s)", mosquitto_strerror(rc)); - -#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000 - rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc)); - - rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1); - info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc)); -#endif - - return rc; -} - -static int _link_mqtt_connect(char *aclk_hostname, int aclk_port) -{ - int rc; - - rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL); - - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - error( - "Failed to establish link to [%s:%d] MQTT status = %d (%s)", aclk_hostname, aclk_port, rc, - mosquitto_strerror(rc)); - else - info("Establishing MQTT link to [%s:%d]", aclk_hostname, aclk_port); - - return rc; -} - -static inline void _link_mosquitto_write() -{ - int rc; - - if (unlikely(!mosq)) { - return; - } - - rc = mosquitto_loop_misc(mosq); - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc)); - - if (likely(mosquitto_want_write(mosq))) { - rc = mosquitto_loop_write(mosq, 1); - if (rc != MOSQ_ERR_SUCCESS) - debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc)); - } -} - -void aclk_lws_connection_established(char *hostname, int port) -{ - _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected. - _link_mosquitto_write(); -} - -void aclk_lws_connection_data_received() -{ - int rc = mosquitto_loop_read(mosq, 1); - if (rc != MOSQ_ERR_SUCCESS) - debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc)); -} - -void aclk_lws_connection_closed() -{ - aclk_disconnect(); - -} - - -int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password) -{ - if(aclk_lws_wss_connect(aclk_hostname, aclk_port)) - return MOSQ_ERR_UNKNOWN; - aclk_lws_wss_service_loop(); - - int rc = _mqtt_create_connection(username, password); - if (rc!= MOSQ_ERR_SUCCESS) - return rc; - - mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook); - return rc; -} - -inline int _link_event_loop() -{ - - // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1). - _link_mosquitto_write(); - aclk_lws_wss_service_loop(); - - // this is because if use LWS we don't want - // mqtt to reconnect by itself - return MOSQ_ERR_SUCCESS; -} - -void _link_shutdown() -{ - int rc; - - if (likely(!mosq)) - return; - - rc = mosquitto_disconnect(mosq); - switch (rc) { - case MOSQ_ERR_SUCCESS: - info("MQTT disconnected from broker"); - break; - default: - info("MQTT invalid structure"); - break; - }; -} - - -int _link_set_lwt(char *sub_topic, int qos) -{ - int rc; - char topic[ACLK_MAX_TOPIC + 1]; - char *final_topic; - - final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); - if (unlikely(!final_topic)) { - errno = 0; - error("Unable to build outgoing topic; truncated?"); - return 1; - } - - usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1; - BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC, ACLK_VERSION_NEG_VERSION); - buffer_strcat(b, ", \"payload\": \"unexpected\" }"); - rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0); - buffer_free(b); - - return rc; -} - -int _link_subscribe(char *topic, int qos) -{ - int rc; - - if (unlikely(!mosq)) - return 1; - - mosquitto_message_callback_set(mosq, mqtt_message_callback); - - rc = mosquitto_subscribe(mosq, NULL, topic, qos); - if (unlikely(rc)) { - errno = 0; - error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc)); - return 1; - } - - _link_mosquitto_write(); - return 0; -} - -/* - * Send a message to the cloud to specific topic - * - */ - -int _link_send_message(char *topic, const void *message, size_t len, int *mid) -{ - int rc; - size_t write_q, write_q_bytes, read_q; - - rc = mosquitto_pub_topic_check(topic); - - if (unlikely(rc != MOSQ_ERR_SUCCESS)) - return rc; - - lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); - rc = mosquitto_publish(mosq, mid, topic, len, message, ACLK_QOS, 0); - -#ifdef NETDATA_INTERNAL_CHECKS - char msg_head[64]; - memset(msg_head, 0, sizeof(msg_head)); - strncpy(msg_head, (char*)message, 60); - for (size_t i = 0; i < sizeof(msg_head); i++) - if(msg_head[i] == '\n') msg_head[i] = ' '; - info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", (int)len, - *mid, write_q, write_q_bytes, read_q, msg_head); - now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]); -#endif - - // TODO: Add better handling -- error will flood the logfile here - if (unlikely(rc != MOSQ_ERR_SUCCESS)) { - errno = 0; - error("MQTT message failed : %s", mosquitto_strerror(rc)); - } - _link_mosquitto_write(); - return rc; -} diff --git a/aclk/legacy/mqtt.h b/aclk/legacy/mqtt.h deleted file mode 100644 index 98d599f5..00000000 --- a/aclk/legacy/mqtt.h +++ /dev/null @@ -1,25 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_MQTT_H -#define NETDATA_MQTT_H - -#ifdef ENABLE_ACLK -#include "externaldeps/mosquitto/mosquitto.h" -#endif - -void _show_mqtt_info(); -int _link_event_loop(); -void _link_shutdown(); -int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password); -//int _link_lib_init(); -int _mqtt_lib_init(); -int _link_subscribe(char *topic, int qos); -int _link_send_message(char *topic, const void *message, size_t len, int *mid); -const char *_link_strerror(int rc); -int _link_set_lwt(char *topic, int qos); - - -int legacy_aclk_handle_cloud_message(char *); -extern char *get_topic(char *sub_topic, char *final_topic, int max_size); - -#endif //NETDATA_MQTT_H diff --git a/aclk/legacy/tests/fake-charts.d.plugin b/aclk/legacy/tests/fake-charts.d.plugin deleted file mode 100644 index a13c6bab..00000000 --- a/aclk/legacy/tests/fake-charts.d.plugin +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env bash - -sleep 45 # Wait until popcorning finishes - -echo "CHART aclk_test.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1" -sleep 5 -echo "DIMENSION aclk1 '' percentage-of-absolute 1 1" -sleep 5 -echo "BEGIN aclk_test.newcol 1000000" -echo "SET aclk1 = 3" -echo "END" -sleep 5 -echo "DIMENSION aclk2 '' percentage-of-absolute 1 1" -sleep 5 -echo "BEGIN aclk_test.newcol 1000000" -echo "SET aclk1 = 3" -echo "SET aclk2 = 3" -echo "END" -sleep 5 -echo "CHART aclk_test2.newcol '' 'Generate new collector/chart event' 'units' aclk_test aclk_test lines 900001 1" -echo "DIMENSION aclk1 '' percentage-of-absolute 1 1" - -sleep 5 -exit 0 # Signal that we are done diff --git a/aclk/legacy/tests/install-fake-charts.d.sh.in b/aclk/legacy/tests/install-fake-charts.d.sh.in deleted file mode 100644 index ac002a2b..00000000 --- a/aclk/legacy/tests/install-fake-charts.d.sh.in +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env bash - -TARGET="@pluginsdir_POST@" -BASE="$(cd "$(dirname "$0")" && pwd)" - -cp "$BASE/fake-charts.d.plugin" "$TARGET/charts.d.plugin" diff --git a/aclk/legacy/tests/launch-paho.sh b/aclk/legacy/tests/launch-paho.sh deleted file mode 100755 index 1c2cb5f2..00000000 --- a/aclk/legacy/tests/launch-paho.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env bash - -docker build -f paho.Dockerfile . --build-arg "HOST_HOSTNAME=$(ping -c1 "$(hostname).local" | head -n1 | grep -o '[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*')" -t paho-client -docker run -it paho-client diff --git a/aclk/legacy/tests/paho-inspection.py b/aclk/legacy/tests/paho-inspection.py deleted file mode 100644 index 14e99b65..00000000 --- a/aclk/legacy/tests/paho-inspection.py +++ /dev/null @@ -1,59 +0,0 @@ -import ssl -import paho.mqtt.client as mqtt -import json -import time -import sys - -def on_connect(mqttc, obj, flags, rc): - if rc==0: - print("Successful connection", flush=True) - else : - print(f"Connection error rc={rc}", flush=True) - mqttc.subscribe("/agent/#",0) - -def on_disconnect(mqttc, obj, flags, rc): - print("disconnected rc: "+str(rc), flush=True) - -def on_message(mqttc, obj, msg): - print(f"{msg.topic} {len(msg.payload)}-bytes qos={msg.qos}", flush=True) - try: - print(f"Trying decode of {msg.payload[:60]}",flush=True) - api_msg = json.loads(msg.payload) - except Exception as e: - print(e,flush=True) - return - ts = api_msg["timestamp"] - mtype = api_msg["type"] - print(f"Message {mtype} time={ts} size {len(api_msg)}", flush=True) - now = time.time() - print(f"Current {now} -> Delay {now-ts}", flush=True) - if mtype=="disconnect": - print(f"Message dump: {api_msg}", flush=True) - -def on_publish(mqttc, obj, mid): - print("mid: "+str(mid), flush=True) - -def on_subscribe(mqttc, obj, mid, granted_qos): - print("Subscribed: "+str(mid)+" "+str(granted_qos), flush=True) - -def on_log(mqttc, obj, level, string): - print(string) - -print(f"Starting paho-inspection on {sys.argv[1]}", flush=True) -mqttc = mqtt.Client(transport='websockets',client_id="paho") -#mqttc.tls_set(certfile="server.crt", keyfile="server.key", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) -#mqttc.tls_set(ca_certs="server.crt", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) -mqttc.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS, ciphers=None) -mqttc.tls_insecure_set(True) -mqttc.on_message = on_message -mqttc.on_connect = on_connect -mqttc.on_disconnect = on_disconnect -mqttc.on_publish = on_publish -mqttc.on_subscribe = on_subscribe -mqttc.username_pw_set("paho","paho") -mqttc.connect(sys.argv[1], 8443, 60) - -#mqttc.publish("/agent/mine","Test1") -#mqttc.subscribe("$SYS/#", 0) -print("Connected successfully, monitoring /agent/#", flush=True) -mqttc.loop_forever() diff --git a/aclk/legacy/tests/paho.Dockerfile b/aclk/legacy/tests/paho.Dockerfile deleted file mode 100644 index d67cc4cb..00000000 --- a/aclk/legacy/tests/paho.Dockerfile +++ /dev/null @@ -1,14 +0,0 @@ -FROM archlinux/base:latest - -RUN pacman -Syyu --noconfirm -RUN pacman --noconfirm --needed -S python-pip - -RUN pip install paho-mqtt - -RUN mkdir -p /opt/paho -COPY paho-inspection.py /opt/paho/ - -WORKDIR /opt/paho -ARG HOST_HOSTNAME -RUN echo $HOST_HOSTNAME >host -CMD ["/bin/bash", "-c", "/usr/sbin/python paho-inspection.py $(cat host)"] |