summaryrefslogtreecommitdiffstats
path: root/aclk/legacy/agent_cloud_link.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/legacy/agent_cloud_link.c')
-rw-r--r--aclk/legacy/agent_cloud_link.c432
1 files changed, 125 insertions, 307 deletions
diff --git a/aclk/legacy/agent_cloud_link.c b/aclk/legacy/agent_cloud_link.c
index 5ed7e66af..80ca23971 100644
--- a/aclk/legacy/agent_cloud_link.c
+++ b/aclk/legacy/agent_cloud_link.c
@@ -6,6 +6,7 @@
#include "aclk_query.h"
#include "aclk_common.h"
#include "aclk_stats.h"
+#include "../aclk_collector_list.h"
#ifdef ENABLE_ACLK
#include <libwebsockets.h>
@@ -15,46 +16,20 @@ int aclk_shutting_down = 0;
// Other global state
static int aclk_subscribed = 0;
-static int aclk_disable_single_updates = 0;
static char *aclk_username = NULL;
static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_force_reconnect = 0; // Indication from lower layers
-usec_t aclk_session_us = 0; // Used by the mqtt layer
-time_t aclk_session_sec = 0; // Used by the mqtt layer
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
-static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
-#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
-#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
-
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
void aclk_lws_wss_destroy_context();
-/*
- * Maintain a list of collectors and chart count
- * If all the charts of a collector are deleted
- * then a new metadata dataset must be send to the cloud
- *
- */
-struct _collector {
- time_t created;
- uint32_t count; //chart count
- uint32_t hostname_hash;
- uint32_t plugin_hash;
- uint32_t module_hash;
- char *hostname;
- char *plugin_name;
- char *module_name;
- struct _collector *next;
-};
-
-struct _collector *collector_list = NULL;
char *create_uuid()
{
@@ -67,7 +42,7 @@ char *create_uuid()
return uuid_str;
}
-int cloud_to_agent_parse(JSON_ENTRY *e)
+int legacy_cloud_to_agent_parse(JSON_ENTRY *e)
{
struct aclk_request *data = e->callback_data;
@@ -247,202 +222,10 @@ char *get_topic(char *sub_topic, char *final_topic, int max_size)
return final_topic;
}
-#ifndef __GNUC__
-#pragma region ACLK Internal Collector Tracking
-#endif
-
-/*
- * Free a collector structure
- */
-
-static void _free_collector(struct _collector *collector)
-{
- if (likely(collector->plugin_name))
- freez(collector->plugin_name);
-
- if (likely(collector->module_name))
- freez(collector->module_name);
-
- if (likely(collector->hostname))
- freez(collector->hostname);
-
- freez(collector);
-}
-
-/*
- * This will report the collector list
- *
- */
-#ifdef ACLK_DEBUG
-static void _dump_collector_list()
-{
- struct _collector *tmp_collector;
-
- COLLECTOR_LOCK;
-
- info("DUMPING ALL COLLECTORS");
-
- if (unlikely(!collector_list || !collector_list->next)) {
- COLLECTOR_UNLOCK;
- info("DUMPING ALL COLLECTORS -- nothing found");
- return;
- }
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
-
- while (tmp_collector) {
- info(
- "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
- tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
- tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
-
- tmp_collector = tmp_collector->next;
- }
- info("DUMPING ALL COLLECTORS DONE");
- COLLECTOR_UNLOCK;
-}
-#endif
-
-/*
- * This will cleanup the collector list
- *
- */
-static void _reset_collector_list()
-{
- struct _collector *tmp_collector, *next_collector;
-
- COLLECTOR_LOCK;
-
- if (unlikely(!collector_list || !collector_list->next)) {
- COLLECTOR_UNLOCK;
- return;
- }
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
- collector_list->count = 0;
- collector_list->next = NULL;
-
- // We broke the link; we can unlock
- COLLECTOR_UNLOCK;
-
- while (tmp_collector) {
- next_collector = tmp_collector->next;
- _free_collector(tmp_collector);
- tmp_collector = next_collector;
- }
-}
-
-/*
- * Find a collector (if it exists)
- * Must lock before calling this
- * If last_collector is not null, it will return the previous collector in the linked
- * list (used in collector delete)
- */
-static struct _collector *_find_collector(
- const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
-{
- struct _collector *tmp_collector, *prev_collector;
- uint32_t plugin_hash;
- uint32_t module_hash;
- uint32_t hostname_hash;
-
- if (unlikely(!collector_list)) {
- collector_list = callocz(1, sizeof(struct _collector));
- return NULL;
- }
-
- if (unlikely(!collector_list->next))
- return NULL;
-
- plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
- module_hash = module_name ? simple_hash(module_name) : 1;
- hostname_hash = simple_hash(hostname);
-
- // Note that the first entry is "dummy"
- tmp_collector = collector_list->next;
- prev_collector = collector_list;
- while (tmp_collector) {
- if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
- hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
- (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
- (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
- if (unlikely(last_collector))
- *last_collector = prev_collector;
-
- return tmp_collector;
- }
-
- prev_collector = tmp_collector;
- tmp_collector = tmp_collector->next;
- }
-
- return tmp_collector;
-}
-
-/*
- * Called to delete a collector
- * It will reduce the count (chart_count) and will remove it
- * from the linked list if the count reaches zero
- * The structure will be returned to the caller to free
- * the resources
- *
- */
-static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
-{
- struct _collector *tmp_collector, *prev_collector = NULL;
-
- tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
-
- if (likely(tmp_collector)) {
- --tmp_collector->count;
- if (unlikely(!tmp_collector->count))
- prev_collector->next = tmp_collector->next;
- }
- return tmp_collector;
-}
-
-/*
- * Add a new collector (plugin / module) to the list
- * If it already exists just update the chart count
- *
- * Lock before calling
- */
-static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
-{
- struct _collector *tmp_collector;
-
- tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
-
- if (unlikely(!tmp_collector)) {
- tmp_collector = callocz(1, sizeof(struct _collector));
- tmp_collector->hostname_hash = simple_hash(hostname);
- tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
- tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
-
- tmp_collector->hostname = strdupz(hostname);
- tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
- tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
-
- tmp_collector->next = collector_list->next;
- collector_list->next = tmp_collector;
- }
- tmp_collector->count++;
- debug(
- D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
- module_name ? module_name : "*", tmp_collector->count);
- return tmp_collector;
-}
-
-#ifndef __GNUC__
-#pragma endregion
-#endif
-
-/* Avoids the need to scan trough all RRDHOSTS
+/* Avoids the need to scan through all RRDHOSTS
* every time any Query Thread Wakes Up
* (every time we need to check child popcorn expiry)
- * call with ACLK_SHARED_STATE_LOCK held
+ * call with legacy_aclk_shared_state_LOCK held
*/
void aclk_update_next_child_to_popcorn(void)
{
@@ -462,19 +245,19 @@ void aclk_update_next_child_to_popcorn(void)
any = 1;
- if (unlikely(!aclk_shared_state.next_popcorn_host)) {
- aclk_shared_state.next_popcorn_host = host;
+ if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) {
+ legacy_aclk_shared_state.next_popcorn_host = host;
rrdhost_aclk_state_unlock(host);
continue;
}
- if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
- aclk_shared_state.next_popcorn_host = host;
+ if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
+ legacy_aclk_shared_state.next_popcorn_host = host;
rrdhost_aclk_state_unlock(host);
}
if(!any)
- aclk_shared_state.next_popcorn_host = NULL;
+ legacy_aclk_shared_state.next_popcorn_host = NULL;
rrd_unlock();
}
@@ -487,7 +270,7 @@ static int aclk_popcorn_check_bump(RRDHOST *host)
{
time_t now = now_monotonic_sec();
int updated = 0, ret;
- ACLK_SHARED_STATE_LOCK;
+ legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
ret = ACLK_IS_HOST_INITIALIZING(host);
@@ -502,12 +285,12 @@ static int aclk_popcorn_check_bump(RRDHOST *host)
if (host != localhost && updated)
aclk_update_next_child_to_popcorn();
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return ret;
}
rrdhost_aclk_state_unlock(host);
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return ret;
}
@@ -523,13 +306,13 @@ static void aclk_start_host_popcorning(RRDHOST *host)
{
usec_t now = now_monotonic_sec();
info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
- ACLK_SHARED_STATE_LOCK;
+ legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
errno = 0;
error("Localhost is allowed to do popcorning only once after startup!");
rrdhost_aclk_state_unlock(host);
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return;
}
@@ -539,16 +322,16 @@ static void aclk_start_host_popcorning(RRDHOST *host)
rrdhost_aclk_state_unlock(host);
if (host != localhost)
aclk_update_next_child_to_popcorn();
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
}
static void aclk_stop_host_popcorning(RRDHOST *host)
{
- ACLK_SHARED_STATE_LOCK;
+ legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
if (!ACLK_IS_HOST_POPCORNING(host)) {
rrdhost_aclk_state_unlock(host);
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
return;
}
@@ -557,18 +340,18 @@ static void aclk_stop_host_popcorning(RRDHOST *host)
host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
rrdhost_aclk_state_unlock(host);
- if(host == aclk_shared_state.next_popcorn_host) {
- aclk_shared_state.next_popcorn_host = NULL;
+ if(host == legacy_aclk_shared_state.next_popcorn_host) {
+ legacy_aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn();
}
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_UNLOCK;
}
/*
* Add a new collector to the list
* If it exists, update the chart count
*/
-void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@@ -589,7 +372,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu
if(aclk_popcorn_check_bump(host))
return;
- if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
}
@@ -601,7 +384,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu
* This function will release the memory used and schedule
* a cloud update
*/
-void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
+void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@@ -628,7 +411,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
if (aclk_popcorn_check_bump(host))
return;
- if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
+ if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
}
@@ -639,7 +422,7 @@ static void aclk_graceful_disconnect()
// Send a graceful disconnect message
BUFFER *b = buffer_create(512);
- aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}");
aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
buffer_free(b);
@@ -963,10 +746,10 @@ static void aclk_try_to_connect(char *hostname, int port)
aclk_connecting = 1;
create_publish_base_topic();
- ACLK_SHARED_STATE_LOCK;
- aclk_shared_state.version_neg = 0;
- aclk_shared_state.version_neg_wait_till = 0;
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_LOCK;
+ legacy_aclk_shared_state.version_neg = 0;
+ legacy_aclk_shared_state.version_neg_wait_till = 0;
+ legacy_aclk_shared_state_UNLOCK;
rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password);
if (unlikely(rc)) {
@@ -981,10 +764,10 @@ static inline void aclk_hello_msg()
char *msg_id = create_uuid();
- ACLK_SHARED_STATE_LOCK;
- aclk_shared_state.version_neg = 0;
- aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
- ACLK_SHARED_STATE_UNLOCK;
+ legacy_aclk_shared_state_LOCK;
+ legacy_aclk_shared_state.version_neg = 0;
+ legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
+ legacy_aclk_shared_state_UNLOCK;
//Hello message is versioned separately from the rest of the protocol
aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
@@ -1004,7 +787,7 @@ static inline void aclk_hello_msg()
*
* @return It always returns NULL
*/
-void *aclk_main(void *ptr)
+void *legacy_aclk_main(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_query_threads query_threads;
@@ -1065,7 +848,7 @@ void *aclk_main(void *ptr)
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
stats_thread->query_thread_count = query_threads.count;
netdata_thread_create(
- stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
+ stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread,
stats_thread);
}
@@ -1165,20 +948,20 @@ void *aclk_main(void *ptr)
}
if (unlikely(!query_threads.thread_list)) {
- aclk_query_threads_start(&query_threads);
+ legacy_aclk_query_threads_start(&query_threads);
}
time_t now = now_monotonic_sec();
if(aclk_connected && last_periodic_query_wakeup < now) {
- // to make `aclk_queue_query()` param `run_after` work
+ // to make `legacy_aclk_queue_query()` param `run_after` work
// also makes per child popcorning work
last_periodic_query_wakeup = now;
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
}
} // forever
exited:
// Wakeup query thread to cleanup
- QUERY_THREAD_WAKEUP_ALL;
+ LEGACY_QUERY_THREAD_WAKEUP_ALL;
freez(aclk_username);
freez(aclk_password);
@@ -1192,18 +975,18 @@ exited:
if (agent_id && aclk_connected) {
freez(agent_id);
// Wakeup thread to cleanup
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
aclk_graceful_disconnect();
}
- aclk_query_threads_cleanup(&query_threads);
+ legacy_aclk_query_threads_cleanup(&query_threads);
_reset_collector_list();
freez(collector_list);
if(aclk_stats_enabled) {
netdata_thread_join(*stats_thread->thread, NULL);
- aclk_stats_thread_cleanup();
+ legacy_aclk_stats_thread_cleanup();
freez(stats_thread->thread);
freez(stats_thread);
}
@@ -1306,12 +1089,12 @@ void aclk_connect()
{
info("Connection detected (%u queued queries)", aclk_query_size());
- aclk_stats_upd_online(1);
+ legacy_aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_reconnect_delay(0);
- QUERY_THREAD_WAKEUP;
+ LEGACY_QUERY_THREAD_WAKEUP;
return;
}
@@ -1321,7 +1104,7 @@ void aclk_disconnect()
if (likely(aclk_connected))
info("Disconnect detected (%u queued queries)", aclk_query_size());
- aclk_stats_upd_online(0);
+ legacy_aclk_stats_upd_online(0);
aclk_subscribed = 0;
rrdhost_aclk_state_lock(localhost);
@@ -1372,7 +1155,7 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts
*/
void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
-void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
+void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1388,9 +1171,9 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
// session.
if (metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
else
- aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
@@ -1418,7 +1201,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
* /api/v1/info
* charts
*/
-int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
+int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@@ -1433,9 +1216,9 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *hos
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted == ACLK_METADATA_SENT)
- aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
else
- aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
@@ -1459,14 +1242,14 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
local_buffer->contenttype = CT_APPLICATION_JSON;
- if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
- fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg);
+ if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
+ fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg);
debug(D_ACLK, "Sending Child Disconnect");
char *msg_id = create_uuid();
- aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\"payload\":");
@@ -1486,10 +1269,10 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
return 0;
}
-void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
+void legacy_aclk_host_state_update(RRDHOST *host, int connect)
{
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
- if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
+ if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
return;
#else
#warning "This check became unnecessary. Remove"
@@ -1498,19 +1281,14 @@ void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
if (unlikely(aclk_host_initializing(localhost)))
return;
- switch (cmd) {
- case ACLK_CMD_CHILD_CONNECT:
- debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
- aclk_start_host_popcorning(host);
- aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
- break;
- case ACLK_CMD_CHILD_DISCONNECT:
- debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
- aclk_stop_host_popcorning(host);
- aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
- break;
- default:
- error("Unknown command for aclk_host_state_update %d.", (int)cmd);
+ if (connect) {
+ debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
+ aclk_start_host_popcorning(host);
+ legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
+ } else {
+ debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
+ aclk_stop_host_popcorning(host);
+ legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
}
}
@@ -1537,31 +1315,21 @@ void aclk_send_stress_test(size_t size)
// or on request
int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
{
- aclk_send_info_metadata(state, host);
+ legacy_aclk_send_info_metadata(state, host);
if(host == localhost)
- aclk_send_alarm_metadata(state);
+ legacy_aclk_send_alarm_metadata(state);
return 0;
}
-void aclk_single_update_disable()
-{
- aclk_disable_single_updates = 1;
-}
-
-void aclk_single_update_enable()
-{
- aclk_disable_single_updates = 0;
-}
-
// Triggered by a health reload, sends the alarm metadata
-void aclk_alarm_reload()
+void legacy_aclk_alarm_reload()
{
if (unlikely(aclk_host_initializing(localhost)))
return;
- if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
+ if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue on_connect command on alarm reload");
@@ -1585,7 +1353,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
rrdset2json(st, local_buffer, NULL, NULL, 1);
@@ -1598,7 +1366,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart)
return 0;
}
-int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
+int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
#ifndef ENABLE_ACLK
UNUSED(host);
@@ -1611,7 +1379,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (!netdata_cloud_setting)
return 0;
- if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
+ if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
return 0;
if (aclk_host_initializing(localhost))
@@ -1623,7 +1391,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (aclk_popcorn_check_bump(host))
return 0;
- if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
+ if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue chart_update command");
@@ -1634,7 +1402,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
#endif
}
-int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
+int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
BUFFER *local_buffer = NULL;
@@ -1661,7 +1429,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
char *msg_id = create_uuid();
buffer_flush(local_buffer);
- aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg);
+ aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
@@ -1670,7 +1438,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
buffer_sprintf(local_buffer, "\n}");
- if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
+ if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue alarm_command on alarm_update");
@@ -1682,3 +1450,53 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
return 0;
}
+
+char *legacy_aclk_state(void)
+{
+ BUFFER *wb = buffer_create(1024);
+ char *ret;
+
+ buffer_strcat(wb,
+ "ACLK Available: Yes\n"
+ "ACLK Implementation: Legacy\n"
+ "Claimed: "
+ );
+
+ char *agent_id = is_agent_claimed();
+ if (agent_id == NULL)
+ buffer_strcat(wb, "No\n");
+ else {
+ buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id);
+ freez(agent_id);
+ }
+
+ buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No");
+
+ ret = strdupz(buffer_tostring(wb));
+ buffer_free(wb);
+ return ret;
+}
+
+char *legacy_aclk_state_json(void)
+{
+ BUFFER *wb = buffer_create(1024);
+ char *agent_id = is_agent_claimed();
+
+ buffer_sprintf(wb,
+ "{\"aclk-available\":true,"
+ "\"aclk-implementation\":\"Legacy\","
+ "\"agent-claimed\":%s,"
+ "\"claimed-id\":",
+ agent_id ? "true" : "false"
+ );
+
+ if (agent_id) {
+ buffer_sprintf(wb, "\"%s\"", agent_id);
+ freez(agent_id);
+ } else
+ buffer_strcat(wb, "null");
+
+ buffer_sprintf(wb, ",\"online\":%s}", aclk_connected ? "true" : "false");
+
+ return strdupz(buffer_tostring(wb));
+}