From 483926a283e118590da3f9ecfa75a8a4d62143ce Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 1 Dec 2021 07:15:11 +0100 Subject: Merging upstream version 1.32.0. Signed-off-by: Daniel Baumann --- aclk/legacy/agent_cloud_link.c | 432 ++++++++++++----------------------------- 1 file changed, 125 insertions(+), 307 deletions(-) (limited to 'aclk/legacy/agent_cloud_link.c') diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c index 5ed7e66af..80ca23971 100644 --- a/aclk/legacy/agent_cloud_link.c +++ b/aclk/legacy/agent_cloud_link.c @@ -6,6 +6,7 @@ #include "aclk_query.h" #include "aclk_common.h" #include "aclk_stats.h" +#include "../aclk_collector_list.h" #ifdef ENABLE_ACLK #include @@ -15,46 +16,20 @@ int aclk_shutting_down = 0; // Other global state static int aclk_subscribed = 0; -static int aclk_disable_single_updates = 0; static char *aclk_username = NULL; static char *aclk_password = NULL; static char *global_base_topic = NULL; static int aclk_connecting = 0; int aclk_force_reconnect = 0; // Indication from lower layers -usec_t aclk_session_us = 0; // Used by the mqtt layer -time_t aclk_session_sec = 0; // Used by the mqtt layer static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER; -static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER; #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex) #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex) -#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex) -#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex) - void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len); void aclk_lws_wss_destroy_context(); -/* - * Maintain a list of collectors and chart count - * If all the charts of a collector are deleted - * then a new metadata dataset must be send to the cloud - * - */ -struct _collector { - time_t created; - uint32_t count; //chart count - uint32_t hostname_hash; - uint32_t plugin_hash; - uint32_t module_hash; - char *hostname; - char *plugin_name; - char *module_name; - struct _collector *next; -}; - -struct _collector *collector_list = NULL; char *create_uuid() { @@ -67,7 +42,7 @@ char *create_uuid() return uuid_str; } -int cloud_to_agent_parse(JSON_ENTRY *e) +int legacy_cloud_to_agent_parse(JSON_ENTRY *e) { struct aclk_request *data = e->callback_data; @@ -247,202 +222,10 @@ char *get_topic(char *sub_topic, char *final_topic, int max_size) return final_topic; } -#ifndef __GNUC__ -#pragma region ACLK Internal Collector Tracking -#endif - -/* - * Free a collector structure - */ - -static void _free_collector(struct _collector *collector) -{ - if (likely(collector->plugin_name)) - freez(collector->plugin_name); - - if (likely(collector->module_name)) - freez(collector->module_name); - - if (likely(collector->hostname)) - freez(collector->hostname); - - freez(collector); -} - -/* - * This will report the collector list - * - */ -#ifdef ACLK_DEBUG -static void _dump_collector_list() -{ - struct _collector *tmp_collector; - - COLLECTOR_LOCK; - - info("DUMPING ALL COLLECTORS"); - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - info("DUMPING ALL COLLECTORS -- nothing found"); - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - - while (tmp_collector) { - info( - "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname, - tmp_collector->plugin_name ? tmp_collector->plugin_name : "", - tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count); - - tmp_collector = tmp_collector->next; - } - info("DUMPING ALL COLLECTORS DONE"); - COLLECTOR_UNLOCK; -} -#endif - -/* - * This will cleanup the collector list - * - */ -static void _reset_collector_list() -{ - struct _collector *tmp_collector, *next_collector; - - COLLECTOR_LOCK; - - if (unlikely(!collector_list || !collector_list->next)) { - COLLECTOR_UNLOCK; - return; - } - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - collector_list->count = 0; - collector_list->next = NULL; - - // We broke the link; we can unlock - COLLECTOR_UNLOCK; - - while (tmp_collector) { - next_collector = tmp_collector->next; - _free_collector(tmp_collector); - tmp_collector = next_collector; - } -} - -/* - * Find a collector (if it exists) - * Must lock before calling this - * If last_collector is not null, it will return the previous collector in the linked - * list (used in collector delete) - */ -static struct _collector *_find_collector( - const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector) -{ - struct _collector *tmp_collector, *prev_collector; - uint32_t plugin_hash; - uint32_t module_hash; - uint32_t hostname_hash; - - if (unlikely(!collector_list)) { - collector_list = callocz(1, sizeof(struct _collector)); - return NULL; - } - - if (unlikely(!collector_list->next)) - return NULL; - - plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - module_hash = module_name ? simple_hash(module_name) : 1; - hostname_hash = simple_hash(hostname); - - // Note that the first entry is "dummy" - tmp_collector = collector_list->next; - prev_collector = collector_list; - while (tmp_collector) { - if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash && - hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) && - (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) && - (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) { - if (unlikely(last_collector)) - *last_collector = prev_collector; - - return tmp_collector; - } - - prev_collector = tmp_collector; - tmp_collector = tmp_collector->next; - } - - return tmp_collector; -} - -/* - * Called to delete a collector - * It will reduce the count (chart_count) and will remove it - * from the linked list if the count reaches zero - * The structure will be returned to the caller to free - * the resources - * - */ -static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector, *prev_collector = NULL; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector); - - if (likely(tmp_collector)) { - --tmp_collector->count; - if (unlikely(!tmp_collector->count)) - prev_collector->next = tmp_collector->next; - } - return tmp_collector; -} - -/* - * Add a new collector (plugin / module) to the list - * If it already exists just update the chart count - * - * Lock before calling - */ -static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name) -{ - struct _collector *tmp_collector; - - tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL); - - if (unlikely(!tmp_collector)) { - tmp_collector = callocz(1, sizeof(struct _collector)); - tmp_collector->hostname_hash = simple_hash(hostname); - tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1; - tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1; - - tmp_collector->hostname = strdupz(hostname); - tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL; - tmp_collector->module_name = module_name ? strdupz(module_name) : NULL; - - tmp_collector->next = collector_list->next; - collector_list->next = tmp_collector; - } - tmp_collector->count++; - debug( - D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*", - module_name ? module_name : "*", tmp_collector->count); - return tmp_collector; -} - -#ifndef __GNUC__ -#pragma endregion -#endif - -/* Avoids the need to scan trough all RRDHOSTS +/* Avoids the need to scan through all RRDHOSTS * every time any Query Thread Wakes Up * (every time we need to check child popcorn expiry) - * call with ACLK_SHARED_STATE_LOCK held + * call with legacy_aclk_shared_state_LOCK held */ void aclk_update_next_child_to_popcorn(void) { @@ -462,19 +245,19 @@ void aclk_update_next_child_to_popcorn(void) any = 1; - if (unlikely(!aclk_shared_state.next_popcorn_host)) { - aclk_shared_state.next_popcorn_host = host; + if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) { + legacy_aclk_shared_state.next_popcorn_host = host; rrdhost_aclk_state_unlock(host); continue; } - if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) - aclk_shared_state.next_popcorn_host = host; + if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) + legacy_aclk_shared_state.next_popcorn_host = host; rrdhost_aclk_state_unlock(host); } if(!any) - aclk_shared_state.next_popcorn_host = NULL; + legacy_aclk_shared_state.next_popcorn_host = NULL; rrd_unlock(); } @@ -487,7 +270,7 @@ static int aclk_popcorn_check_bump(RRDHOST *host) { time_t now = now_monotonic_sec(); int updated = 0, ret; - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); ret = ACLK_IS_HOST_INITIALIZING(host); @@ -502,12 +285,12 @@ static int aclk_popcorn_check_bump(RRDHOST *host) if (host != localhost && updated) aclk_update_next_child_to_popcorn(); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return ret; } rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return ret; } @@ -523,13 +306,13 @@ static void aclk_start_host_popcorning(RRDHOST *host) { usec_t now = now_monotonic_sec(); info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) { errno = 0; error("Localhost is allowed to do popcorning only once after startup!"); rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return; } @@ -539,16 +322,16 @@ static void aclk_start_host_popcorning(RRDHOST *host) rrdhost_aclk_state_unlock(host); if (host != localhost) aclk_update_next_child_to_popcorn(); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; } static void aclk_stop_host_popcorning(RRDHOST *host) { - ACLK_SHARED_STATE_LOCK; + legacy_aclk_shared_state_LOCK; rrdhost_aclk_state_lock(host); if (!ACLK_IS_HOST_POPCORNING(host)) { rrdhost_aclk_state_unlock(host); - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; return; } @@ -557,18 +340,18 @@ static void aclk_stop_host_popcorning(RRDHOST *host) host->aclk_state.metadata = ACLK_METADATA_REQUIRED; rrdhost_aclk_state_unlock(host); - if(host == aclk_shared_state.next_popcorn_host) { - aclk_shared_state.next_popcorn_host = NULL; + if(host == legacy_aclk_shared_state.next_popcorn_host) { + legacy_aclk_shared_state.next_popcorn_host = NULL; aclk_update_next_child_to_popcorn(); } - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_UNLOCK; } /* * Add a new collector to the list * If it exists, update the chart count */ -void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; if (unlikely(!netdata_ready)) { @@ -589,7 +372,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu if(aclk_popcorn_check_bump(host)) return; - if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); } @@ -601,7 +384,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu * This function will release the memory used and schedule * a cloud update */ -void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) +void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; if (unlikely(!netdata_ready)) { @@ -628,7 +411,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu if (aclk_popcorn_check_bump(host)) return; - if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); } @@ -639,7 +422,7 @@ static void aclk_graceful_disconnect() // Send a graceful disconnect message BUFFER *b = buffer_create(512); - aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}"); aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL); buffer_free(b); @@ -963,10 +746,10 @@ static void aclk_try_to_connect(char *hostname, int port) aclk_connecting = 1; create_publish_base_topic(); - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = 0; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_LOCK; + legacy_aclk_shared_state.version_neg = 0; + legacy_aclk_shared_state.version_neg_wait_till = 0; + legacy_aclk_shared_state_UNLOCK; rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password); if (unlikely(rc)) { @@ -981,10 +764,10 @@ static inline void aclk_hello_msg() char *msg_id = create_uuid(); - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.version_neg = 0; - aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; - ACLK_SHARED_STATE_UNLOCK; + legacy_aclk_shared_state_LOCK; + legacy_aclk_shared_state.version_neg = 0; + legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT; + legacy_aclk_shared_state_UNLOCK; //Hello message is versioned separately from the rest of the protocol aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION); @@ -1004,7 +787,7 @@ static inline void aclk_hello_msg() * * @return It always returns NULL */ -void *aclk_main(void *ptr) +void *legacy_aclk_main(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; struct aclk_query_threads query_threads; @@ -1065,7 +848,7 @@ void *aclk_main(void *ptr) stats_thread->thread = mallocz(sizeof(netdata_thread_t)); stats_thread->query_thread_count = query_threads.count; netdata_thread_create( - stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, + stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread, stats_thread); } @@ -1165,20 +948,20 @@ void *aclk_main(void *ptr) } if (unlikely(!query_threads.thread_list)) { - aclk_query_threads_start(&query_threads); + legacy_aclk_query_threads_start(&query_threads); } time_t now = now_monotonic_sec(); if(aclk_connected && last_periodic_query_wakeup < now) { - // to make `aclk_queue_query()` param `run_after` work + // to make `legacy_aclk_queue_query()` param `run_after` work // also makes per child popcorning work last_periodic_query_wakeup = now; - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; } } // forever exited: // Wakeup query thread to cleanup - QUERY_THREAD_WAKEUP_ALL; + LEGACY_QUERY_THREAD_WAKEUP_ALL; freez(aclk_username); freez(aclk_password); @@ -1192,18 +975,18 @@ exited: if (agent_id && aclk_connected) { freez(agent_id); // Wakeup thread to cleanup - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; aclk_graceful_disconnect(); } - aclk_query_threads_cleanup(&query_threads); + legacy_aclk_query_threads_cleanup(&query_threads); _reset_collector_list(); freez(collector_list); if(aclk_stats_enabled) { netdata_thread_join(*stats_thread->thread, NULL); - aclk_stats_thread_cleanup(); + legacy_aclk_stats_thread_cleanup(); freez(stats_thread->thread); freez(stats_thread); } @@ -1306,12 +1089,12 @@ void aclk_connect() { info("Connection detected (%u queued queries)", aclk_query_size()); - aclk_stats_upd_online(1); + legacy_aclk_stats_upd_online(1); aclk_connected = 1; aclk_reconnect_delay(0); - QUERY_THREAD_WAKEUP; + LEGACY_QUERY_THREAD_WAKEUP; return; } @@ -1321,7 +1104,7 @@ void aclk_disconnect() if (likely(aclk_connected)) info("Disconnect detected (%u queued queries)", aclk_query_size()); - aclk_stats_upd_online(0); + legacy_aclk_stats_upd_online(0); aclk_subscribed = 0; rrdhost_aclk_state_lock(localhost); @@ -1372,7 +1155,7 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts */ void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb); -void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) +void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1388,9 +1171,9 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) // session. if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); else - aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); @@ -1418,7 +1201,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) * /api/v1/info * charts */ -int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) +int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1433,9 +1216,9 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *hos // a fake on_connect message then use the real timestamp to indicate it is within the existing // session. if (metadata_submitted == ACLK_METADATA_SENT) - aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); else - aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"info\" : "); @@ -1459,14 +1242,14 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); local_buffer->contenttype = CT_APPLICATION_JSON; - if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) - fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg); + if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg); debug(D_ACLK, "Sending Child Disconnect"); char *msg_id = create_uuid(); - aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\"payload\":"); @@ -1486,10 +1269,10 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) return 0; } -void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) +void legacy_aclk_host_state_update(RRDHOST *host, int connect) { #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE - if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) return; #else #warning "This check became unnecessary. Remove" @@ -1498,19 +1281,14 @@ void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) if (unlikely(aclk_host_initializing(localhost))) return; - switch (cmd) { - case ACLK_CMD_CHILD_CONNECT: - debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); - aclk_start_host_popcorning(host); - aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); - break; - case ACLK_CMD_CHILD_DISCONNECT: - debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); - aclk_stop_host_popcorning(host); - aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); - break; - default: - error("Unknown command for aclk_host_state_update %d.", (int)cmd); + if (connect) { + debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); + aclk_start_host_popcorning(host); + legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); + } else { + debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); + aclk_stop_host_popcorning(host); + legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); } } @@ -1537,31 +1315,21 @@ void aclk_send_stress_test(size_t size) // or on request int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host) { - aclk_send_info_metadata(state, host); + legacy_aclk_send_info_metadata(state, host); if(host == localhost) - aclk_send_alarm_metadata(state); + legacy_aclk_send_alarm_metadata(state); return 0; } -void aclk_single_update_disable() -{ - aclk_disable_single_updates = 1; -} - -void aclk_single_update_enable() -{ - aclk_disable_single_updates = 0; -} - // Triggered by a health reload, sends the alarm metadata -void aclk_alarm_reload() +void legacy_aclk_alarm_reload() { if (unlikely(aclk_host_initializing(localhost))) return; - if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue on_connect command on alarm reload"); @@ -1585,7 +1353,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart) buffer_flush(local_buffer); local_buffer->contenttype = CT_APPLICATION_JSON; - aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); rrdset2json(st, local_buffer, NULL, NULL, 1); @@ -1598,7 +1366,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart) return 0; } -int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) +int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create) { #ifndef ENABLE_ACLK UNUSED(host); @@ -1611,7 +1379,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (!netdata_cloud_setting) return 0; - if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) + if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) return 0; if (aclk_host_initializing(localhost)) @@ -1623,7 +1391,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (aclk_popcorn_check_bump(host)) return 0; - if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) { + if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue chart_update command"); @@ -1634,7 +1402,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) #endif } -int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) +int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) { BUFFER *local_buffer = NULL; @@ -1661,7 +1429,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) char *msg_id = create_uuid(); buffer_flush(local_buffer); - aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg); + aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg); buffer_strcat(local_buffer, ",\n\t\"payload\": "); netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); @@ -1670,7 +1438,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) buffer_sprintf(local_buffer, "\n}"); - if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { + if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue alarm_command on alarm_update"); @@ -1682,3 +1450,53 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) return 0; } + +char *legacy_aclk_state(void) +{ + BUFFER *wb = buffer_create(1024); + char *ret; + + buffer_strcat(wb, + "ACLK Available: Yes\n" + "ACLK Implementation: Legacy\n" + "Claimed: " + ); + + char *agent_id = is_agent_claimed(); + if (agent_id == NULL) + buffer_strcat(wb, "No\n"); + else { + buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id); + freez(agent_id); + } + + buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No"); + + ret = strdupz(buffer_tostring(wb)); + buffer_free(wb); + return ret; +} + +char *legacy_aclk_state_json(void) +{ + BUFFER *wb = buffer_create(1024); + char *agent_id = is_agent_claimed(); + + buffer_sprintf(wb, + "{\"aclk-available\":true," + "\"aclk-implementation\":\"Legacy\"," + "\"agent-claimed\":%s," + "\"claimed-id\":", + agent_id ? "true" : "false" + ); + + if (agent_id) { + buffer_sprintf(wb, "\"%s\"", agent_id); + freez(agent_id); + } else + buffer_strcat(wb, "null"); + + buffer_sprintf(wb, ",\"online\":%s}", aclk_connected ? "true" : "false"); + + return strdupz(buffer_tostring(wb)); +} -- cgit v1.2.3