diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 11:08:07 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 11:08:07 +0000 |
commit | c69cb8cc094cc916adbc516b09e944cd3d137c01 (patch) | |
tree | f2878ec41fb6d0e3613906c6722fc02b934eeb80 /aclk/legacy/agent_cloud_link.c | |
parent | Initial commit. (diff) | |
download | netdata-upstream/1.29.3.tar.xz netdata-upstream/1.29.3.zip |
Adding upstream version 1.29.3.upstream/1.29.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | aclk/legacy/agent_cloud_link.c | 1683 |
1 files changed, 1683 insertions, 0 deletions
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c new file mode 100644 index 0000000..e51a013 --- /dev/null +++ b/aclk/legacy/agent_cloud_link.c @@ -0,0 +1,1683 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "libnetdata/libnetdata.h" +#include "agent_cloud_link.h" +#include "aclk_lws_https_client.h" +#include "aclk_query.h" +#include "aclk_common.h" +#include "aclk_stats.h" + +#ifdef ENABLE_ACLK +#include <libwebsockets.h> +#endif + +int aclk_shutting_down = 0; + +// Other global state +static int aclk_subscribed = 0; +static int aclk_disable_single_updates = 0; +static char *aclk_username = NULL; +static char *aclk_password = NULL; + +static char *global_base_topic = NULL; +static int aclk_connecting = 0; +int aclk_force_reconnect = 0; // Indication from lower layers +usec_t aclk_session_us = 0; // Used by the mqtt layer +time_t aclk_session_sec = 0; // Used by the mqtt layer + +static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; + +#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) +#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) + +#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) +#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) + +void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); +void aclk_lws_wss_destroy_context(); +/* + * Maintain a list of collectors and chart count + * If all the charts of a collector are deleted + * then a new metadata dataset must be send to the cloud + * + */ +struct _collector { + time_t created; + uint32_t count; //chart count + uint32_t hostname_hash; + uint32_t plugin_hash; + uint32_t module_hash; + char *hostname; + char *plugin_name; + char *module_name; + struct _collector *next; +}; + +struct _collector *collector_list = NULL; + +char *create_uuid() +{ + uuid_t uuid; + char *uuid_str = mallocz(36 + 1); + + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + + return uuid_str; +} + +int cloud_to_agent_parse(JSON_ENTRY *e) +{ + struct aclk_request *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, "msg-id")) { + data->msg_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "type")) { + data->type_id = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "callback-topic")) { + data->callback_topic = strdupz(e->data.string); + break; + } + if (!strcmp(e->name, "payload")) { + if (likely(e->data.string)) { + size_t len = strlen(e->data.string); + data->payload = mallocz(len+1); + if (!url_decode_r(data->payload, e->data.string, len + 1)) + strcpy(data->payload, e->data.string); + } + break; + } + break; + case JSON_NUMBER: + if (!strcmp(e->name, "version")) { + data->version = e->data.number; + break; + } + if (!strcmp(e->name, "min-version")) { + data->min_version = e->data.number; + break; + } + if (!strcmp(e->name, "max-version")) { + data->max_version = e->data.number; + break; + } + + break; + + case JSON_BOOLEAN: + break; + + case JSON_NULL: + break; + } + return 0; +} + + +static RSA *aclk_private_key = NULL; +static int create_private_key() +{ + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + aclk_private_key = NULL; + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir); + + long bytes_read; + char *private_key = read_by_filename(filename, &bytes_read); + if (!private_key) { + error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename); + return 1; + } + debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read); + + BIO *key_bio = BIO_new_mem_buf(private_key, -1); + if (key_bio==NULL) { + error("Claimed agent cannot establish ACLK - failed to create BIO for key"); + goto biofailed; + } + + aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL); + BIO_free(key_bio); + if (aclk_private_key!=NULL) + { + freez(private_key); + return 0; + } + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Claimed agent cannot establish ACLK - cannot create private key: %s", err); + +biofailed: + freez(private_key); + return 1; +} + +/* + * After a connection failure -- delay in milliseconds + * When a connection is established, the delay function + * should be called with + * + * mode 0 to reset the delay + * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms + * + */ +unsigned long int aclk_reconnect_delay(int mode) +{ + static int fail = -1; + unsigned long int delay; + + if (!mode || fail == -1) { + srandom(time(NULL)); + fail = mode - 1; + return 0; + } + + delay = (1 << fail); + + if (delay >= ACLK_MAX_BACKOFF_DELAY) { + delay = ACLK_MAX_BACKOFF_DELAY * 1000; + } else { + fail++; + delay = (delay * 1000) + (random() % 1000); + } + + return delay; +} + +// This will give the base topic that the agent will publish messages. +// subtopics will be sent under the base topic e.g. base_topic/subtopic +// This is called during the connection, we delete any previous topic +// in-case the user has changed the agent id and reclaimed. + +char *create_publish_base_topic() +{ + char *agent_id = is_agent_claimed(); + if (unlikely(!agent_id)) + return NULL; + + ACLK_LOCK; + + if (global_base_topic) + freez(global_base_topic); + char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp; + + snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id); + tmp = strchr(tmp_topic, '\n'); + if (unlikely(tmp)) + *tmp = '\0'; + global_base_topic = strdupz(tmp_topic); + + ACLK_UNLOCK; + freez(agent_id); + return global_base_topic; +} + +/* + * Build a topic based on sub_topic and final_topic + * if the sub topic starts with / assume that is an absolute topic + * + */ + +char *get_topic(char *sub_topic, char *final_topic, int max_size) +{ + int rc; + + if (likely(sub_topic && sub_topic[0] == '/')) + return sub_topic; + + if (unlikely(!global_base_topic)) + return sub_topic; + + rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic); + if (unlikely(rc >= max_size)) + debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic); + + return final_topic; +} + +#ifndef __GNUC__ +#pragma region ACLK Internal Collector Tracking +#endif + +/* + * Free a collector structure + */ + +static void _free_collector(struct _collector *collector) +{ + if (likely(collector->plugin_name)) + freez(collector->plugin_name); + + if (likely(collector->module_name)) + freez(collector->module_name); + + if (likely(collector->hostname)) + freez(collector->hostname); + + freez(collector); +} + +/* + * This will report the collector list + * + */ +#ifdef ACLK_DEBUG +static void _dump_collector_list() +{ + struct _collector *tmp_collector; + + COLLECTOR_LOCK; + + info("DUMPING ALL COLLECTORS"); + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + info("DUMPING ALL COLLECTORS -- nothing found"); + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + + while (tmp_collector) { + info( + "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, + tmp_collector->plugin_name ? tmp_collector->plugin_name : "", + tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); + + tmp_collector = tmp_collector->next; + } + info("DUMPING ALL COLLECTORS DONE"); + COLLECTOR_UNLOCK; +} +#endif + +/* + * This will cleanup the collector list + * + */ +static void _reset_collector_list() +{ + struct _collector *tmp_collector, *next_collector; + + COLLECTOR_LOCK; + + if (unlikely(!collector_list || !collector_list->next)) { + COLLECTOR_UNLOCK; + return; + } + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + collector_list->count = 0; + collector_list->next = NULL; + + // We broke the link; we can unlock + COLLECTOR_UNLOCK; + + while (tmp_collector) { + next_collector = tmp_collector->next; + _free_collector(tmp_collector); + tmp_collector = next_collector; + } +} + +/* + * Find a collector (if it exists) + * Must lock before calling this + * If last_collector is not null, it will return the previous collector in the linked + * list (used in collector delete) + */ +static struct _collector *_find_collector( + const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) +{ + struct _collector *tmp_collector, *prev_collector; + uint32_t plugin_hash; + uint32_t module_hash; + uint32_t hostname_hash; + + if (unlikely(!collector_list)) { + collector_list = callocz(1, sizeof(struct _collector)); + return NULL; + } + + if (unlikely(!collector_list->next)) + return NULL; + + plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; + module_hash = module_name ? simple_hash(module_name) : 1; + hostname_hash = simple_hash(hostname); + + // Note that the first entry is "dummy" + tmp_collector = collector_list->next; + prev_collector = collector_list; + while (tmp_collector) { + if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && + hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && + (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && + (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { + if (unlikely(last_collector)) + *last_collector = prev_collector; + + return tmp_collector; + } + + prev_collector = tmp_collector; + tmp_collector = tmp_collector->next; + } + + return tmp_collector; +} + +/* + * Called to delete a collector + * It will reduce the count (chart_count) and will remove it + * from the linked list if the count reaches zero + * The structure will be returned to the caller to free + * the resources + * + */ +static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector, *prev_collector = NULL; + + tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); + + if (likely(tmp_collector)) { + --tmp_collector->count; + if (unlikely(!tmp_collector->count)) + prev_collector->next = tmp_collector->next; + } + return tmp_collector; +} + +/* + * Add a new collector (plugin / module) to the list + * If it already exists just update the chart count + * + * Lock before calling + */ +static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + + tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); + + if (unlikely(!tmp_collector)) { + tmp_collector = callocz(1, sizeof(struct _collector)); + tmp_collector->hostname_hash = simple_hash(hostname); + tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; + tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; + + tmp_collector->hostname = strdupz(hostname); + tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; + tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; + + tmp_collector->next = collector_list->next; + collector_list->next = tmp_collector; + } + tmp_collector->count++; + debug( + D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", + module_name ? module_name : "*", tmp_collector->count); + return tmp_collector; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +/* Avoids the need to scan trough all RRDHOSTS + * every time any Query Thread Wakes Up + * (every time we need to check child popcorn expiry) + * call with ACLK_SHARED_STATE_LOCK held + */ +void aclk_update_next_child_to_popcorn(void) +{ + RRDHOST *host; + int any = 0; + + rrd_rdlock(); + rrdhost_foreach_read(host) { + if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) + continue; + + rrdhost_aclk_state_lock(host); + if (!ACLK_IS_HOST_POPCORNING(host)) { + rrdhost_aclk_state_unlock(host); + continue; + } + + any = 1; + + if (unlikely(!aclk_shared_state.next_popcorn_host)) { + aclk_shared_state.next_popcorn_host = host; + rrdhost_aclk_state_unlock(host); + continue; + } + + if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) + aclk_shared_state.next_popcorn_host = host; + + rrdhost_aclk_state_unlock(host); + } + if(!any) + aclk_shared_state.next_popcorn_host = NULL; + + rrd_unlock(); +} + +/* If popcorning bump timer. + * If popcorning or initializing (host not stable) return 1 + * Otherwise return 0 + */ +static int aclk_popcorn_check_bump(RRDHOST *host) +{ + time_t now = now_monotonic_sec(); + int updated = 0, ret; + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + + ret = ACLK_IS_HOST_INITIALIZING(host); + if (unlikely(ACLK_IS_HOST_POPCORNING(host))) { + if(now != host->aclk_state.t_last_popcorn_update) { + updated = 1; + info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); + } + host->aclk_state.t_last_popcorn_update = now; + rrdhost_aclk_state_unlock(host); + + if (host != localhost && updated) + aclk_update_next_child_to_popcorn(); + + ACLK_SHARED_STATE_UNLOCK; + return ret; + } + + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return ret; +} + +inline static int aclk_host_initializing(RRDHOST *host) +{ + rrdhost_aclk_state_lock(host); + int ret = ACLK_IS_HOST_INITIALIZING(host); + rrdhost_aclk_state_unlock(host); + return ret; +} + +static void aclk_start_host_popcorning(RRDHOST *host) +{ + usec_t now = now_monotonic_sec(); + info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) { + errno = 0; + error("Localhost is allowed to do popcorning only once after startup!"); + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return; + } + + host->aclk_state.state = ACLK_HOST_INITIALIZING; + host->aclk_state.metadata = ACLK_METADATA_REQUIRED; + host->aclk_state.t_last_popcorn_update = now; + rrdhost_aclk_state_unlock(host); + if (host != localhost) + aclk_update_next_child_to_popcorn(); + ACLK_SHARED_STATE_UNLOCK; +} + +static void aclk_stop_host_popcorning(RRDHOST *host) +{ + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + if (!ACLK_IS_HOST_POPCORNING(host)) { + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return; + } + + info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid); + host->aclk_state.t_last_popcorn_update = 0; + host->aclk_state.metadata = ACLK_METADATA_REQUIRED; + rrdhost_aclk_state_unlock(host); + + if(host == aclk_shared_state.next_popcorn_host) { + aclk_shared_state.next_popcorn_host = NULL; + aclk_update_next_child_to_popcorn(); + } + ACLK_SHARED_STATE_UNLOCK; +} + +/* + * Add a new collector to the list + * If it exists, update the chart count + */ +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + if (unlikely(!netdata_ready)) { + return; + } + + COLLECTOR_LOCK; + + tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name); + + if (unlikely(tmp_collector->count != 1)) { + COLLECTOR_UNLOCK; + return; + } + + COLLECTOR_UNLOCK; + + if(aclk_popcorn_check_bump(host)) + return; + + if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); +} + +/* + * Delete a collector from the list + * If the chart count reaches zero the collector will be removed + * from the list by calling del_collector. + * + * This function will release the memory used and schedule + * a cloud update + */ +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +{ + struct _collector *tmp_collector; + if (unlikely(!netdata_ready)) { + return; + } + + COLLECTOR_LOCK; + + tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name); + + if (unlikely(!tmp_collector || tmp_collector->count)) { + COLLECTOR_UNLOCK; + return; + } + + debug( + D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*", + tmp_collector->count); + + COLLECTOR_UNLOCK; + + _free_collector(tmp_collector); + + if (aclk_popcorn_check_bump(host)) + return; + + if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); +} + +static void aclk_graceful_disconnect() +{ + size_t write_q, write_q_bytes, read_q; + time_t event_loop_timeout; + + // Send a graceful disconnect message + BUFFER *b = buffer_create(512); + aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}"); + aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); + buffer_free(b); + + event_loop_timeout = now_realtime_sec() + 5; + write_q = 1; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + + aclk_shutting_down = 1; + _link_shutdown(); + aclk_lws_wss_mqtt_layer_disconect_notif(); + + write_q = 1; + event_loop_timeout = now_realtime_sec() + 5; + while (write_q && event_loop_timeout > now_realtime_sec()) { + _link_event_loop(); + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q); + } + aclk_shutting_down = 0; +} + +#ifndef __GNUC__ +#pragma region Incoming Msg Parsing +#endif + +struct dictionary_singleton { + char *key; + char *result; +}; + +int json_extract_singleton(JSON_ENTRY *e) +{ + struct dictionary_singleton *data = e->callback_data; + + switch (e->type) { + case JSON_OBJECT: + case JSON_ARRAY: + break; + case JSON_STRING: + if (!strcmp(e->name, data->key)) { + data->result = strdupz(e->data.string); + break; + } + break; + case JSON_NUMBER: + case JSON_BOOLEAN: + case JSON_NULL: + break; + } + return 0; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + + +#ifndef __GNUC__ +#pragma region Challenge Response +#endif + +// Base-64 decoder. +// Note: This is non-validating, invalid input will be decoded without an error. +// Challenges are packed into json strings so we don't skip newlines. +// Size errors (i.e. invalid input size or insufficient output space) are caught. +size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size) +{ + static char lookup[256]; + static int first_time=1; + if (first_time) + { + first_time = 0; + for(int i=0; i<256; i++) + lookup[i] = -1; + for(int i='A'; i<='Z'; i++) + lookup[i] = i-'A'; + for(int i='a'; i<='z'; i++) + lookup[i] = i-'a' + 26; + for(int i='0'; i<='9'; i++) + lookup[i] = i-'0' + 52; + lookup['+'] = 62; + lookup['/'] = 63; + } + if ((input_size & 3) != 0) + { + error("Can't decode base-64 input length %zu", input_size); + return 0; + } + size_t unpadded_size = (input_size/4) * 3; + if ( unpadded_size > output_size ) + { + error("Output buffer size %zu is too small to decode %zu into", output_size, input_size); + return 0; + } + // Don't check padding within full quantums + for (size_t i = 0 ; i < input_size-4 ; i+=4 ) + { + uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]]; + output[0] = value >> 16; + output[1] = value >> 8; + output[2] = value; + //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); + output += 3; + input += 4; + } + // Handle padding only in last quantum + if (input[2] == '=') { + uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]]; + output[0] = value >> 4; + //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]); + return unpadded_size-2; + } + else if (input[3] == '=') { + uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]]; + output[0] = value >> 10; + output[1] = value >> 2; + //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]); + return unpadded_size-1; + } + else + { + uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3]; + output[0] = value >> 16; + output[1] = value >> 8; + output[2] = value; + //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]); + return unpadded_size; + } +} + +size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size) +{ + uint32_t value; + static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/"; + if ((input_size/3+1)*4 >= output_size) + { + error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size); + return 0; + } + size_t count = 0; + while (input_size>3) + { + value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff; + output[0] = lookup[value >> 18]; + output[1] = lookup[(value >> 12) & 0x3f]; + output[2] = lookup[(value >> 6) & 0x3f]; + output[3] = lookup[value & 0x3f]; + //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); + output += 4; + input += 3; + input_size -= 3; + count += 4; + } + switch (input_size) + { + case 2: + value = (input[0] << 10) + (input[1] << 2); + output[0] = lookup[(value >> 12) & 0x3f]; + output[1] = lookup[(value >> 6) & 0x3f]; + output[2] = lookup[value & 0x3f]; + output[3] = '='; + //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]); + count += 4; + break; + case 1: + value = input[0] << 4; + output[0] = lookup[(value >> 6) & 0x3f]; + output[1] = lookup[value & 0x3f]; + output[2] = '='; + output[3] = '='; + //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); + count += 4; + break; + case 0: + break; + } + return count; +} + + + +int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted) +{ + int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING); + if (result == -1) { + char err[512]; + ERR_error_string_n(ERR_get_error(), err, sizeof(err)); + error("Decryption of the challenge failed: %s", err); + } + return result; +} + +void aclk_get_challenge(char *aclk_hostname, int port) +{ + char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + debug(D_ACLK, "Performing challenge-response sequence"); + if (aclk_password != NULL) + { + freez(aclk_password); + aclk_password = NULL; + } + // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge + // TODO - target host? + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + { + error("Agent was not claimed - cannot perform challenge/response"); + goto CLEANUP; + } + char url[1024]; + sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id); + info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url); + if(aclk_send_https_request("GET", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL)) + { + error("Challenge failed: %s", data_buffer); + goto CLEANUP; + } + struct dictionary_singleton challenge = { .key = "challenge", .result = NULL }; + + debug(D_ACLK, "Challenge response from cloud: %s", data_buffer); + if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK) + { + freez(challenge.result); + error("Could not parse the json response with the challenge: %s", data_buffer); + goto CLEANUP; + } + if (challenge.result == NULL ) { + error("Could not retrieve challenge from auth response: %s", data_buffer); + goto CLEANUP; + } + + + size_t challenge_len = strlen(challenge.result); + unsigned char decoded[512]; + size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded)); + + unsigned char plaintext[4096]={}; + int decrypted_length = private_decrypt(decoded, decoded_len, plaintext); + freez(challenge.result); + char encoded[512]; + size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded)); + encoded[encoded_len] = 0; + debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded); + + char response_json[4096]={}; + sprintf(response_json, "{\"response\":\"%s\"}", encoded); + debug(D_ACLK, "Password phase: %s",response_json); + // TODO - host + sprintf(url, "/api/v1/auth/node/%s/password", agent_id); + if(aclk_send_https_request("POST", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json)) + { + error("Challenge-response failed: %s", data_buffer); + goto CLEANUP; + } + + debug(D_ACLK, "Password response from cloud: %s", data_buffer); + + struct dictionary_singleton password = { .key = "password", .result = NULL }; + if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK) + { + freez(password.result); + error("Could not parse the json response with the password: %s", data_buffer); + goto CLEANUP; + } + + if (password.result == NULL ) { + error("Could not retrieve password from auth response"); + goto CLEANUP; + } + if (aclk_password != NULL ) + freez(aclk_password); + aclk_password = password.result; + if (aclk_username != NULL) + freez(aclk_username); + aclk_username = agent_id; + agent_id = NULL; + +CLEANUP: + if (agent_id != NULL) + freez(agent_id); + freez(data_buffer); + return; +} + +#ifndef __GNUC__ +#pragma endregion +#endif + +static void aclk_try_to_connect(char *hostname, int port) +{ + int rc; + +// this is usefull for developers working on ACLK +// allows connecting agent to any MQTT broker +// for debugging, development and testing purposes +#ifndef ACLK_DISABLE_CHALLENGE + if (!aclk_private_key) { + error("Cannot try to establish the agent cloud link - no private key available!"); + return; + } +#endif + + info("Attempting to establish the agent cloud link"); +#ifdef ACLK_DISABLE_CHALLENGE + error("Agent built with ACLK_DISABLE_CHALLENGE. This is for testing " + "and development purposes only. Warranty void. Won't be able " + "to connect to Netdata Cloud."); + if (aclk_password == NULL) + aclk_password = strdupz("anon"); +#else + aclk_get_challenge(hostname, port); + if (aclk_password == NULL) + return; +#endif + + aclk_connecting = 1; + create_publish_base_topic(); + + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = 0; + ACLK_SHARED_STATE_UNLOCK; + + rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password); + if (unlikely(rc)) { + error("Failed to initialize the agent cloud link library"); + } +} + +// Sends "hello" message to negotiate ACLK version with cloud +static inline void aclk_hello_msg() +{ + BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE); + + char *msg_id = create_uuid(); + + ACLK_SHARED_STATE_LOCK; + aclk_shared_state.version_neg = 0; + aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; + ACLK_SHARED_STATE_UNLOCK; + + //Hello message is versioned separatelly from the rest of the protocol + aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); + buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX); + aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id); + freez(msg_id); + buffer_free(buf); +} + +/** + * Main agent cloud link thread + * + * This thread will simply call the main event loop that handles + * pending requests - both inbound and outbound + * + * @param ptr is a pointer to the netdata_static_thread structure. + * + * @return It always returns NULL + */ +void *aclk_main(void *ptr) +{ + struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + struct aclk_query_threads query_threads; + struct aclk_stats_thread *stats_thread = NULL; + time_t last_periodic_query_wakeup = 0; + + query_threads.thread_list = NULL; + + // This thread is unusual in that it cannot be cancelled by cancel_main_threads() + // as it must notify the far end that it shutdown gracefully and avoid the LWT. + netdata_thread_disable_cancelability(); + +#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK) + info("Killing ACLK thread -> cloud functionality has been disabled"); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +#endif + +#ifndef LWS_WITH_SOCKS5 + ACLK_PROXY_TYPE proxy_type; + aclk_get_proxy(&proxy_type); + if(proxy_type == PROXY_TYPE_SOCKS5) { + error("Disabling ACLK due to requested SOCKS5 proxy."); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } +#endif + + info("Waiting for netdata to be ready"); + while (!netdata_ready) { + sleep_usec(USEC_PER_MS * 300); + } + + info("Waiting for Cloud to be enabled"); + while (!netdata_cloud_setting) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) { + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; + } + } + + query_threads.count = MIN(processors/2, 6); + query_threads.count = MAX(query_threads.count, 2); + query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); + if(query_threads.count < 1) { + error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count); + query_threads.count = 1; + config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); + } + + //start localhost popcorning + aclk_start_host_popcorning(localhost); + + aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); + if (aclk_stats_enabled) { + stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); + stats_thread->thread = mallocz(sizeof(netdata_thread_t)); + stats_thread->query_thread_count = query_threads.count; + netdata_thread_create( + stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread); + } + + char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering. + int port_num = 0; + info("Waiting for netdata to be claimed"); + while(1) { + char *agent_id = is_agent_claimed(); + while (likely(!agent_id)) { + sleep_usec(USEC_PER_SEC * 1); + if (netdata_exit) + goto exited; + agent_id = is_agent_claimed(); + } + freez(agent_id); + // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load. + // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code. + char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL); + if (cloud_base_url == NULL) { + error("Do not move the cloud base url out of post_conf_load!!"); + goto exited; + } + if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &port_num)) + error("Agent is claimed but the configuration is invalid, please fix"); + else if (!create_private_key() && !_mqtt_lib_init()) + break; + + for (int i=0; i<60; i++) { + if (netdata_exit) + goto exited; + + sleep_usec(USEC_PER_SEC * 1); + } + } + + usec_t reconnect_expiry = 0; // In usecs + + while (!netdata_exit) { + static int first_init = 0; + /* size_t write_q, write_q_bytes, read_q; + lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/ + + if (aclk_disable_runtime && !aclk_connected) { + sleep(1); + continue; + } + + if (aclk_kill_link) { // User has reloaded the claiming state + aclk_kill_link = 0; + aclk_graceful_disconnect(); + create_private_key(); + continue; + } + + if (aclk_force_reconnect) { + aclk_lws_wss_destroy_context(); + aclk_force_reconnect = 0; + } + if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) { + if (unlikely(!first_init)) { + aclk_try_to_connect(aclk_hostname, port_num); + first_init = 1; + } else { + if (aclk_connecting == 0) { + if (reconnect_expiry == 0) { + unsigned long int delay = aclk_reconnect_delay(1); + reconnect_expiry = now_realtime_usec() + delay * 1000; + info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0); + } + if (now_realtime_usec() >= reconnect_expiry) { + reconnect_expiry = 0; + aclk_try_to_connect(aclk_hostname, port_num); + } + sleep_usec(USEC_PER_MS * 100); + } + } + if (aclk_connecting) { + _link_event_loop(); + sleep_usec(USEC_PER_MS * 100); + } + continue; + } + + _link_event_loop(); + if (unlikely(!aclk_connected || aclk_force_reconnect)) + continue; + /*static int stress_counter = 0; + if (write_q_bytes==0 && stress_counter ++ >5) + { + aclk_send_stress_test(8000000); + stress_counter = 0; + }*/ + + if (unlikely(!aclk_subscribed)) { + aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1); + aclk_hello_msg(); + } + + if (unlikely(!query_threads.thread_list)) { + aclk_query_threads_start(&query_threads); + } + + time_t now = now_monotonic_sec(); + if(aclk_connected && last_periodic_query_wakeup < now) { + // to make `aclk_queue_query()` param `run_after` work + // also makes per child popcorning work + last_periodic_query_wakeup = now; + QUERY_THREAD_WAKEUP; + } + } // forever +exited: + // Wakeup query thread to cleanup + QUERY_THREAD_WAKEUP_ALL; + + freez(aclk_username); + freez(aclk_password); + freez(aclk_hostname); + if (aclk_private_key != NULL) + RSA_free(aclk_private_key); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + char *agent_id = is_agent_claimed(); + if (agent_id && aclk_connected) { + freez(agent_id); + // Wakeup thread to cleanup + QUERY_THREAD_WAKEUP; + aclk_graceful_disconnect(); + } + + aclk_query_threads_cleanup(&query_threads); + + _reset_collector_list(); + freez(collector_list); + + if(aclk_stats_enabled) { + netdata_thread_join(*stats_thread->thread, NULL); + aclk_stats_thread_cleanup(); + freez(stats_thread->thread); + freez(stats_thread); + } + + /* + * this must be last -> if all static threads signal + * THREAD_EXITED rrdengine will dealloc the RRDSETs + * and RRDDIMs that are used by still runing stat thread. + * see netdata_cleanup_and_exit() for reference + */ + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; + return NULL; +} + +/* + * Send a message to the cloud, using a base topic and sib_topic + * The final topic will be in the form <base_topic>/<sub_topic> + * If base_topic is missing then the global_base_topic will be used (if available) + * + */ +int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id) +{ + int rc; + int mid; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + UNUSED(msg_id); + + if (!aclk_connected) + return 0; + + if (unlikely(!message)) + return 0; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + + if (unlikely(!final_topic)) { + errno = 0; + error("Unable to build outgoing topic; truncated?"); + return 1; + } + + ACLK_LOCK; + rc = _link_send_message(final_topic, message, len, &mid); + // TODO: link the msg_id with the mid so we can trace it + ACLK_UNLOCK; + + if (unlikely(rc)) { + errno = 0; + error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc)); + } + + return rc; +} + +int aclk_send_message(char *sub_topic, char *message, char *msg_id) +{ + return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id); +} + +/* + * Subscribe to a topic in the cloud + * The final subscription will be in the form + * /agent/claim_id/<sub_topic> + */ +int aclk_subscribe(char *sub_topic, int qos) +{ + int rc; + char topic[ACLK_MAX_TOPIC + 1]; + char *final_topic; + + final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC); + if (unlikely(!final_topic)) { + errno = 0; + error("Unable to build outgoing topic; truncated?"); + return 1; + } + + if (!aclk_connected) { + error("Cannot subscribe to %s - not connected!", topic); + return 1; + } + + ACLK_LOCK; + rc = _link_subscribe(final_topic, qos); + ACLK_UNLOCK; + + // TODO: Add better handling -- error will flood the logfile here + if (unlikely(rc)) { + errno = 0; + error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc)); + } + + return rc; +} + +// This is called from a callback when the link goes up +void aclk_connect() +{ + info("Connection detected (%u queued queries)", aclk_query_size()); + + aclk_stats_upd_online(1); + + aclk_connected = 1; + aclk_reconnect_delay(0); + + QUERY_THREAD_WAKEUP; + return; +} + +// This is called from a callback when the link goes down +void aclk_disconnect() +{ + if (likely(aclk_connected)) + info("Disconnect detected (%u queued queries)", aclk_query_size()); + + aclk_stats_upd_online(0); + + aclk_subscribed = 0; + rrdhost_aclk_state_lock(localhost); + localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED; + rrdhost_aclk_state_unlock(localhost); + aclk_connected = 0; + aclk_connecting = 0; + aclk_force_reconnect = 1; +} + +inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version) +{ + uuid_t uuid; + char uuid_str[36 + 1]; + + if (unlikely(!msg_id)) { + uuid_generate(uuid); + uuid_unparse(uuid, uuid_str); + msg_id = uuid_str; + } + + if (ts_secs == 0) { + ts_us = now_realtime_usec(); + ts_secs = ts_us / USEC_PER_SEC; + ts_us = ts_us % USEC_PER_SEC; + } + + buffer_sprintf( + dest, + "{\t\"type\": \"%s\",\n" + "\t\"msg-id\": \"%s\",\n" + "\t\"timestamp\": %ld,\n" + "\t\"timestamp-offset-usec\": %llu,\n" + "\t\"connect\": %ld,\n" + "\t\"connect-offset-usec\": %llu,\n" + "\t\"version\": %d", + type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version); + + debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs); +} + + +/* + * This will send alarm information which includes + * configured alarms + * alarm_log + * active alarms + */ +void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); + +void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + debug(D_ACLK, "Metadata alarms start"); + + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + + if (metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + else + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + + buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : "); + health_alarms2json(localhost, local_buffer, 1); + debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len); + // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : "); + // health_alarm_log2json(localhost, local_buffer, 0); + // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len); + buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : "); + health_active_log_alarms_2json(localhost, local_buffer); + //debug(D_ACLK, "Metadata message %s", local_buffer->buffer); + + + + buffer_sprintf(local_buffer, "\n}\n}"); + aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); +} + +/* + * This will send the agent metadata + * /api/v1/info + * charts + */ +int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + + debug(D_ACLK, "Metadata /info start"); + + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + // on_connect messages are sent on a health reload, if the on_connect message is real then we + // use the session time as the fake timestamp to indicate that it starts the session. If it is + // a fake on_connect message then use the real timestamp to indicate it is within the existing + // session. + if (metadata_submitted == ACLK_METADATA_SENT) + aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg); + else + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + buffer_sprintf(local_buffer, "{\n\t \"info\" : "); + web_client_api_request_v1_info_fill_buffer(host, local_buffer); + debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); + + buffer_sprintf(local_buffer, ", \n\t \"charts\" : "); + charts2json(host, local_buffer, 1, 0); + buffer_sprintf(local_buffer, "\n}\n}"); + debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len); + + aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg); + + debug(D_ACLK, "Sending Child Disconnect"); + + char *msg_id = create_uuid(); + + aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg); + + buffer_strcat(local_buffer, ",\"payload\":"); + + buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid); + rrdhost_aclk_state_lock(host); + if(host->aclk_state.claimed_id) + buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id); + else + buffer_strcat(local_buffer, "null}}"); + + rrdhost_aclk_state_unlock(host); + + aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) +{ +#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE + if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + return; +#else +#warning "This check became unnecessary. Remove" +#endif + + if (unlikely(aclk_host_initializing(localhost))) + return; + + switch (cmd) { + case ACLK_CMD_CHILD_CONNECT: + debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); + aclk_start_host_popcorning(host); + aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); + break; + case ACLK_CMD_CHILD_DISCONNECT: + debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); + aclk_stop_host_popcorning(host); + aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); + break; + default: + error("Unknown command for aclk_host_state_update %d.", (int)cmd); + } +} + +void aclk_send_stress_test(size_t size) +{ + char *buffer = mallocz(size); + if (buffer != NULL) + { + for(size_t i=0; i<size; i++) + buffer[i] = 'x'; + buffer[size-1] = 0; + time_t time_created = now_realtime_sec(); + sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created); + buffer[strlen(buffer)] = '"'; + buffer[size-2] = '}'; + buffer[size-3] = '"'; + aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL); + error("Sending stress of size %zu at time %ld", size, time_created); + } + free(buffer); +} + +// Send info metadata message to the cloud if the link is established +// or on request +int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host) +{ + aclk_send_info_metadata(state, host); + + if(host == localhost) + aclk_send_alarm_metadata(state); + + return 0; +} + +void aclk_single_update_disable() +{ + aclk_disable_single_updates = 1; +} + +void aclk_single_update_enable() +{ + aclk_disable_single_updates = 0; +} + +// Trigged by a health reload, sends the alarm metadata +void aclk_alarm_reload() +{ + if (unlikely(aclk_host_initializing(localhost))) + return; + + if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue on_connect command on alarm reload"); + } + } +} +//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf) + +int aclk_send_single_chart(RRDHOST *host, char *chart) +{ + RRDSET *st = rrdset_find(host, chart); + if (!st) + st = rrdset_find_byname(host, chart); + if (!st) { + info("FAILED to find chart %s", chart); + return 1; + } + + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *msg_id = create_uuid(); + buffer_flush(local_buffer); + local_buffer->contenttype = CT_APPLICATION_JSON; + + aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + rrdset2json(st, local_buffer, NULL, NULL, 1); + buffer_sprintf(local_buffer, "\t\n}"); + + aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) +{ +#ifndef ENABLE_ACLK + UNUSED(host); + UNUSED(chart_name); + return 0; +#else + if (unlikely(!netdata_ready)) + return 0; + + if (!netdata_cloud_setting) + return 0; + + if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) + return 0; + + if (aclk_host_initializing(localhost)) + return 0; + + if (unlikely(aclk_disable_single_updates)) + return 0; + + if (aclk_popcorn_check_bump(host)) + return 0; + + if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue chart_update command"); + } + } + + return 0; +#endif +} + +int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +{ + BUFFER *local_buffer = NULL; + + if (unlikely(!netdata_ready)) + return 0; + + if (host != localhost) + return 0; + + if(unlikely(aclk_host_initializing(localhost))) + return 0; + + /* + * Check if individual updates have been disabled + * This will be the case when we do health reload + * and all the alarms will be dropped and recreated. + * At the end of the health reload the complete alarm metadata + * info will be sent + */ + if (unlikely(aclk_disable_single_updates)) + return 0; + + local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + char *msg_id = create_uuid(); + + buffer_flush(local_buffer); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg); + buffer_strcat(local_buffer, ",\n\t\"payload\": "); + + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); + health_alarm_entry2json_nolock(local_buffer, ae, host); + netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); + + buffer_sprintf(local_buffer, "\n}"); + + if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { + if (likely(aclk_connected)) { + errno = 0; + error("ACLK failed to queue alarm_command on alarm_update"); + } + } + + freez(msg_id); + buffer_free(local_buffer); + + return 0; +} |