summaryrefslogtreecommitdiffstats
path: root/database/sqlite
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
commita836a244a3d2bdd4da1ee2641e3e957850668cea (patch)
treecb87c75b3677fab7144f868435243f864048a1e6 /database/sqlite
parentAdding upstream version 1.38.1. (diff)
downloadnetdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz
netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--database/sqlite/sqlite_aclk.c908
-rw-r--r--database/sqlite/sqlite_aclk.h125
-rw-r--r--database/sqlite/sqlite_aclk_alert.c719
-rw-r--r--database/sqlite/sqlite_aclk_alert.h26
-rw-r--r--database/sqlite/sqlite_aclk_node.c100
-rw-r--r--database/sqlite/sqlite_aclk_node.h3
-rw-r--r--database/sqlite/sqlite_context.c24
-rw-r--r--database/sqlite/sqlite_db_migration.c4
-rw-r--r--database/sqlite/sqlite_functions.c92
-rw-r--r--database/sqlite/sqlite_functions.h3
-rw-r--r--database/sqlite/sqlite_health.c163
-rw-r--r--database/sqlite/sqlite_metadata.c549
-rw-r--r--database/sqlite/sqlite_metadata.h2
13 files changed, 1329 insertions, 1389 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 3b0c4052..a33e09f5 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -5,58 +5,176 @@
#include "sqlite_aclk_node.h"
+struct aclk_sync_config_s {
+ uv_thread_t thread;
+ uv_loop_t loop;
+ uv_timer_t timer_req;
+ time_t cleanup_after; // Start a cleanup after this timestamp
+ uv_async_t async;
+ /* FIFO command queue */
+ uv_mutex_t cmd_mutex;
+ uv_cond_t cmd_cond;
+ bool initialized;
+ volatile unsigned queue_size;
+ struct aclk_database_cmdqueue cmd_queue;
+} aclk_sync_config = { 0 };
+
+
void sanity_check(void) {
// make sure the compiler will stop on misconfigurations
BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
}
-static int sql_check_aclk_table(void *data, int argc, char **argv, char **column)
+
+int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd)
{
- struct aclk_database_worker_config *wc = data;
- UNUSED(argc);
- UNUSED(column);
+ unsigned queue_size;
- debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]);
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_DELETE_HOST;
- cmd.data = strdupz((char *) argv[0]);
- aclk_database_enq_cmd_noblock(wc, &cmd);
+ /* wait for free space in queue */
+ uv_mutex_lock(&aclk_sync_config.cmd_mutex);
+ if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
+ uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
+ return 1;
+ }
+
+ fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
+ /* enqueue command */
+ aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
+ aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
+ aclk_sync_config.cmd_queue.tail + 1 : 0;
+ aclk_sync_config.queue_size = queue_size + 1;
+ uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
return 0;
}
-#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');"
-
-static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
+static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd)
{
- char *err_msg = NULL;
- debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist");
- int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
- sqlite3_free(err_msg);
+ unsigned queue_size;
+
+ /* wait for free space in queue */
+ uv_mutex_lock(&aclk_sync_config.cmd_mutex);
+ while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
+ uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex);
}
+ fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
+ /* enqueue command */
+ aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
+ aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
+ aclk_sync_config.cmd_queue.tail + 1 : 0;
+ aclk_sync_config.queue_size = queue_size + 1;
+ uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
+
+ /* wake up event loop */
+ int rc = uv_async_send(&aclk_sync_config.async);
+ if (unlikely(rc))
+ debug(D_ACLK_SYNC, "Failed to wake up event loop");
}
-static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+enum {
+ IDX_HOST_ID,
+ IDX_HOSTNAME,
+ IDX_REGISTRY,
+ IDX_UPDATE_EVERY,
+ IDX_OS,
+ IDX_TIMEZONE,
+ IDX_TAGS,
+ IDX_HOPS,
+ IDX_MEMORY_MODE,
+ IDX_ABBREV_TIMEZONE,
+ IDX_UTC_OFFSET,
+ IDX_PROGRAM_NAME,
+ IDX_PROGRAM_VERSION,
+ IDX_ENTRIES,
+ IDX_HEALTH_ENABLED,
+};
+
+static int create_host_callback(void *data, int argc, char **argv, char **column)
{
- UNUSED(cmd);
+ int *number_of_chidren = data;
+ UNUSED(argc);
+ UNUSED(column);
- debug(D_ACLK, "Checking database for %s", wc->host_guid);
+ char guid[UUID_STR_LEN];
+ uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid);
- BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
+ struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
+ __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
- buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND "
- "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL);
- db_execute(buffer_tostring(sql));
+ system_info->hops = str2i((const char *) argv[IDX_HOPS]);
- buffer_free(sql);
+ sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info);
+
+ RRDHOST *host = rrdhost_find_or_create(
+ (const char *) argv[IDX_HOSTNAME]
+ , (const char *) argv[IDX_REGISTRY]
+ , guid
+ , (const char *) argv[IDX_OS]
+ , (const char *) argv[IDX_TIMEZONE]
+ , (const char *) argv[IDX_ABBREV_TIMEZONE]
+ , (int32_t) (argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0)
+ , (const char *) argv[IDX_TAGS]
+ , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown")
+ , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown")
+ , argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1
+ , argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0
+ , default_rrd_memory_mode
+ , 0 // health
+ , 0 // rrdpush enabled
+ , NULL //destination
+ , NULL // api key
+ , NULL // send charts matching
+ , false // rrdpush_enable_replication
+ , 0 // rrdpush_seconds_to_replicate
+ , 0 // rrdpush_replication_step
+ , system_info
+ , 1
+ );
+ if (likely(host))
+ host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]);
+
+ (*number_of_chidren)++;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ char node_str[UUID_STR_LEN] = "<none>";
+ if (likely(host->node_id))
+ uuid_unparse_lower(*host->node_id, node_str);
+ internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str);
+#endif
+ return 0;
}
+#ifdef ENABLE_ACLK
+static struct aclk_database_cmd aclk_database_deq_cmd(void)
+{
+ struct aclk_database_cmd ret;
+ unsigned queue_size;
-#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
+ uv_mutex_lock(&aclk_sync_config.cmd_mutex);
+ queue_size = aclk_sync_config.queue_size;
+ if (queue_size == 0) {
+ memset(&ret, 0, sizeof(ret));
+ ret.opcode = ACLK_DATABASE_NOOP;
+ ret.completion = NULL;
+
+ } else {
+ /* dequeue command */
+ ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head];
+ if (queue_size == 1) {
+ aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
+ } else {
+ aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
+ aclk_sync_config.cmd_queue.head + 1 : 0;
+ }
+ aclk_sync_config.queue_size = queue_size - 1;
+ /* wake up producers */
+ uv_cond_signal(&aclk_sync_config.cmd_cond);
+ }
+ uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
+ return ret;
+}
+
+#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
static int is_host_available(uuid_t *host_id)
{
sqlite3_stmt *res = NULL;
@@ -76,7 +194,7 @@ static int is_host_available(uuid_t *host_id)
rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id parameter to select node instance information");
+ error_report("Failed to bind host_id parameter to check host existence");
goto failed;
}
rc = sqlite3_step_monitored(res);
@@ -89,15 +207,13 @@ failed:
}
// OPCODE: ACLK_DATABASE_DELETE_HOST
-void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+static void sql_delete_aclk_table_list(char *host_guid)
{
- UNUSED(wc);
- char uuid_str[GUID_LEN + 1];
- char host_str[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
+ char host_str[UUID_STR_LEN];
int rc;
uuid_t host_uuid;
- char *host_guid = (char *)cmd.data;
if (unlikely(!host_guid))
return;
@@ -139,273 +255,67 @@ void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct a
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc);
- db_execute(buffer_tostring(sql));
+ rc = db_execute(db_meta, buffer_tostring(sql));
+ if (unlikely(rc))
+ error("Failed to drop unused ACLK tables");
fail:
buffer_free(sql);
}
-uv_mutex_t aclk_async_lock;
-struct aclk_database_worker_config *aclk_thread_head = NULL;
-
-int claimed()
+static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused)
{
- int rc;
- rrdhost_aclk_state_lock(localhost);
- rc = (localhost->aclk_state.claimed_id != NULL);
- rrdhost_aclk_state_unlock(localhost);
- return rc;
-}
-
-void aclk_add_worker_thread(struct aclk_database_worker_config *wc)
-{
- if (unlikely(!wc))
- return;
-
- uv_mutex_lock(&aclk_async_lock);
- if (unlikely(!wc->host)) {
- wc->next = aclk_thread_head;
- aclk_thread_head = wc;
- }
- uv_mutex_unlock(&aclk_async_lock);
+ debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]);
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_DELETE_HOST;
+ cmd.param[0] = strdupz((char *) argv[0]);
+ aclk_database_enq_cmd_noblock(&cmd);
+ return 0;
}
-void aclk_del_worker_thread(struct aclk_database_worker_config *wc)
-{
- if (unlikely(!wc))
- return;
-
- uv_mutex_lock(&aclk_async_lock);
- struct aclk_database_worker_config **tmp = &aclk_thread_head;
- while (*tmp && (*tmp) != wc)
- tmp = &(*tmp)->next;
- if (*tmp)
- *tmp = wc->next;
- uv_mutex_unlock(&aclk_async_lock);
-}
+#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
+ "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');"
-int aclk_worker_thread_exists(char *guid)
+static void sql_check_aclk_table_list(void)
{
- int rc = 0;
- uv_mutex_lock(&aclk_async_lock);
-
- struct aclk_database_worker_config *tmp = aclk_thread_head;
-
- while (tmp && !rc) {
- rc = strcmp(tmp->uuid_str, guid) == 0;
- tmp = tmp->next;
+ char *err_msg = NULL;
+ debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist");
+ int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg);
+ if (rc != SQLITE_OK) {
+ error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
+ sqlite3_free(err_msg);
}
- uv_mutex_unlock(&aclk_async_lock);
- return rc;
}
-void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc)
-{
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- wc->queue_size = 0;
- fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
- fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
-}
+#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d;"
-int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
+static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
- unsigned queue_size;
-
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- if ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE || wc->is_shutting_down) {
- uv_mutex_unlock(&wc->cmd_mutex);
- return 1;
- }
-
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
- uv_mutex_unlock(&wc->cmd_mutex);
+ char sql[512];
+ snprintfz(sql,511, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL);
+ if (unlikely(db_execute(db_meta, sql)))
+ error_report("Failed to clean stale ACLK alert entries");
return 0;
}
-void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
-{
- unsigned queue_size;
-
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- if (wc->is_shutting_down) {
- uv_mutex_unlock(&wc->cmd_mutex);
- return;
- }
- while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
- }
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
- uv_mutex_unlock(&wc->cmd_mutex);
-
- /* wake up event loop */
- int rc = uv_async_send(&wc->async);
- if (unlikely(rc))
- debug(D_ACLK_SYNC, "Failed to wake up event loop");
-}
-
-struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_config* wc)
-{
- struct aclk_database_cmd ret;
- unsigned queue_size;
- uv_mutex_lock(&wc->cmd_mutex);
- queue_size = wc->queue_size;
- if (queue_size == 0 || wc->is_shutting_down) {
- memset(&ret, 0, sizeof(ret));
- ret.opcode = ACLK_DATABASE_NOOP;
- ret.completion = NULL;
- if (wc->is_shutting_down)
- uv_cond_signal(&wc->cmd_cond);
- } else {
- /* dequeue command */
- ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
- if (queue_size == 1) {
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- } else {
- wc->cmd_queue.head = wc->cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.head + 1 : 0;
- }
- wc->queue_size = queue_size - 1;
- /* wake up producers */
- uv_cond_signal(&wc->cmd_cond);
- }
- uv_mutex_unlock(&wc->cmd_mutex);
-
- return ret;
-}
+#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');"
-struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id)
+static void sql_maint_aclk_sync_database_all(void)
{
- if (unlikely(!node_id))
- return NULL;
-
- uv_mutex_lock(&aclk_async_lock);
- struct aclk_database_worker_config *wc = aclk_thread_head;
-
- while (wc) {
- if (!strcmp(wc->node_id, node_id))
- break;
- wc = wc->next;
+ char *err_msg = NULL;
+ debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist");
+ int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ALERT_LIST, sql_maint_aclk_sync_database, NULL, &err_msg);
+ if (rc != SQLITE_OK) {
+ error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
+ sqlite3_free(err_msg);
}
- uv_mutex_unlock(&aclk_async_lock);
-
- return (wc);
}
-void aclk_sync_exit_all()
-{
- rrd_rdlock();
- RRDHOST *host;
- rrdhost_foreach_read(host) {
- struct aclk_database_worker_config *wc = host->dbsync_worker;
- if (wc) {
- wc->is_shutting_down = 1;
- (void) aclk_database_deq_cmd(wc);
- uv_cond_signal(&wc->cmd_cond);
- }
- }
- rrd_unlock();
-
- uv_mutex_lock(&aclk_async_lock);
- struct aclk_database_worker_config *wc = aclk_thread_head;
- while (wc) {
- wc->is_shutting_down = 1;
- wc = wc->next;
- }
- uv_mutex_unlock(&aclk_async_lock);
-}
-
-enum {
- IDX_HOST_ID,
- IDX_HOSTNAME,
- IDX_REGISTRY,
- IDX_UPDATE_EVERY,
- IDX_OS,
- IDX_TIMEZONE,
- IDX_TAGS,
- IDX_HOPS,
- IDX_MEMORY_MODE,
- IDX_ABBREV_TIMEZONE,
- IDX_UTC_OFFSET,
- IDX_PROGRAM_NAME,
- IDX_PROGRAM_VERSION,
- IDX_ENTRIES,
- IDX_HEALTH_ENABLED,
-};
-
-static int create_host_callback(void *data, int argc, char **argv, char **column)
-{
- UNUSED(data);
- UNUSED(argc);
- UNUSED(column);
-
- char guid[UUID_STR_LEN];
- uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid);
-
- struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
- __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
-
- system_info->hops = str2i((const char *) argv[IDX_HOPS]);
-
- sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info);
-
- RRDHOST *host = rrdhost_find_or_create(
- (const char *) argv[IDX_HOSTNAME]
- , (const char *) argv[IDX_REGISTRY]
- , guid
- , (const char *) argv[IDX_OS]
- , (const char *) argv[IDX_TIMEZONE]
- , (const char *) argv[IDX_ABBREV_TIMEZONE]
- , argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET]) : 0
- , (const char *) argv[IDX_TAGS]
- , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown")
- , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown")
- , argv[3] ? str2i(argv[IDX_UPDATE_EVERY]) : 1
- , argv[13] ? str2i(argv[IDX_ENTRIES]) : 0
- , default_rrd_memory_mode
- , 0 // health
- , 0 // rrdpush enabled
- , NULL //destination
- , NULL // api key
- , NULL // send charts matching
- , false // rrdpush_enable_replication
- , 0 // rrdpush_seconds_to_replicate
- , 0 // rrdpush_replication_step
- , system_info
- , 1
- );
- if (likely(host))
- host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]);
-
-#ifdef NETDATA_INTERNAL_CHECKS
- char node_str[UUID_STR_LEN] = "<none>";
- if (likely(host->node_id))
- uuid_unparse_lower(*host->node_id, node_str);
- internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str);
-#endif
- return 0;
-}
-
-#ifdef ENABLE_ACLK
-static int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
+static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
char uuid_str[GUID_LEN + 1];
- UNUSED(data);
- UNUSED(argc);
- UNUSED(column);
-
uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str);
RRDHOST *host = rrdhost_find_by_guid(uuid_str);
@@ -415,156 +325,81 @@ static int aclk_start_sync_thread(void *data, int argc, char **argv, char **colu
sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]);
return 0;
}
-#endif
-void sql_aclk_sync_init(void)
-{
- char *err_msg = NULL;
- int rc;
-
- if (unlikely(!db_meta)) {
- if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) {
- return;
- }
- error_report("Database has not been initialized");
- return;
- }
-
- info("Creating archived hosts");
- rc = sqlite3_exec_monitored(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, "
- "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, "
- "program_version, entries, health_enabled FROM host WHERE hops >0;",
- create_host_callback, NULL, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg);
- sqlite3_free(err_msg);
- }
-
-#ifdef ENABLE_ACLK
- fatal_assert(0 == uv_mutex_init(&aclk_async_lock));
- rc = sqlite3_exec_monitored(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE "
- "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg);
- sqlite3_free(err_msg);
- }
- info("ACLK sync initialization completed");
-#endif
-}
static void async_cb(uv_async_t *handle)
{
uv_stop(handle->loop);
uv_update_time(handle->loop);
- debug(D_ACLK_SYNC, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}
#define TIMER_PERIOD_MS (1000)
-static void timer_cb(uv_timer_t* handle)
+static void timer_cb(uv_timer_t *handle)
{
uv_stop(handle->loop);
uv_update_time(handle->loop);
-#ifdef ENABLE_ACLK
- struct aclk_database_worker_config *wc = handle->data;
+ struct aclk_sync_config_s *config = handle->data;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_TIMER;
- aclk_database_enq_cmd_noblock(wc, &cmd);
time_t now = now_realtime_sec();
- if (wc->cleanup_after && wc->cleanup_after < now) {
+ if (config->cleanup_after && config->cleanup_after < now) {
cmd.opcode = ACLK_DATABASE_CLEANUP;
- if (!aclk_database_enq_cmd_noblock(wc, &cmd))
- wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
+ if (!aclk_database_enq_cmd_noblock(&cmd))
+ config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
}
if (aclk_connected) {
- if (wc->alert_updates && !wc->pause_alert_updates) {
- cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
- cmd.count = ACLK_MAX_ALERT_UPDATES;
- aclk_database_enq_cmd_noblock(wc, &cmd);
- }
+ cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
+ aclk_database_enq_cmd_noblock(&cmd);
+
+ aclk_check_node_info_and_collectors();
}
-#endif
}
-static void aclk_database_worker(void *arg)
+static void aclk_synchronization(void *arg __maybe_unused)
{
- service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
+ struct aclk_sync_config_s *config = arg;
+ uv_thread_set_name_np(config->thread, "ACLKSYNC");
worker_register("ACLKSYNC");
+ service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
+
worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
- worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan");
- worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log");
worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup");
worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete");
- worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info");
- worker_register_job_name(ACLK_DATABASE_NODE_COLLECTORS, "node collectors");
+ worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
+ worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot");
worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check");
worker_register_job_name(ACLK_DATABASE_TIMER, "timer");
- struct aclk_database_worker_config *wc = arg;
- uv_loop_t *loop;
- int ret;
- enum aclk_database_opcode opcode;
- uv_timer_t timer_req;
- struct aclk_database_cmd cmd;
+ uv_loop_t *loop = &config->loop;
+ fatal_assert(0 == uv_loop_init(loop));
+ fatal_assert(0 == uv_async_init(loop, &config->async, async_cb));
- char threadname[NETDATA_THREAD_NAME_MAX+1];
- if (wc->host)
- snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host));
- else {
- snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", wc->uuid_str);
- threadname[11] = '\0';
- }
- uv_thread_set_name_np(wc->thread, threadname);
-
- loop = wc->loop = mallocz(sizeof(uv_loop_t));
- ret = uv_loop_init(loop);
- if (ret) {
- error("uv_loop_init(): %s", uv_strerror(ret));
- goto error_after_loop_init;
- }
- loop->data = wc;
+ fatal_assert(0 == uv_timer_init(loop, &config->timer_req));
+ config->timer_req.data = config;
+ fatal_assert(0 == uv_timer_start(&config->timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
- ret = uv_async_init(wc->loop, &wc->async, async_cb);
- if (ret) {
- error("uv_async_init(): %s", uv_strerror(ret));
- goto error_after_async_init;
- }
- wc->async.data = wc;
-
- ret = uv_timer_init(loop, &timer_req);
- if (ret) {
- error("uv_timer_init(): %s", uv_strerror(ret));
- goto error_after_timer_init;
- }
- timer_req.data = wc;
- fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
-
- wc->node_info_send = 1;
- info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, (unsigned long int) sizeof(*wc));
-
- memset(&cmd, 0, sizeof(cmd));
+ info("Starting ACLK synchronization thread");
- wc->startup_time = now_realtime_sec();
- wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST;
+ config->cleanup_after = now_realtime_sec() + ACLK_DATABASE_CLEANUP_FIRST;
+ config->initialized = true;
- debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count);
-
- while (likely(!netdata_exit)) {
+ while (likely(service_running(SERVICE_ACLKSYNC))) {
+ enum aclk_database_opcode opcode;
worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
/* wait for commands */
do {
- cmd = aclk_database_deq_cmd(wc);
+ struct aclk_database_cmd cmd = aclk_database_deq_cmd();
- if (netdata_exit)
+ if (unlikely(!service_running(SERVICE_ACLKSYNC)))
break;
opcode = cmd.opcode;
@@ -576,201 +411,216 @@ static void aclk_database_worker(void *arg)
case ACLK_DATABASE_NOOP:
/* the command queue was empty, do nothing */
break;
-
// MAINTENANCE
case ACLK_DATABASE_CLEANUP:
- debug(D_ACLK_SYNC, "Database cleanup for %s", wc->host_guid);
-
- if (wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST + 2 < now_realtime_sec() && claimed() && aclk_connected) {
- cmd.opcode = ACLK_DATABASE_NODE_INFO;
- cmd.completion = NULL;
- (void) aclk_database_enq_cmd_noblock(wc, &cmd);
- }
-
- sql_maint_aclk_sync_database(wc, cmd);
- if (wc->host == localhost)
- sql_check_aclk_table_list(wc);
+ // Scan all aclk_alert_ tables and cleanup as needed
+ sql_maint_aclk_sync_database_all();
+ sql_check_aclk_table_list();
break;
case ACLK_DATABASE_DELETE_HOST:
- debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data);
- sql_delete_aclk_table_list(wc, cmd);
+ sql_delete_aclk_table_list(cmd.param[0]);
+ break;
+// NODE STATE
+ case ACLK_DATABASE_NODE_STATE:;
+ RRDHOST *host = cmd.param[0];
+ int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
+ struct aclk_sync_host_config *ahc = host->aclk_sync_host_config;
+ if (unlikely(!ahc))
+ sql_create_aclk_table(host, &host->host_uuid, host->node_id);
+ aclk_host_state_update(host, live);
break;
-
// ALERTS
case ACLK_DATABASE_PUSH_ALERT_CONFIG:
- debug(D_ACLK_SYNC,"Pushing chart config info to the cloud for %s", wc->host_guid);
- aclk_push_alert_config_event(wc, cmd);
+ aclk_push_alert_config_event(cmd.param[0], cmd.param[1]);
break;
case ACLK_DATABASE_PUSH_ALERT:
- debug(D_ACLK_SYNC, "Pushing alert info to the cloud for %s", wc->host_guid);
- aclk_push_alert_event(wc, cmd);
- break;
- case ACLK_DATABASE_ALARM_HEALTH_LOG:
- debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid);
- aclk_push_alarm_health_log(wc, cmd);
+ aclk_push_alert_events_for_all_hosts();
break;
- case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:
- debug(D_ACLK_SYNC, "Pushing alert snapshot to the cloud for node %s", wc->host_guid);
- aclk_push_alert_snapshot_event(wc, cmd);
+ case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:;
+ aclk_push_alert_snapshot_event(cmd.param[0]);
break;
case ACLK_DATABASE_QUEUE_REMOVED_ALERTS:
- debug(D_ACLK_SYNC, "Queueing removed alerts for node %s", wc->host_guid);
- sql_process_queue_removed_alerts_to_aclk(wc, cmd);
- break;
-
-// NODE OPERATIONS
- case ACLK_DATABASE_NODE_INFO:
- debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str);
- sql_build_node_info(wc, cmd);
- break;
- case ACLK_DATABASE_NODE_COLLECTORS:
- debug(D_ACLK_SYNC,"Sending node collectors info for %s", wc->uuid_str);
- sql_build_node_collectors(wc);
- break;
-#ifdef ENABLE_ACLK
-
-// NODE_INSTANCE DETECTION
- case ACLK_DATABASE_ORPHAN_HOST:
- wc->host = NULL;
- wc->is_orphan = 1;
- aclk_add_worker_thread(wc);
- break;
-#endif
- case ACLK_DATABASE_TIMER:
- if (unlikely(localhost && !wc->host && !wc->is_orphan)) {
- if (claimed()) {
- wc->host = rrdhost_find_by_guid(wc->host_guid);
- if (wc->host) {
- info("HOST %s (%s) detected as active", rrdhost_hostname(wc->host), wc->host_guid);
- snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host));
- uv_thread_set_name_np(wc->thread, threadname);
- wc->host->dbsync_worker = wc;
- if (unlikely(!wc->hostname))
- wc->hostname = strdupz(rrdhost_hostname(wc->host));
- aclk_del_worker_thread(wc);
- wc->node_info_send = 1;
- }
- }
- }
- if (wc->node_info_send && localhost && claimed() && aclk_connected) {
- cmd.opcode = ACLK_DATABASE_NODE_INFO;
- cmd.completion = NULL;
- wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd);
- }
- if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) {
- cmd.opcode = ACLK_DATABASE_NODE_COLLECTORS;
- cmd.completion = NULL;
- wc->node_collectors_send = aclk_database_enq_cmd_noblock(wc, &cmd);
- }
- if (localhost == wc->host)
- (void) sqlite3_wal_checkpoint(db_meta, NULL);
+ sql_process_queue_removed_alerts_to_aclk(cmd.param[0]);
break;
default:
debug(D_ACLK_SYNC, "%s: default.", __func__);
break;
}
if (cmd.completion)
- aclk_complete(cmd.completion);
+ completion_mark_complete(cmd.completion);
} while (opcode != ACLK_DATABASE_NOOP);
}
- if (!uv_timer_stop(&timer_req))
- uv_close((uv_handle_t *)&timer_req, NULL);
-
- /* cleanup operations of the event loop */
- //info("Shutting down ACLK sync event loop for %s", wc->host_guid);
+ if (!uv_timer_stop(&config->timer_req))
+ uv_close((uv_handle_t *)&config->timer_req, NULL);
- /*
- * uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour we need to be aware if this becomes
- * an issue in the future.
- */
- uv_close((uv_handle_t *)&wc->async, NULL);
- uv_run(loop, UV_RUN_DEFAULT);
+ uv_close((uv_handle_t *)&config->async, NULL);
+// uv_close((uv_handle_t *)&config->async_exit, NULL);
+ uv_cond_destroy(&config->cmd_cond);
+ (void) uv_loop_close(loop);
- info("Shutting down ACLK sync event loop complete for host %s", wc->host_guid);
- /* TODO: don't let the API block by waiting to enqueue commands */
- uv_cond_destroy(&wc->cmd_cond);
-
- int rc;
- do {
- rc = uv_loop_close(loop);
- } while (rc != UV_EBUSY);
-
- freez(loop);
+ worker_unregister();
+ service_exits();
+ info("ACLK SYNC: Shutting down ACLK synchronization event loop");
+}
- rrd_rdlock();
- if (likely(wc->host))
- wc->host->dbsync_worker = NULL;
- freez(wc->hostname);
- freez(wc);
- rrd_unlock();
+static void aclk_synchronization_init(void)
+{
+ aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
+ aclk_sync_config.queue_size = 0;
+ fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond));
+ fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex));
- worker_unregister();
- return;
-
-error_after_timer_init:
- uv_close((uv_handle_t *)&wc->async, NULL);
-error_after_async_init:
- fatal_assert(0 == uv_loop_close(loop));
-error_after_loop_init:
- freez(loop);
- worker_unregister();
+ fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config));
}
+#endif
// -------------------------------------------------------------
-void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
+void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused)
{
#ifdef ENABLE_ACLK
char uuid_str[GUID_LEN + 1];
char host_guid[GUID_LEN + 1];
+ int rc;
uuid_unparse_lower_fix(host_uuid, uuid_str);
-
- if (aclk_worker_thread_exists(uuid_str))
- return;
-
uuid_unparse_lower(*host_uuid, host_guid);
- BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
+ char sql[ACLK_SYNC_QUERY_SIZE];
- buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str);
- db_execute(buffer_tostring(sql));
- buffer_flush(sql);
-
- buffer_sprintf(sql, INDEX_ACLK_ALERT, uuid_str, uuid_str);
- db_execute(buffer_tostring(sql));
-
- buffer_free(sql);
+ snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, TABLE_ACLK_ALERT, uuid_str);
+ rc = db_execute(db_meta, sql);
+ if (unlikely(rc))
+ error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid);
+ else {
+ snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT, uuid_str, uuid_str);
+ rc = db_execute(db_meta, sql);
+ if (unlikely(rc))
+ error_report("Failed to create ACLK alert table index for host %s", host ? string2str(host->hostname) : host_guid);
+ }
+ if (likely(host) && unlikely(host->aclk_sync_host_config))
+ return;
- if (likely(host) && unlikely(host->dbsync_worker))
+ if (unlikely(!host))
return;
- struct aclk_database_worker_config *wc = callocz(1, sizeof(struct aclk_database_worker_config));
+ struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config));
if (node_id && !uuid_is_null(*node_id))
uuid_unparse_lower(*node_id, wc->node_id);
- if (likely(host)) {
- host->dbsync_worker = (void *)wc;
- wc->hostname = strdupz(rrdhost_hostname(host));
- if (node_id && !host->node_id) {
- host->node_id = mallocz(sizeof(*host->node_id));
- uuid_copy(*host->node_id, *node_id);
- }
+
+ host->aclk_sync_host_config = (void *)wc;
+ if (node_id && !host->node_id) {
+ host->node_id = mallocz(sizeof(*host->node_id));
+ uuid_copy(*host->node_id, *node_id);
}
- else
- wc->hostname = get_hostname_by_node_id(wc->node_id);
+
wc->host = host;
strcpy(wc->uuid_str, uuid_str);
- strcpy(wc->host_guid, host_guid);
wc->alert_updates = 0;
- aclk_database_init_cmd_queue(wc);
- aclk_add_worker_thread(wc);
- fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc));
-#else
- UNUSED(host);
- UNUSED(host_uuid);
- UNUSED(node_id);
+ time_t now = now_realtime_sec();
+ wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now;
#endif
-} \ No newline at end of file
+}
+
+#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \
+ "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \
+ "program_version, entries, health_enabled FROM host WHERE hops >0;"
+
+#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \
+ "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; "
+void sql_aclk_sync_init(void)
+{
+ char *err_msg = NULL;
+ int rc;
+
+ if (unlikely(!db_meta)) {
+ if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) {
+ return;
+ }
+ error_report("Database has not been initialized");
+ return;
+ }
+
+ info("Creating archived hosts");
+ int number_of_children = 0;
+ rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_HOSTS, create_host_callback, &number_of_children, &err_msg);
+
+ if (rc != SQLITE_OK) {
+ error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg);
+ sqlite3_free(err_msg);
+ }
+
+ info("Created %d archived hosts", number_of_children);
+ // Trigger host context load for hosts that have been created
+ metadata_queue_load_host_context(NULL);
+
+#ifdef ENABLE_ACLK
+ if (!number_of_children)
+ aclk_queue_node_info(localhost, true);
+
+ rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES,aclk_config_parameters, NULL,&err_msg);
+
+ if (rc != SQLITE_OK) {
+ error_report("SQLite error when configuring host ACLK synchonization parameters, rc = %d (%s)", rc, err_msg);
+ sqlite3_free(err_msg);
+ }
+ aclk_synchronization_init();
+
+ info("ACLK sync initialization completed");
+#endif
+}
+
+// Public
+
+static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1)
+{
+ struct aclk_database_cmd cmd;
+ cmd.opcode = opcode;
+ cmd.param[0] = (void *) param0;
+ cmd.param[1] = (void *) param1;
+ cmd.completion = NULL;
+ aclk_database_enq_cmd(&cmd);
+}
+
+// Public
+void aclk_push_alert_config(const char *node_id, const char *config_hash)
+{
+ if (unlikely(!aclk_sync_config.initialized))
+ return;
+
+ queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash));
+}
+
+void aclk_push_node_alert_snapshot(const char *node_id)
+{
+ if (unlikely(!aclk_sync_config.initialized))
+ return;
+
+ queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, strdupz(node_id), NULL);
+}
+
+
+void aclk_push_node_removed_alerts(const char *node_id)
+{
+ if (unlikely(!aclk_sync_config.initialized))
+ return;
+
+ queue_aclk_sync_cmd(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, strdupz(node_id), NULL);
+}
+
+void schedule_node_info_update(RRDHOST *host __maybe_unused)
+{
+#ifdef ENABLE_ACLK
+ if (unlikely(!host))
+ return;
+
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_NODE_STATE;
+ cmd.param[0] = host;
+ cmd.completion = NULL;
+ aclk_database_enq_cmd(&cmd);
+#endif
+}
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 208177e4..d555a0ce 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -18,45 +18,6 @@
#define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400)
#define ACLK_SYNC_QUERY_SIZE 512
-struct aclk_completion {
- uv_mutex_t mutex;
- uv_cond_t cond;
- volatile unsigned completed;
-};
-
-static inline void init_aclk_completion(struct aclk_completion *p)
-{
- p->completed = 0;
- fatal_assert(0 == uv_cond_init(&p->cond));
- fatal_assert(0 == uv_mutex_init(&p->mutex));
-}
-
-static inline void destroy_aclk_completion(struct aclk_completion *p)
-{
- uv_cond_destroy(&p->cond);
- uv_mutex_destroy(&p->mutex);
-}
-
-static inline void wait_for_aclk_completion(struct aclk_completion *p)
-{
- uv_mutex_lock(&p->mutex);
- while (0 == p->completed) {
- uv_cond_wait(&p->cond, &p->mutex);
- }
- fatal_assert(1 == p->completed);
- uv_mutex_unlock(&p->mutex);
-}
-
-static inline void aclk_complete(struct aclk_completion *p)
-{
- uv_mutex_lock(&p->mutex);
- p->completed = 1;
- uv_mutex_unlock(&p->mutex);
- uv_cond_broadcast(&p->cond);
-}
-
-extern uv_mutex_t aclk_async_lock;
-
static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
{
uuid_unparse_lower(*uuid, out);
@@ -66,6 +27,12 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
out[23] = '_';
}
+static inline int claimed()
+{
+ return localhost->aclk_state.claimed_id != NULL;
+}
+
+
#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
"alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
"unique(alert_unique_id));"
@@ -74,16 +41,14 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
enum aclk_database_opcode {
ACLK_DATABASE_NOOP = 0,
- ACLK_DATABASE_ORPHAN_HOST,
- ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_CLEANUP,
ACLK_DATABASE_DELETE_HOST,
- ACLK_DATABASE_NODE_INFO,
+ ACLK_DATABASE_NODE_STATE,
ACLK_DATABASE_PUSH_ALERT,
ACLK_DATABASE_PUSH_ALERT_CONFIG,
ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
+ ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,
ACLK_DATABASE_QUEUE_REMOVED_ALERTS,
- ACLK_DATABASE_NODE_COLLECTORS,
ACLK_DATABASE_TIMER,
// leave this last
@@ -93,10 +58,8 @@ enum aclk_database_opcode {
struct aclk_database_cmd {
enum aclk_database_opcode opcode;
- void *data;
- void *data_param;
- int count;
- struct aclk_completion *completion;
+ void *param[2];
+ struct completion *completion;
};
#define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024)
@@ -106,67 +69,27 @@ struct aclk_database_cmdqueue {
struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE];
};
-struct aclk_database_worker_config {
- uv_thread_t thread;
- char uuid_str[GUID_LEN + 1];
- char node_id[GUID_LEN + 1];
- char host_guid[GUID_LEN + 1];
- char *hostname; // hostname to avoid constant lookups
- time_t cleanup_after; // Start a cleanup after this timestamp
- time_t startup_time; // When the sync thread started
- uint64_t alerts_batch_id; // batch id for alerts to use
- uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
- uint64_t alert_sequence_id; // last alert sequence_id
- int pause_alert_updates;
- uint32_t chart_payload_count;
- uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested
- uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message
- uv_loop_t *loop;
+struct aclk_sync_host_config {
RRDHOST *host;
- uv_async_t async;
- /* FIFO command queue */
- uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
- volatile unsigned queue_size;
- struct aclk_database_cmdqueue cmd_queue;
int alert_updates;
- int node_info_send;
+ int alert_checkpoint_req;
+ int alert_queue_removed;
+ time_t node_info_send_time;
time_t node_collectors_send;
- volatile unsigned is_shutting_down;
- volatile unsigned is_orphan;
- struct aclk_database_worker_config *next;
+ char uuid_str[UUID_STR_LEN];
+ char node_id[UUID_STR_LEN];
+ char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested
};
-static inline RRDHOST *find_host_by_node_id(char *node_id)
-{
- uuid_t node_uuid;
- if (unlikely(!node_id))
- return NULL;
-
- if (uuid_parse(node_id, node_uuid))
- return NULL;
-
- rrd_rdlock();
- RRDHOST *host, *ret = NULL;
- rrdhost_foreach_read(host) {
- if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
- ret = host;
- break;
- }
- }
- rrd_unlock();
-
- return ret;
-}
-
-
extern sqlite3 *db_meta;
-int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
-void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
+int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd);
void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
void sql_aclk_sync_init(void);
-int claimed();
-void aclk_sync_exit_all();
-struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id);
+void aclk_push_alert_config(const char *node_id, const char *config_hash);
+void aclk_push_node_alert_snapshot(const char *node_id);
+void aclk_push_node_health_log(const char *node_id);
+void aclk_push_node_removed_alerts(const char *node_id);
+void schedule_node_info_update(RRDHOST *host);
+
#endif //NETDATA_SQLITE_ACLK_H
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index ce284ebc..62f1df29 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -5,20 +5,20 @@
#ifdef ENABLE_ACLK
#include "../../aclk/aclk_alarm_api.h"
-#include "../../aclk/aclk.h"
#endif
+#define SQL_GET_ALERT_REMOVE_TIME "SELECT when_key FROM health_log_%s WHERE alarm_id = %u " \
+ "AND unique_id > %u AND unique_id < %u " \
+ "AND new_status = -2;"
+
time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) {
sqlite3_stmt *res = NULL;
- int rc = 0;
time_t when = 0;
char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \
- "and unique_id > %u and unique_id < %u " \
- "and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id);
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_GET_ALERT_REMOVE_TIME, uuid_str, alarm_id, after_unique_id, before_unique_id);
- rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
+ int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to find removed gap.");
return 0;
@@ -36,22 +36,26 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after
return when;
}
+#define SQL_UPDATE_FILTERED_ALERT "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u"
+
void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) {
char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u", uuid_str, ae->unique_id, unique_id);
+ snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id);
sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL);
ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
}
+#define SQL_SELECT_ALERT_BY_UNIQUE_ID "SELECT hl.unique_id FROM health_log_%s hl, alert_hash ah WHERE hl.unique_id = %u " \
+ "AND hl.config_hash_id = ah.hash_id " \
+ "AND ah.warn IS NULL AND ah.crit IS NULL;"
+
static inline bool is_event_from_alert_variable_config(uint32_t unique_id, char *uuid_str) {
sqlite3_stmt *res = NULL;
int rc = 0;
bool ret = false;
char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.unique_id from health_log_%s hl, alert_hash ah where hl.unique_id = %u " \
- "and hl.config_hash_id = ah.hash_id " \
- "and ah.warn is null and ah.crit is null;", uuid_str, unique_id);
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_UNIQUE_ID, uuid_str, unique_id);
rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -73,12 +77,18 @@ static inline bool is_event_from_alert_variable_config(uint32_t unique_id, char
#define MAX_REMOVED_PERIOD 86400
//decide if some events should be sent or not
+
+#define SQL_SELECT_ALERT_BY_ID "SELECT hl.new_status, hl.config_hash_id, hl.unique_id FROM health_log_%s hl, aclk_alert_%s aa " \
+ "WHERE hl.unique_id = aa.filtered_alert_unique_id " \
+ "AND hl.alarm_id = %u " \
+ "ORDER BY alarm_event_id DESC LIMIT 1;"
+
int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
{
sqlite3_stmt *res = NULL;
- char uuid_str[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- int send = 1, rc = 0;
+ int send = 1;
if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) {
return 0;
@@ -87,9 +97,6 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
if (unlikely(uuid_is_null(ae->config_hash_id)))
return 0;
- if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
- return 0;
-
char sql[ACLK_SYNC_QUERY_SIZE];
uuid_t config_hash_id;
RRDCALC_STATUS status;
@@ -97,12 +104,9 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
//get the previous sent event of this alarm_id
//base the search on the last filtered event
- snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \
- where hl.unique_id = aa.filtered_alert_unique_id \
- and hl.alarm_id = %u \
- order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id);
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_ID, uuid_str, uuid_str, ae->alarm_id);
- rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
+ int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to filter alert events.");
send = 1;
@@ -126,7 +130,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
goto done;
}
- if (uuid_compare(ae->config_hash_id, config_hash_id)) {
+ if (uuid_memcmp(&ae->config_hash_id, &config_hash_id)) {
send = 1;
goto done;
}
@@ -162,6 +166,10 @@ done:
// will replace call to aclk_update_alarm in health/health_log.c
// and handle both cases
+
+#define SQL_QUEUE_ALERT_TO_CLOUD "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
+ "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) ON CONFLICT (alert_unique_id) do nothing;"
+
int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
{
if(!service_running(SERVICE_ACLK))
@@ -182,27 +190,24 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
}
}
- int rc = 0;
- sqlite3_stmt *res_alert = NULL;
- char uuid_str[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
+ return 0;
+
+ sqlite3_stmt *res_alert = NULL;
+ char sql[ACLK_SYNC_QUERY_SIZE];
- buffer_sprintf(
- sql,
- "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) "
- "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) on conflict (alert_unique_id) do nothing; ",
- uuid_str);
+ snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str);
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0);
+ int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to prepare statement to store alert event");
- buffer_free(sql);
return 1;
}
- rc = sqlite3_bind_int(res_alert, 1, ae->unique_id);
+ rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
@@ -213,16 +218,12 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
}
ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
- struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker;
- if (wc) {
- wc->pause_alert_updates = 0;
- }
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
bind_fail:
if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
error_report("Failed to reset statement in store alert event, rc = %d", rc);
- buffer_free(sql);
return 0;
}
@@ -254,11 +255,10 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
#endif
}
-void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+void aclk_push_alert_event(struct aclk_sync_host_config *wc)
{
#ifndef ENABLE_ACLK
UNUSED(wc);
- UNUSED(cmd);
#else
int rc;
@@ -278,26 +278,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
- if (wc->alerts_start_seq_id != 0) {
- buffer_sprintf(
- sql,
- "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
- "; UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id < %"PRIu64
- " and date_cloud_ack is null "
- "; UPDATE aclk_alert_%s SET date_submitted = unixepoch() WHERE sequence_id < %"PRIu64
- " and date_submitted is null",
- wc->uuid_str,
- wc->alerts_start_seq_id,
- wc->uuid_str,
- wc->alerts_start_seq_id,
- wc->uuid_str,
- wc->alerts_start_seq_id);
- db_execute(buffer_tostring(sql));
- buffer_reset(sql);
- wc->alerts_start_seq_id = 0;
- }
-
- int limit = cmd.count > 0 ? cmd.count : 1;
+ int limit = ACLK_MAX_ALERT_UPDATES;
sqlite3_stmt *res = NULL;
@@ -318,10 +299,15 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str);
- db_execute(buffer_tostring(sql_fix));
- buffer_flush(sql_fix);
- buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str);
- db_execute(buffer_tostring(sql_fix));
+ rc = db_execute(db_meta, buffer_tostring(sql_fix));
+ if (unlikely(rc))
+ error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
+ else {
+ buffer_flush(sql_fix);
+ buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str);
+ if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix))))
+ error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
+ }
buffer_free(sql_fix);
// Try again
@@ -353,8 +339,8 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
- alarm_log.batch_id = wc->alerts_batch_id;
- alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
+ //alarm_log.batch_id = wc->alerts_batch_id;
+ //alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str);
@@ -429,19 +415,23 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
"WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
wc->uuid_str, first_sequence_id, last_sequence_id);
- db_execute(buffer_tostring(sql));
+
+ if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
+ error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host));
+
+ // Mark to do one more check
+ rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+
} else {
if (log_first_sequence_id)
log_access(
- "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
+ "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "",
wc->node_id,
wc->host ? rrdhost_hostname(wc->host) : "N/A",
log_first_sequence_id,
- log_last_sequence_id,
- wc->alerts_batch_id);
+ log_last_sequence_id);
log_first_sequence_id = 0;
log_last_sequence_id = 0;
- wc->pause_alert_updates = 1;
}
rc = sqlite3_finalize(res);
@@ -451,8 +441,24 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
freez(claim_id);
buffer_free(sql);
#endif
+}
+
+void aclk_push_alert_events_for_all_hosts(void)
+{
+ RRDHOST *host;
+
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS))
+ continue;
- return;
+ internal_error(true, "ACLK SYNC: Scanning host %s", rrdhost_hostname(host));
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+
+ struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ if (likely(wc))
+ aclk_push_alert_event(wc);
+ }
+ dfe_done(host);
}
void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
@@ -467,137 +473,15 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
"where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
"order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str);
- db_execute(buffer_tostring(sql));
-
- buffer_free(sql);
-
- struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker;
- if (wc) {
- wc->pause_alert_updates = 0;
- }
-}
-
-void aclk_send_alarm_health_log(char *node_id)
-{
- if (unlikely(!node_id))
- return;
-
- struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id);
-
- if (likely(!wc)) {
- RRDHOST *host = find_host_by_node_id(node_id);
- if (likely(host))
- wc = (struct aclk_database_worker_config *)host->dbsync_worker;
- }
-
- if (!wc) {
- log_access("ACLK REQ [%s (N/A)]: HEALTH LOG REQUEST RECEIVED FOR INVALID NODE", node_id);
- return;
- }
-
- log_access("ACLK REQ [%s (%s)]: HEALTH LOG REQUEST RECEIVED", node_id, wc->hostname ? wc->hostname : "N/A");
-
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
-
- aclk_database_enq_cmd(wc, &cmd);
- return;
-}
-
-void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
-{
- UNUSED(cmd);
-#ifndef ENABLE_ACLK
- UNUSED(wc);
-#else
- int rc;
-
- char *claim_id = get_agent_claimid();
- if (unlikely(!claim_id))
- return;
-
- RRDHOST *host = wc->host;
- if (unlikely(!host)) {
- host = find_host_by_node_id(wc->node_id);
-
- if (unlikely(!host)) {
- log_access(
- "AC [%s (N/A)]: ACLK synchronization thread for %s is not yet linked to HOST.",
- wc->node_id,
- wc->host_guid);
- freez(claim_id);
- return;
- }
- }
-
- uint64_t first_sequence = 0;
- uint64_t last_sequence = 0;
- struct timeval first_timestamp;
- struct timeval last_timestamp;
-
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
-
- sqlite3_stmt *res = NULL;
-
- //TODO: make this better: include info from health log too
- buffer_sprintf(sql, "SELECT MIN(sequence_id), MIN(date_created), MAX(sequence_id), MAX(date_created) " \
- "FROM aclk_alert_%s;", wc->uuid_str);
-
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
- if (rc != SQLITE_OK) {
- error_report("Failed to prepare statement to get health log statistics from the database");
- buffer_free(sql);
- freez(claim_id);
- return;
- }
-
- first_timestamp.tv_sec = 0;
- first_timestamp.tv_usec = 0;
- last_timestamp.tv_sec = 0;
- last_timestamp.tv_usec = 0;
-
- while (sqlite3_step_monitored(res) == SQLITE_ROW) {
- first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
- if (sqlite3_column_bytes(res, 1) > 0) {
- first_timestamp.tv_sec = sqlite3_column_int64(res, 1);
- }
-
- last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
- if (sqlite3_column_bytes(res, 3) > 0) {
- last_timestamp.tv_sec = sqlite3_column_int64(res, 3);
- }
- }
-
- struct alarm_log_entries log_entries;
- log_entries.first_seq_id = first_sequence;
- log_entries.first_when = first_timestamp;
- log_entries.last_seq_id = last_sequence;
- log_entries.last_when = last_timestamp;
-
- struct alarm_log_health alarm_log;
- alarm_log.claim_id = claim_id;
- alarm_log.node_id = wc->node_id;
- alarm_log.log_entries = log_entries;
- alarm_log.status = wc->alert_updates == 0 ? 2 : 1;
- alarm_log.enabled = (int)host->health.health_enabled;
-
- wc->alert_sequence_id = last_sequence;
+ netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
- aclk_send_alarm_log_health(&alarm_log);
- log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->hostname ? wc->hostname : "N/A", first_sequence, last_sequence);
+ if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
+ error_report("Failed to queue existing ACLK alert events for host %s", rrdhost_hostname(host));
- rc = sqlite3_finalize(res);
- if (unlikely(rc != SQLITE_OK))
- error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc);
+ netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
- freez(claim_id);
buffer_free(sql);
-
- aclk_alert_reloaded = 1;
-#endif
-
- return;
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
}
void aclk_send_alarm_configuration(char *config_hash)
@@ -605,22 +489,14 @@ void aclk_send_alarm_configuration(char *config_hash)
if (unlikely(!config_hash))
return;
- struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) localhost->dbsync_worker;
+ struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config;
- if (unlikely(!wc)) {
+ if (unlikely(!wc))
return;
- }
log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG;
- cmd.data_param = (void *) strdupz(config_hash);
- cmd.completion = NULL;
- aclk_database_enq_cmd(wc, &cmd);
-
- return;
+ aclk_push_alert_config(wc->node_id, config_hash);
}
#define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
@@ -628,19 +504,31 @@ void aclk_send_alarm_configuration(char *config_hash)
"options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
"p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;"
-int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
{
- UNUSED(wc);
-#ifndef ENABLE_ACLK
- UNUSED(cmd);
-#else
int rc = 0;
+#ifdef ENABLE_ACLK
+
CHECK_SQLITE_CONNECTION(db_meta);
sqlite3_stmt *res = NULL;
- char *config_hash = (char *) cmd.data_param;
+ struct aclk_sync_host_config *wc = NULL;
+ RRDHOST *host = find_host_by_node_id(node_id);
+
+ if (unlikely(!host)) {
+ freez(config_hash);
+ freez(node_id);
+ return 1;
+ }
+
+ wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
+ if (unlikely(!wc)) {
+ freez(config_hash);
+ freez(node_id);
+ return 1;
+ }
rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -723,7 +611,6 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct
if (likely(p_alarm_config.cfg_hash)) {
log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
aclk_send_provide_alarm_cfg(&p_alarm_config);
- freez((char *) cmd.data_param);
freez(p_alarm_config.cfg_hash);
destroy_aclk_alarm_configuration(&alarm_config);
}
@@ -735,150 +622,125 @@ bind_fail:
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc);
- return rc;
+ freez(config_hash);
+ freez(node_id);
#endif
- return 0;
+ return rc;
}
// Start streaming alerts
-void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id)
+void aclk_start_alert_streaming(char *node_id, bool resets)
{
if (unlikely(!node_id))
return;
- //log_access("ACLK REQ [%s (N/A)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64".", node_id, start_seq_id, batch_id);
-
uuid_t node_uuid;
if (uuid_parse(node_id, node_uuid))
return;
- struct aclk_database_worker_config *wc = NULL;
RRDHOST *host = find_host_by_node_id(node_id);
- if (likely(host)) {
- wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
- (struct aclk_database_worker_config *)host->dbsync_worker :
- (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
- if (unlikely(!host->health.health_enabled)) {
- log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
- return;
- }
+ if (unlikely(!host))
+ return;
- if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1))
- sql_queue_existing_alerts_to_aclk(host);
- } else
- wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
-
- if (likely(wc)) {
- log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id);
- __sync_synchronize();
- wc->alerts_batch_id = batch_id;
- wc->alerts_start_seq_id = start_seq_id;
- wc->alert_updates = 1;
- wc->pause_alert_updates = 0;
- __sync_synchronize();
+ struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+
+ if (unlikely(!wc))
+ return;
+
+ if (unlikely(!host->health.health_enabled)) {
+ log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
+ return;
}
- else
- log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
- return;
+ if (resets) {
+ log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ sql_queue_existing_alerts_to_aclk(host);
+ } else
+ log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+
+ wc->alert_updates = 1;
+ wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
}
-void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
-{
- UNUSED(cmd);
+#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
+ "SELECT unique_id alert_unique_id, UNIXEPOCH(), unique_id alert_unique_id FROM health_log_%s " \
+ "WHERE new_status = -2 AND updated_by_id = 0 AND unique_id NOT IN " \
+ "(SELECT alert_unique_id FROM aclk_alert_%s) " \
+ "AND config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \
+ "ORDER BY unique_id ASC " \
+ "ON CONFLICT (alert_unique_id) DO NOTHING;"
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+void sql_process_queue_removed_alerts_to_aclk(char *node_id)
+{
+ struct aclk_sync_host_config *wc;
+ RRDHOST *host = find_host_by_node_id(node_id);
+ freez(node_id);
- buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
- "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \
- "where new_status = -2 and updated_by_id = 0 and unique_id not in " \
- "(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \
- "on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+ if (unlikely(!host || !(wc = host->aclk_sync_host_config)))
+ return;
- db_execute(buffer_tostring(sql));
+ char sql[ACLK_SYNC_QUERY_SIZE * 2];
- log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE * 2 - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str, wc->uuid_str);
- buffer_free(sql);
+ if (unlikely(db_execute(db_meta, sql))) {
+ log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS FAILED", wc->node_id, rrdhost_hostname(wc->host));
+ error_report("Failed to queue ACLK alert removed entries for host %s", rrdhost_hostname(wc->host));
+ }
+ else
+ log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
- wc->pause_alert_updates = 0;
- return;
+ rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ wc->alert_queue_removed = 0;
}
void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
{
- if (unlikely(!host->dbsync_worker))
+ if (unlikely(!host->aclk_sync_host_config))
return;
- if (!claimed())
+ if (!claimed() || !host->node_id)
return;
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS;
- cmd.data = NULL;
- cmd.data_param = NULL;
- cmd.completion = NULL;
- aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd);
+ char node_id[UUID_STR_LEN];
+ uuid_unparse_lower(*host->node_id, node_id);
+
+ aclk_push_node_removed_alerts(node_id);
}
-void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id)
+void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
{
- UNUSED(claim_id);
- if (unlikely(!node_id))
- return;
-
uuid_t node_uuid;
- if (uuid_parse(node_id, node_uuid))
+ if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return;
- struct aclk_database_worker_config *wc = NULL;
RRDHOST *host = find_host_by_node_id(node_id);
- if (likely(host))
- wc = (struct aclk_database_worker_config *)host->dbsync_worker;
-
- if (likely(wc)) {
- log_access(
- "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64,
- wc->node_id,
- wc->host ? rrdhost_hostname(wc->host) : "N/A",
- snapshot_id,
- sequence_id);
- if (wc->alerts_snapshot_id == snapshot_id)
- return;
- __sync_synchronize();
- wc->alerts_snapshot_id = snapshot_id;
- wc->alerts_ack_sequence_id = sequence_id;
- __sync_synchronize();
-
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT;
- cmd.data_param = NULL;
- cmd.completion = NULL;
- aclk_database_enq_cmd(wc, &cmd);
- } else
- log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
-
- return;
-}
+ if (unlikely(!host)) {
+ log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
+ return;
+ }
-void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
-{
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
- if (alerts_ack_sequence_id != 0) {
- buffer_sprintf(
- sql,
- "UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id <= %" PRIu64 "",
- uuid_str,
- alerts_ack_sequence_id);
- db_execute(buffer_tostring(sql));
+ if (unlikely(!wc)) {
+ log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
+ return;
}
- buffer_free(sql);
+ log_access(
+ "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
+ node_id,
+ wc->host ? rrdhost_hostname(wc->host) : "N/A",
+ snapshot_uuid);
+ if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid))
+ return;
+ __sync_synchronize();
+ wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
+ __sync_synchronize();
+
+ aclk_push_node_alert_snapshot(node_id);
}
#ifdef ENABLE_ACLK
@@ -949,37 +811,41 @@ static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark)
#endif
#define ALARM_EVENTS_PER_CHUNK 10
-void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
{
-#ifndef ENABLE_ACLK
- UNUSED(wc);
- UNUSED(cmd);
-#else
- UNUSED(cmd);
- // we perhaps we don't need this for snapshots
- if (unlikely(!wc->alert_updates)) {
- log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+#ifdef ENABLE_ACLK
+ RRDHOST *host = find_host_by_node_id(node_id);
+
+ if (unlikely(!host)) {
+ log_access("AC [%s (N/A)]: Node id not found", node_id);
+ freez(node_id);
return;
}
+ freez(node_id);
- if (unlikely(!wc->host)) {
- error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid);
+ struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+
+ // we perhaps we don't need this for snapshots
+ if (unlikely(!wc->alert_updates)) {
+ log_access(
+ "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.",
+ wc->node_id,
+ wc->host ? rrdhost_hostname(wc->host) : "N/A");
return;
}
- if (unlikely(!wc->alerts_snapshot_id))
+ if (unlikely(!wc->alerts_snapshot_uuid))
return;
char *claim_id = get_agent_claimid();
if (unlikely(!claim_id))
return;
- log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->alerts_snapshot_id);
+ log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid);
- aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
-
- RRDHOST *host = wc->host;
uint32_t cnt = 0;
+ char uuid_str[UUID_STR_LEN];
+ uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
@@ -995,6 +861,9 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
continue;
+ if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
+ continue;
+
cnt++;
}
@@ -1008,7 +877,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
struct alarm_snapshot alarm_snap;
alarm_snap.node_id = wc->node_id;
alarm_snap.claim_id = claim_id;
- alarm_snap.snapshot_id = wc->alerts_snapshot_id;
+ alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
alarm_snap.chunks = chunks;
alarm_snap.chunk = chunk;
@@ -1024,6 +893,9 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
continue;
+ if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
+ continue;
+
cnt++;
struct alarm_log_entry alarm_log;
@@ -1047,7 +919,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
struct alarm_snapshot alarm_snap;
alarm_snap.node_id = wc->node_id;
alarm_snap.claim_id = claim_id;
- alarm_snap.snapshot_id = wc->alerts_snapshot_id;
+ alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
alarm_snap.chunks = chunks;
alarm_snap.chunk = chunk;
@@ -1061,73 +933,204 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
}
netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
- wc->alerts_snapshot_id = 0;
+ wc->alerts_snapshot_uuid = NULL;
freez(claim_id);
#endif
- return;
}
+#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE filtered_alert_unique_id NOT IN (SELECT unique_id FROM health_log_%s);"
+
void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
{
if (!claimed())
return;
- char uuid_str[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ char sql[512];
+ snprintfz(sql,511,SQL_DELETE_ALERT_ENTRIES, uuid_str, uuid_str);
- buffer_sprintf(sql,"delete from aclk_alert_%s where filtered_alert_unique_id not in "
- " (select unique_id from health_log_%s); ", uuid_str, uuid_str);
-
char *err_msg = NULL;
- int rc = sqlite3_exec_monitored(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg);
+ int rc = sqlite3_exec_monitored(db_meta, sql, NULL, NULL, &err_msg);
if (rc != SQLITE_OK) {
- error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""",
- uuid_str, err_msg);
+ error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s\"", uuid_str, err_msg);
sqlite3_free(err_msg);
}
- buffer_free(sql);
}
+#define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \
+ "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \
+ "FROM aclk_alert_%s WHERE date_submitted IS NULL;"
+
int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
{
int rc;
- struct aclk_database_worker_config *wc = NULL;
- wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ struct aclk_sync_host_config *wc = NULL;
+ wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
if (!wc)
return 1;
proto_alert_status->alert_updates = wc->alert_updates;
- proto_alert_status->alerts_batch_id = wc->alerts_batch_id;
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ char sql[ACLK_SYNC_QUERY_SIZE];
sqlite3_stmt *res = NULL;
- buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \
- "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \
- "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \
- "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+ snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str);
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to get alert log status from the database.");
- buffer_free(sql);
return 1;
}
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
- proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
- proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
+ proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
}
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
- buffer_free(sql);
return 0;
}
+
+void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused)
+{
+ if (unlikely(!node_id))
+ return;
+
+ struct aclk_sync_host_config *wc = NULL;
+ RRDHOST *host = find_host_by_node_id(node_id);
+
+ if (unlikely(!host))
+ return;
+
+ wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
+ if (unlikely(!wc)) {
+ log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
+ return;
+ }
+
+ log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
+
+ wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
+}
+
+typedef struct active_alerts {
+ char *name;
+ char *chart;
+ RRDCALC_STATUS status;
+} active_alerts_t;
+
+static inline int compare_active_alerts(const void * a, const void * b) {
+ active_alerts_t *active_alerts_a = (active_alerts_t *)a;
+ active_alerts_t *active_alerts_b = (active_alerts_t *)b;
+
+ if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) )
+ {
+ return strcmp(active_alerts_a->chart, active_alerts_b->chart);
+ }
+ else
+ return strcmp(active_alerts_a->name, active_alerts_b->name);
+}
+
+void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
+{
+#ifdef ENABLE_ACLK
+ struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ if (unlikely(!wc)) {
+ log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
+ return;
+ }
+
+ //TODO: make sure all pending events are sent.
+ if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) {
+ //postpone checkpoint send
+ wc->alert_checkpoint_req++;
+ log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
+ return;
+ }
+
+ //TODO: lock rc here, or make sure it's called when health decides
+ //count them
+ RRDCALC *rc;
+ uint32_t cnt = 0;
+ size_t len = 0;
+ active_alerts_t *active_alerts = NULL;
+
+ foreach_rrdcalc_in_rrdhost_read(host, rc) {
+ if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
+ continue;
+
+ if (rc->status == RRDCALC_STATUS_WARNING ||
+ rc->status == RRDCALC_STATUS_CRITICAL) {
+
+ cnt++;
+ }
+ }
+ foreach_rrdcalc_in_rrdhost_done(rc);
+
+ if (cnt) {
+ active_alerts = callocz(cnt, sizeof(active_alerts_t));
+ cnt = 0;
+ foreach_rrdcalc_in_rrdhost_read(host, rc) {
+ if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
+ continue;
+
+ if (rc->status == RRDCALC_STATUS_WARNING ||
+ rc->status == RRDCALC_STATUS_CRITICAL) {
+
+ active_alerts[cnt].name = (char *)rrdcalc_name(rc);
+ len += string_strlen(rc->name);
+ active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc);
+ len += string_strlen(rc->chart);
+ active_alerts[cnt].status = rc->status;
+ len++;
+ cnt++;
+ }
+ }
+ foreach_rrdcalc_in_rrdhost_done(rc);
+ }
+
+ BUFFER *alarms_to_hash;
+ if (cnt) {
+ qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
+
+ alarms_to_hash = buffer_create(len, NULL);
+ for (uint32_t i=0;i<cnt;i++) {
+ buffer_strcat(alarms_to_hash, active_alerts[i].name);
+ buffer_strcat(alarms_to_hash, active_alerts[i].chart);
+ if (active_alerts[i].status == RRDCALC_STATUS_WARNING)
+ buffer_strcat(alarms_to_hash, "W");
+ else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL)
+ buffer_strcat(alarms_to_hash, "C");
+ }
+ } else {
+ alarms_to_hash = buffer_create(1, NULL);
+ buffer_strcat(alarms_to_hash, "");
+ len = 0;
+ }
+
+ char hash[SHA256_DIGEST_LENGTH + 1];
+ if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) {
+ hash[SHA256_DIGEST_LENGTH] = 0;
+
+ struct alarm_checkpoint alarm_checkpoint;
+ char *claim_id = get_agent_claimid();
+ alarm_checkpoint.claim_id = claim_id;
+ alarm_checkpoint.node_id = wc->node_id;
+ alarm_checkpoint.checksum = (char *)hash;
+
+ aclk_send_provide_alarm_checkpoint(&alarm_checkpoint);
+ log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
+ } else {
+ log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
+ }
+ wc->alert_checkpoint_req = 0;
+ buffer_free(alarms_to_hash);
+#endif
+}
diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h
index 88a939e8..d7252aad 100644
--- a/database/sqlite/sqlite_aclk_alert.h
+++ b/database/sqlite/sqlite_aclk_alert.h
@@ -5,27 +5,31 @@
extern sqlite3 *db_meta;
+#define SEND_REMOVED_AFTER_HEALTH_LOOPS 3
+#define SEND_CHECKPOINT_AFTER_HEALTH_LOOPS 4
+
struct proto_alert_status {
int alert_updates;
- uint64_t alerts_batch_id;
- uint64_t last_acked_sequence_id;
uint64_t pending_min_sequence_id;
uint64_t pending_max_sequence_id;
uint64_t last_submitted_sequence_id;
};
-int aclk_add_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
-void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
-void aclk_send_alarm_health_log(char *node_id);
-void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
+int aclk_add_alert_event(struct aclk_sync_host_config *wc, struct aclk_database_cmd cmd);
+void aclk_push_alert_event(struct aclk_sync_host_config *wc);
void aclk_send_alarm_configuration (char *config_hash);
-int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
-void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id);
+int aclk_push_alert_config_event(char *node_id, char *config_hash);
+void aclk_start_alert_streaming(char *node_id, bool resets);
void sql_queue_removed_alerts_to_aclk(RRDHOST *host);
-void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
-void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
-void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id);
+void sql_process_queue_removed_alerts_to_aclk(char *node_id);
+void aclk_send_alarm_checkpoint(char *node_id, char *claim_id);
+void aclk_push_alarm_checkpoint(RRDHOST *host);
+
+void aclk_push_alert_snapshot_event(char *node_id);
+void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, char *snapshot_uuid);
int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status);
int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter);
+void aclk_push_alert_events_for_all_hosts(void);
+
#endif //NETDATA_SQLITE_ACLK_ALERT_H
diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c
index afe77499..3817296d 100644
--- a/database/sqlite/sqlite_aclk_node.c
+++ b/database/sqlite/sqlite_aclk_node.c
@@ -25,12 +25,17 @@ DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) {
return dict;
}
-#endif
-void sql_build_node_collectors(struct aclk_database_worker_config *wc)
+static void build_node_collectors(char *node_id __maybe_unused)
{
-#ifdef ENABLE_ACLK
- if (!wc->host)
+
+ RRDHOST *host = find_host_by_node_id(node_id);
+
+ if (unlikely(!host))
+ return;
+
+ struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
+ if (unlikely(!wc))
return;
struct update_node_collectors upd_node_collectors;
@@ -39,48 +44,51 @@ void sql_build_node_collectors(struct aclk_database_worker_config *wc)
upd_node_collectors.node_id = wc->node_id;
upd_node_collectors.claim_id = get_agent_claimid();
- upd_node_collectors.node_collectors = collectors_from_charts(wc->host, dict);
+ upd_node_collectors.node_collectors = collectors_from_charts(host, dict);
aclk_update_node_collectors(&upd_node_collectors);
dictionary_destroy(dict);
freez(upd_node_collectors.claim_id);
- log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(wc->host));
-#else
- UNUSED(wc);
-#endif
- return;
+ log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", node_id, rrdhost_hostname(host));
+
+ freez(node_id);
}
-void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+static void build_node_info(char *node_id __maybe_unused)
{
- UNUSED(cmd);
-
-#ifdef ENABLE_ACLK
struct update_node_info node_info;
- if (!wc->host) {
- wc->node_info_send = 1;
+ RRDHOST *host = find_host_by_node_id(node_id);
+
+ if (unlikely((!host))) {
+ freez(node_id);
+ return;
+ }
+
+ struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
+
+ if (unlikely(!wc)) {
+ freez(node_id);
return;
}
rrd_rdlock();
node_info.node_id = wc->node_id;
node_info.claim_id = get_agent_claimid();
- node_info.machine_guid = wc->host_guid;
+ node_info.machine_guid = host->machine_guid;
node_info.child = (wc->host != localhost);
- node_info.ml_info.ml_capable = ml_capable(localhost);
+ node_info.ml_info.ml_capable = ml_capable();
node_info.ml_info.ml_enabled = ml_enabled(wc->host);
node_info.node_instance_capabilities = aclk_get_node_instance_capas(wc->host);
now_realtime_timeval(&node_info.updated_at);
- RRDHOST *host = wc->host;
char *host_version = NULL;
if (host != localhost) {
netdata_mutex_lock(&host->receiver_lock);
- host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : "unknown");
+ host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : rrdhost_program_version(host));
netdata_mutex_unlock(&host->receiver_lock);
}
@@ -91,7 +99,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.data.kernel_name = host->system_info->kernel_name;
node_info.data.kernel_version = host->system_info->kernel_version;
node_info.data.architecture = host->system_info->architecture;
- node_info.data.cpus = host->system_info->host_cores ? str2uint32_t(host->system_info->host_cores) : 0;
+ node_info.data.cpus = host->system_info->host_cores ? str2uint32_t(host->system_info->host_cores, NULL) : 0;
node_info.data.cpu_frequency = host->system_info->host_cpu_freq ? host->system_info->host_cpu_freq : "0";
node_info.data.memory = host->system_info->host_ram_total ? host->system_info->host_ram_total : "0";
node_info.data.disk_space = host->system_info->host_disk_space ? host->system_info->host_disk_space : "0";
@@ -101,7 +109,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.data.virtualization_type = host->system_info->virtualization ? host->system_info->virtualization : "unknown";
node_info.data.container_type = host->system_info->container ? host->system_info->container : "unknown";
node_info.data.custom_info = config_get(CONFIG_SECTION_WEB, "custom dashboard_info.js", "");
- node_info.data.machine_guid = wc->host_guid;
+ node_info.data.machine_guid = host->machine_guid;
struct capability node_caps[] = {
{ .name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled },
@@ -116,7 +124,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.data.host_labels_ptr = host->rrdlabels;
aclk_update_node_info(&node_info);
- log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), wc->host_guid, wc->host == localhost ? "parent" : "child");
+ log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), host->machine_guid, wc->host == localhost ? "parent" : "child");
rrd_unlock();
freez(node_info.claim_id);
@@ -124,9 +132,47 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
freez(host_version);
wc->node_collectors_send = now_realtime_sec();
-#else
- UNUSED(wc);
-#endif
+ freez(node_id);
+
+}
+
- return;
+void aclk_check_node_info_and_collectors(void)
+{
+ RRDHOST *host;
+
+ if (unlikely(!aclk_connected))
+ return;
+
+ size_t pending = 0;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+
+ struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ if (unlikely(!wc))
+ continue;
+
+ if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
+ internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host));
+ pending++;
+ continue;
+ }
+
+ if (wc->node_info_send_time && wc->node_info_send_time + 30 < now_realtime_sec()) {
+ wc->node_info_send_time = 0;
+ build_node_info(strdupz(wc->node_id));
+ internal_error(true, "ACLK SYNC: Sending node info for %s", rrdhost_hostname(host));
+ }
+
+ if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) {
+ build_node_collectors(strdupz(wc->node_id));
+ internal_error(true, "ACLK SYNC: Sending collectors for %s", rrdhost_hostname(host));
+ wc->node_collectors_send = 0;
+ }
+ }
+ dfe_done(host);
+
+ if(pending)
+ info("ACLK: %zu nodes are pending for contexts to load, skipped sending node info for them", pending);
}
+
+#endif
diff --git a/database/sqlite/sqlite_aclk_node.h b/database/sqlite/sqlite_aclk_node.h
index c2c54f8c..6afdf8d7 100644
--- a/database/sqlite/sqlite_aclk_node.h
+++ b/database/sqlite/sqlite_aclk_node.h
@@ -3,6 +3,5 @@
#ifndef NETDATA_SQLITE_ACLK_NODE_H
#define NETDATA_SQLITE_ACLK_NODE_H
-void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
-void sql_build_node_collectors(struct aclk_database_worker_config *wc);
+void aclk_check_node_info_and_collectors(void);
#endif //NETDATA_SQLITE_ACLK_NODE_H
diff --git a/database/sqlite/sqlite_context.c b/database/sqlite/sqlite_context.c
index 892292cc..b72726dc 100644
--- a/database/sqlite/sqlite_context.c
+++ b/database/sqlite/sqlite_context.c
@@ -117,7 +117,6 @@ void sql_close_context_database(void)
rc = sqlite3_close_v2(db_context_meta);
if (unlikely(rc != SQLITE_OK))
error_report("Error %d while closing the context SQLite database, %s", rc, sqlite3_errstr(rc));
- return;
}
//
@@ -243,8 +242,6 @@ failed:
rc = sqlite3_reset(res);
if (rc != SQLITE_OK)
error_report("Failed to reset statement that fetches chart label data, rc = %d", rc);
-
- return;
}
// CONTEXT LIST
@@ -372,9 +369,9 @@ int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
goto skip_store;
}
- rc = sqlite3_bind_int(res, 10, (time_t) context_data->deleted);
+ rc = sqlite3_bind_int(res, 10, context_data->deleted);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind last_time_t to store context details");
+ error_report("Failed to bind deleted flag to store context details");
goto skip_store;
}
@@ -431,11 +428,11 @@ int ctx_delete_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
if (rc_stored != SQLITE_DONE)
error_report("Failed to delete context %s, rc = %d", context_data->id, rc_stored);
#ifdef NETDATA_INTERNAL_CHECKS
- else {
- char host_uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(*host_uuid, host_uuid_str);
- info("%s: Deleted context %s under host %s", __FUNCTION__ , context_data->id, host_uuid_str);
- }
+ else {
+ char host_uuid_str[UUID_STR_LEN];
+ uuid_unparse_lower(*host_uuid, host_uuid_str);
+ info("%s: Deleted context %s under host %s", __FUNCTION__, context_data->id, host_uuid_str);
+ }
#endif
skip_delete:
@@ -449,6 +446,10 @@ skip_delete:
int sql_context_cache_stats(int op)
{
int count, dummy;
+
+ if (unlikely(!db_context_meta))
+ return 0;
+
netdata_thread_disable_cancelability();
sqlite3_db_status(db_context_meta, op, &count, &dummy, 0);
netdata_thread_enable_cancelability();
@@ -489,6 +490,8 @@ int ctx_unittest(void)
uuid_t host_uuid;
uuid_generate(host_uuid);
+ initialize_thread_key_pool();
+
int rc = sql_init_context_database(1);
if (rc != SQLITE_OK)
@@ -556,6 +559,7 @@ int ctx_unittest(void)
freez((void *)context_data.title);
freez((void *)context_data.chart_type);
freez((void *)context_data.family);
+ freez((void *)context_data.units);
// The list should be empty
info("List context start after delete");
diff --git a/database/sqlite/sqlite_db_migration.c b/database/sqlite/sqlite_db_migration.c
index 8b1d0159..3132ae2d 100644
--- a/database/sqlite/sqlite_db_migration.c
+++ b/database/sqlite/sqlite_db_migration.c
@@ -7,7 +7,7 @@ static int return_int_cb(void *data, int argc, char **argv, char **column)
int *status = data;
UNUSED(argc);
UNUSED(column);
- *status = str2uint32_t(argv[0]);
+ *status = str2uint32_t(argv[0], NULL);
return 0;
}
@@ -49,7 +49,7 @@ static int column_exists_in_table(const char *table, const char *column)
}
const char *database_migrate_v1_v2[] = {
- "ALTER TABLE host ADD hops INTEGER;",
+ "ALTER TABLE host ADD hops INTEGER NOT NULL DEFAULT 0;",
NULL
};
diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c
index 1d03cfc2..2fca2dfc 100644
--- a/database/sqlite/sqlite_functions.c
+++ b/database/sqlite/sqlite_functions.c
@@ -49,7 +49,6 @@ const char *database_config[] = {
const char *database_cleanup[] = {
"DELETE FROM chart WHERE chart_id NOT IN (SELECT chart_id FROM dimension);",
"DELETE FROM host WHERE host_id NOT IN (SELECT host_id FROM chart);",
- "DELETE FROM chart_label WHERE chart_id NOT IN (SELECT chart_id FROM chart);",
"DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);",
"DELETE FROM host_info WHERE host_id NOT IN (SELECT host_id FROM host);",
"DELETE FROM host_label WHERE host_id NOT IN (SELECT host_id FROM host);",
@@ -117,7 +116,6 @@ int execute_insert(sqlite3_stmt *res)
break;
}
}
-
return rc;
}
@@ -156,6 +154,12 @@ static void release_statement(void *statement)
error_report("Failed to finalize statement, rc = %d", rc);
}
+void initialize_thread_key_pool(void)
+{
+ for (int i = 0; i < MAX_PREPARED_STATEMENTS; i++)
+ (void)pthread_key_create(&key_pool[i], release_statement);
+}
+
int prepare_statement(sqlite3 *database, const char *query, sqlite3_stmt **statement)
{
static __thread uint32_t keys_used = 0;
@@ -448,8 +452,7 @@ int sql_init_database(db_check_action_type_t rebuild, int memory)
info("SQLite database initialization completed");
- for (int i = 0; i < MAX_PREPARED_STATEMENTS; i++)
- (void)pthread_key_create(&key_pool[i], release_statement);
+ initialize_thread_key_pool();
rc = sqlite3_create_function(db_meta, "u2h", 1, SQLITE_ANY | SQLITE_DETERMINISTIC, 0, sqlite_uuid_parse, 0, 0);
if (unlikely(rc != SQLITE_OK))
@@ -505,14 +508,15 @@ skip:
error_report("Failed to finalize statement %s, rc = %d", sql, rc);
return result;
}
-
-void db_execute(const char *cmd)
+// Return 0 OK
+// Return 1 Failed
+int db_execute(sqlite3 *db, const char *cmd)
{
int rc;
int cnt = 0;
while (cnt < SQL_MAX_RETRY) {
char *err_msg;
- rc = sqlite3_exec_monitored(db_meta, cmd, 0, 0, &err_msg);
+ rc = sqlite3_exec_monitored(db, cmd, 0, 0, &err_msg);
if (rc != SQLITE_OK) {
error_report("Failed to execute '%s', rc = %d (%s) -- attempt %d", cmd, rc, err_msg, cnt);
sqlite3_free(err_msg);
@@ -527,6 +531,7 @@ void db_execute(const char *cmd)
++cnt;
}
+ return (rc != SQLITE_OK);
}
static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id)
@@ -540,7 +545,7 @@ static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id)
return;
}
- struct aclk_database_worker_config *wc = host->dbsync_worker;
+ struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
if (unlikely(!host->node_id))
host->node_id = mallocz(sizeof(*host->node_id));
@@ -594,7 +599,7 @@ int update_node_id(uuid_t *host_id, uuid_t *node_id)
rrd_wrlock();
host = rrdhost_find_by_guid(host_guid);
if (likely(host))
- set_host_node_id(host, node_id);
+ set_host_node_id(host, node_id);
rrd_unlock();
failed:
@@ -604,48 +609,6 @@ failed:
return rc - 1;
}
-#define SQL_SELECT_HOSTNAME_BY_NODE_ID "SELECT h.hostname FROM node_instance ni, " \
-"host h WHERE ni.host_id = h.host_id AND ni.node_id = @node_id;"
-
-char *get_hostname_by_node_id(char *node)
-{
- sqlite3_stmt *res = NULL;
- char *hostname = NULL;
- int rc;
-
- if (unlikely(!db_meta)) {
- if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
- error_report("Database has not been initialized");
- return NULL;
- }
-
- uuid_t node_id;
- if (uuid_parse(node, node_id))
- return NULL;
-
- rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOSTNAME_BY_NODE_ID, -1, &res, 0);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement to fetch hostname by node id");
- return NULL;
- }
-
- rc = sqlite3_bind_blob(res, 1, &node_id, sizeof(node_id), SQLITE_STATIC);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id parameter to select node instance information");
- goto failed;
- }
-
- rc = sqlite3_step_monitored(res);
- if (likely(rc == SQLITE_ROW))
- hostname = strdupz((char *)sqlite3_column_text(res, 0));
-
-failed:
- if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
- error_report("Failed to finalize the prepared statement when search for hostname by node id");
-
- return hostname;
-}
-
#define SQL_SELECT_HOST_BY_NODE_ID "select host_id from node_instance where node_id = @node_id;"
int get_host_id(uuid_t *node_id, uuid_t *host_id)
@@ -684,7 +647,7 @@ failed:
return (rc == SQLITE_ROW) ? 0 : -1;
}
-#define SQL_SELECT_NODE_ID "select node_id from node_instance where host_id = @host_id and node_id not null;"
+#define SQL_SELECT_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id AND node_id IS NOT NULL;"
int get_node_id(uuid_t *host_id, uuid_t *node_id)
{
@@ -720,8 +683,8 @@ failed:
return (rc == SQLITE_ROW) ? 0 : -1;
}
-#define SQL_INVALIDATE_NODE_INSTANCES "update node_instance set node_id = NULL where exists " \
- "(select host_id from node_instance where host_id = @host_id and (@claim_id is null or claim_id <> @claim_id));"
+#define SQL_INVALIDATE_NODE_INSTANCES "UPDATE node_instance SET node_id = NULL WHERE EXISTS " \
+ "(SELECT host_id FROM node_instance WHERE host_id = @host_id AND (@claim_id IS NULL OR claim_id <> @claim_id));"
void invalidate_node_instances(uuid_t *host_id, uuid_t *claim_id)
{
@@ -765,8 +728,8 @@ failed:
error_report("Failed to finalize the prepared statement when invalidating node instance information");
}
-#define SQL_GET_NODE_INSTANCE_LIST "select ni.node_id, ni.host_id, h.hostname " \
- "from node_instance ni, host h where ni.host_id = h.host_id;"
+#define SQL_GET_NODE_INSTANCE_LIST "SELECT ni.node_id, ni.host_id, h.hostname " \
+ "FROM node_instance ni, host h WHERE ni.host_id = h.host_id AND h.hops >=0;"
struct node_instance_list *get_node_list(void)
{
@@ -805,13 +768,18 @@ struct node_instance_list *get_node_list(void)
uuid_copy(node_list[row].node_id, *((uuid_t *)sqlite3_column_blob(res, 0)));
if (sqlite3_column_bytes(res, 1) == sizeof(uuid_t)) {
uuid_t *host_id = (uuid_t *)sqlite3_column_blob(res, 1);
- uuid_copy(node_list[row].host_id, *host_id);
- node_list[row].queryable = 1;
uuid_unparse_lower(*host_id, host_guid);
RRDHOST *host = rrdhost_find_by_guid(host_guid);
- node_list[row].live = host && (host == localhost || host->receiver) ? 1 : 0;
+ if (rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD)) {
+ info("ACLK: 'host:%s' skipping get node list because context is initializing", rrdhost_hostname(host));
+ continue;
+ }
+ uuid_copy(node_list[row].host_id, *host_id);
+ node_list[row].queryable = 1;
+ node_list[row].live = (host && (host == localhost || host->receiver
+ || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)))) ? 1 : 0;
node_list[row].hops = (host && host->system_info) ? host->system_info->hops :
- uuid_compare(*host_id, localhost->host_uuid) ? 1 : 0;
+ uuid_memcmp(host_id, &localhost->host_uuid) ? 1 : 0;
node_list[row].hostname =
sqlite3_column_bytes(res, 2) ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL;
}
@@ -950,6 +918,10 @@ int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_b
int sql_metadata_cache_stats(int op)
{
int count, dummy;
+
+ if (unlikely(!db_meta))
+ return 0;
+
netdata_thread_disable_cancelability();
sqlite3_db_status(db_meta, op, &count, &dummy, 0);
netdata_thread_enable_cancelability();
diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h
index 40abd010..ee63a397 100644
--- a/database/sqlite/sqlite_functions.h
+++ b/database/sqlite/sqlite_functions.h
@@ -55,7 +55,8 @@ int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_b
int prepare_statement(sqlite3 *database, const char *query, sqlite3_stmt **statement);
int execute_insert(sqlite3_stmt *res);
int exec_statement_with_uuid(const char *sql, uuid_t *uuid);
-void db_execute(const char *cmd);
+int db_execute(sqlite3 *database, const char *cmd);
+void initialize_thread_key_pool(void);
// Look up functions
int get_node_id(uuid_t *host_id, uuid_t *node_id);
diff --git a/database/sqlite/sqlite_health.c b/database/sqlite/sqlite_health.c
index 471fa3ad..dd08f63e 100644
--- a/database/sqlite/sqlite_health.c
+++ b/database/sqlite/sqlite_health.c
@@ -12,7 +12,7 @@
#define SQL_CREATE_HEALTH_LOG_TABLE(guid) "CREATE TABLE IF NOT EXISTS health_log_%s(hostname text, unique_id int, alarm_id int, alarm_event_id int, config_hash_id blob, updated_by_id int, updates_id int, when_key int, duration int, non_clear_duration int, flags int, exec_run_timestamp int, delay_up_to_timestamp int, name text, chart text, family text, exec text, recipient text, source text, units text, info text, exec_code int, new_status real, old_status real, delay int, new_value double, old_value double, last_repeat int, class text, component text, type text, chart_context text);", guid
int sql_create_health_log_table(RRDHOST *host) {
int rc;
- char *err_msg = NULL, command[MAX_HEALTH_SQL_SIZE + 1];
+ char command[MAX_HEALTH_SQL_SIZE + 1];
if (unlikely(!db_meta)) {
if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
@@ -25,18 +25,17 @@ int sql_create_health_log_table(RRDHOST *host) {
snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_CREATE_HEALTH_LOG_TABLE(uuid_str));
- rc = sqlite3_exec_monitored(db_meta, command, 0, 0, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("HEALTH [%s]: SQLite error during creation of health log table, rc = %d (%s)", rrdhost_hostname(host), rc, err_msg);
- sqlite3_free(err_msg);
- return 1;
+ rc = db_execute(db_meta, command);
+ if (unlikely(rc))
+ error_report("HEALTH [%s]: SQLite error during creation of health log table", rrdhost_hostname(host));
+ else {
+ snprintfz(command, MAX_HEALTH_SQL_SIZE, "CREATE INDEX IF NOT EXISTS health_log_index_%s ON health_log_%s (unique_id); ", uuid_str, uuid_str);
+ rc = db_execute(db_meta, command);
+ if (unlikely(unlikely(rc)))
+ error_report("HEALTH [%s]: SQLite error during creation of health log table index", rrdhost_hostname(host));
}
- snprintfz(command, MAX_HEALTH_SQL_SIZE, "CREATE INDEX IF NOT EXISTS "
- "health_log_index_%s ON health_log_%s (unique_id); ", uuid_str, uuid_str);
- db_execute(command);
-
- return 0;
+ return rc;
}
/* Health related SQL queries
@@ -104,7 +103,7 @@ void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae) {
error_report("HEALTH [%s]: Failed to update health log, rc = %d", rrdhost_hostname(host), rc);
}
- failed:
+failed:
if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
error_report("HEALTH [%s]: Failed to finalize the prepared statement for updating health log.", rrdhost_hostname(host));
}
@@ -345,7 +344,7 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) {
ae->flags |= HEALTH_ENTRY_FLAG_SAVED;
host->health.health_log_entries_written++;
- failed:
+failed:
if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
error_report("HEALTH [%s]: Failed to finalize the prepared statement for inserting to health log.", rrdhost_hostname(host));
}
@@ -452,7 +451,7 @@ void sql_health_alarm_log_count(RRDHOST *host) {
#define SQL_INJECT_REMOVED_UPDATE(guid) "update health_log_%s set flags = flags | ?1, updated_by_id = ?2 where unique_id = ?3; ", guid
void sql_inject_removed_status(char *uuid_str, uint32_t alarm_id, uint32_t alarm_event_id, uint32_t unique_id, uint32_t max_unique_id)
{
- int rc = 0;
+ int rc;
char command[MAX_HEALTH_SQL_SIZE + 1];
if (!alarm_id || !alarm_event_id || !unique_id || !max_unique_id)
@@ -546,7 +545,7 @@ failed:
#define SQL_SELECT_MAX_UNIQUE_ID(guid) "SELECT MAX(unique_id) from health_log_%s", guid
uint32_t sql_get_max_unique_id (char *uuid_str)
{
- int rc = 0;
+ int rc;
char command[MAX_HEALTH_SQL_SIZE + 1];
uint32_t max_unique_id = 0;
@@ -573,10 +572,9 @@ uint32_t sql_get_max_unique_id (char *uuid_str)
#define SQL_SELECT_LAST_STATUSES(guid) "SELECT new_status, unique_id, alarm_id, alarm_event_id from health_log_%s group by alarm_id having max(alarm_event_id)", guid
void sql_check_removed_alerts_state(char *uuid_str)
{
- int rc = 0;
+ int rc;
char command[MAX_HEALTH_SQL_SIZE + 1];
- RRDCALC_STATUS status;
- uint32_t alarm_id = 0, alarm_event_id = 0, unique_id = 0, max_unique_id = 0;
+ uint32_t max_unique_id = 0;
sqlite3_stmt *res = NULL;
@@ -588,6 +586,9 @@ void sql_check_removed_alerts_state(char *uuid_str)
}
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+ uint32_t alarm_id, alarm_event_id, unique_id;
+ RRDCALC_STATUS status;
+
status = (RRDCALC_STATUS) sqlite3_column_int(res, 0);
unique_id = (uint32_t) sqlite3_column_int64(res, 1);
alarm_id = (uint32_t) sqlite3_column_int64(res, 2);
@@ -683,8 +684,7 @@ void sql_health_alarm_log_load(RRDHOST *host) {
}
// Check if we got last_repeat field
- time_t last_repeat = 0;
- last_repeat = (time_t)sqlite3_column_int64(res, 27);
+ time_t last_repeat = (time_t)sqlite3_column_int64(res, 27);
rc = dictionary_get(all_rrdcalcs, (char *) sqlite3_column_text(res, 14));
if(unlikely(rc)) {
@@ -847,196 +847,161 @@ int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg)
}
}
- param++;
- rc = sqlite3_bind_blob(res, param, hash_id, sizeof(*hash_id), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, hash_id, sizeof(*hash_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->alarm, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->alarm, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->template_key, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->template_key, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->on, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->on, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->classification, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->classification, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->component, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->component, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->type, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->type, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->os, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->os, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->host, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->host, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->lookup, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->lookup, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->every, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->every, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->units, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->units, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->calc, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->calc, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->families, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->families, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->plugin, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->plugin, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->module, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->module, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->charts, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->charts, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->green, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->green, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->red, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->red, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->warn, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->warn, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->crit, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->crit, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->exec, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->exec, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->to, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->to, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->info, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->info, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->delay, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->delay, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->options, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->options, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->repeat, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->repeat, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->host_labels, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->host_labels, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
if (cfg->p_db_lookup_after) {
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_dimensions, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_dimensions, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_method, param);
+ rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_method, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 31, cfg->p_db_lookup_options);
+ rc = sqlite3_bind_int(res, ++param, (int) cfg->p_db_lookup_options);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 32, cfg->p_db_lookup_after);
+ rc = sqlite3_bind_int(res, ++param, (int) cfg->p_db_lookup_after);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 33, cfg->p_db_lookup_before);
+ rc = sqlite3_bind_int(res, ++param, (int) cfg->p_db_lookup_before);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
} else {
- param++;
- rc = sqlite3_bind_null(res, 29);
+ rc = sqlite3_bind_null(res, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_null(res, 30);
+
+ rc = sqlite3_bind_null(res, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_null(res, 31);
+
+ rc = sqlite3_bind_null(res, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_null(res, 32);
+
+ rc = sqlite3_bind_null(res, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_null(res, 33);
+
+ rc = sqlite3_bind_null(res, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
}
- param++;
- rc = sqlite3_bind_int(res, 34, cfg->p_update_every);
+ rc = sqlite3_bind_int(res, ++param, cfg->p_update_every);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
@@ -1050,7 +1015,7 @@ int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg)
return 0;
- bind_fail:
+bind_fail:
error_report("Failed to bind parameter %d to store alert hash_id, rc = %d", param, rc);
rc = sqlite3_reset(res);
if (unlikely(rc != SQLITE_OK))
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 35f928ff..607d789a 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -64,9 +64,11 @@ enum metadata_opcode {
METADATA_STORE_CLAIM_ID,
METADATA_ADD_HOST_INFO,
METADATA_SCAN_HOSTS,
+ METADATA_LOAD_HOST_CONTEXT,
METADATA_MAINTENANCE,
METADATA_SYNC_SHUTDOWN,
METADATA_UNITTEST,
+ METADATA_ML_LOAD_MODELS,
// leave this last
// we need it to check for worker utilization
METADATA_MAX_ENUMERATIONS_DEFINED
@@ -154,19 +156,43 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value
return 1;
}
-static void check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer)
+#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;"
+#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;"
+
+static void clean_old_chart_labels(RRDSET *st)
+{
+ char sql[512];
+ time_t first_time_s = rrdset_first_entry_s(st);
+
+ if (unlikely(!first_time_s))
+ snprintfz(sql, 511,SQL_DELETE_CHART_LABEL);
+ else
+ snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s);
+
+ int rc = exec_statement_with_uuid(sql, &st->chart_uuid);
+ if (unlikely(rc))
+ error_report("METADATA: 'host:%s' Failed to clean old labels for chart %s", rrdhost_hostname(st->rrdhost), rrdset_name(st));
+}
+
+static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter)
{
size_t old_version = st->rrdlabels_last_saved_version;
size_t new_version = dictionary_version(st->rrdlabels);
- if(new_version != old_version) {
- buffer_flush(work_buffer);
- struct query_build tmp = {.sql = work_buffer, .count = 0};
- uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
- rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
+ if (new_version == old_version)
+ return 0;
+
+ struct query_build tmp = {.sql = work_buffer, .count = 0};
+ uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
+ rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
+ int rc = db_execute(db_meta, buffer_tostring(work_buffer));
+ if (likely(!rc)) {
st->rrdlabels_last_saved_version = new_version;
- db_execute(buffer_tostring(work_buffer));
+ (*query_counter)++;
}
+
+ clean_old_chart_labels(st);
+ return rc;
}
// Migrate all hosts with hops zero to this host_uuid
@@ -177,12 +203,13 @@ void migrate_localhost(uuid_t *host_uuid)
rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid);
if (!rc)
rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid);
- if (!rc)
- db_execute(DELETE_MISSING_NODE_INSTANCES);
-
+ if (!rc) {
+ if (unlikely(db_execute(db_meta, DELETE_MISSING_NODE_INSTANCES)))
+ error_report("Failed to remove deleted hosts from node instances");
+ }
}
-static void store_claim_id(uuid_t *host_id, uuid_t *claim_id)
+static int store_claim_id(uuid_t *host_id, uuid_t *claim_id)
{
sqlite3_stmt *res = NULL;
int rc;
@@ -190,18 +217,18 @@ static void store_claim_id(uuid_t *host_id, uuid_t *claim_id)
if (unlikely(!db_meta)) {
if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
error_report("Database has not been initialized");
- return;
+ return 1;
}
rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement store chart labels");
- return;
+ error_report("Failed to prepare statement to store host claim id");
+ return 1;
}
rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id parameter to store node instance information");
+ error_report("Failed to bind host_id parameter to store claim id");
goto failed;
}
@@ -210,17 +237,19 @@ static void store_claim_id(uuid_t *host_id, uuid_t *claim_id)
else
rc = sqlite3_bind_null(res, 2);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind claim_id parameter to store node instance information");
+ error_report("Failed to bind claim_id parameter to host claim id");
goto failed;
}
rc = execute_insert(res);
if (unlikely(rc != SQLITE_DONE))
- error_report("Failed to store node instance information, rc = %d", rc);
+ error_report("Failed to store host claim id rc = %d", rc);
failed:
if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
- error_report("Failed to finalize the prepared statement when storing node instance information");
+ error_report("Failed to finalize the prepared statement when storing a host claim id");
+
+ return rc != SQLITE_DONE;
}
static void delete_dimension_uuid(uuid_t *dimension_uuid)
@@ -252,7 +281,7 @@ skip_execution:
//
// Store host and host system info information in the database
-static int sql_store_host_info(RRDHOST *host)
+static int store_host_metadata(RRDHOST *host)
{
static __thread sqlite3_stmt *res = NULL;
int rc, param = 0;
@@ -340,7 +369,7 @@ static int sql_store_host_info(RRDHOST *host)
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
- return !(store_rc == SQLITE_DONE);
+ return store_rc != SQLITE_DONE;
bind_fail:
error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc);
rc = sqlite3_reset(res);
@@ -349,7 +378,7 @@ bind_fail:
return 1;
}
-static void sql_store_host_system_info_key_value(const char *name, const char *value, void *data)
+static void add_host_sysinfo_key_value(const char *name, const char *value, void *data)
{
struct query_build *lb = data;
@@ -365,44 +394,43 @@ static void sql_store_host_system_info_key_value(const char *name, const char *v
lb->count++;
}
-static BUFFER *sql_store_host_system_info(RRDHOST *host)
+static bool build_host_system_info_statements(RRDHOST *host, BUFFER *work_buffer)
{
struct rrdhost_system_info *system_info = host->system_info;
if (unlikely(!system_info))
- return NULL;
-
- BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ return false;
+ buffer_flush(work_buffer);
struct query_build key_data = {.sql = work_buffer, .count = 0};
uuid_unparse_lower(host->host_uuid, key_data.uuid_str);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data);
-
- return work_buffer;
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data);
+
+ return true;
}
@@ -410,13 +438,10 @@ static BUFFER *sql_store_host_system_info(RRDHOST *host)
* Store a chart in the database
*/
-static int sql_store_chart(
- uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family,
- const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority,
- int update_every, int chart_type, int memory_mode, long history_entries)
+static int store_chart_metadata(RRDSET *st)
{
static __thread sqlite3_stmt *res = NULL;
- int rc, param = 0;
+ int rc, param = 0, store_rc = 0;
if (unlikely(!db_meta)) {
if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
@@ -433,98 +458,83 @@ static int sql_store_chart(
}
}
- param++;
- rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, &st->chart_uuid, sizeof(st->chart_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_blob(res, 2, host_uuid, sizeof(*host_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, &st->rrdhost->host_uuid, sizeof(st->rrdhost->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 3, type, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, string2str(st->parts.type), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 4, id, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, string2str(st->parts.id), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
+ const char *name = string2str(st->parts.name);
if (name && *name)
- rc = sqlite3_bind_text(res, 5, name, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC);
else
- rc = sqlite3_bind_null(res, 5);
+ rc = sqlite3_bind_null(res, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 6, family, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_family(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 7, context, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_context(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 8, title, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_title(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 9, units, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_units(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 10, plugin, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_plugin_name(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 11, module, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_module_name(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 12, (int) priority);
+ rc = sqlite3_bind_int(res, ++param, (int) st->priority);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 13, update_every);
+ rc = sqlite3_bind_int(res, ++param, st->update_every);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 14, chart_type);
+ rc = sqlite3_bind_int(res, ++param, st->chart_type);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 15, memory_mode);
+ rc = sqlite3_bind_int(res, ++param, st->rrd_memory_mode);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 16, (int) history_entries);
+ rc = sqlite3_bind_int(res, ++param, (int) st->entries);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = execute_insert(res);
- if (unlikely(rc != SQLITE_DONE))
- error_report("Failed to store chart, rc = %d", rc);
+ store_rc = execute_insert(res);
+ if (unlikely(store_rc != SQLITE_DONE))
+ error_report("Failed to store chart, rc = %d", store_rc);
rc = sqlite3_reset(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement in chart store function, rc = %d", rc);
- return 0;
+ return store_rc != SQLITE_DONE;
bind_fail:
error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc);
@@ -537,9 +547,7 @@ bind_fail:
/*
* Store a dimension
*/
-static int sql_store_dimension(
- uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier,
- collected_number divisor, int algorithm, bool hidden)
+static int store_dimension_metadata(RRDDIM *rd)
{
static __thread sqlite3_stmt *res = NULL;
int rc, param = 0;
@@ -559,35 +567,35 @@ static int sql_store_dimension(
}
}
- rc = sqlite3_bind_blob(res, ++param, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, &rd->metric_uuid, sizeof(rd->metric_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_blob(res, ++param, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, &rd->rrdset->chart_uuid, sizeof(rd->rrdset->chart_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, string2str(rd->id), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, string2str(rd->name), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, (int) multiplier);
+ rc = sqlite3_bind_int(res, ++param, (int) rd->multiplier);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, (int ) divisor);
+ rc = sqlite3_bind_int(res, ++param, (int ) rd->divisor);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, algorithm);
+ rc = sqlite3_bind_int(res, ++param, rd->algorithm);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- if (hidden)
+ if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN))
rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC);
else
rc = sqlite3_bind_null(res, ++param);
@@ -700,7 +708,6 @@ static void cleanup_health_log(void)
//
// EVENT LOOP STARTS HERE
//
-static uv_mutex_t metadata_async_lock;
static void metadata_init_cmd_queue(struct metadata_wc *wc)
{
@@ -856,6 +863,7 @@ static void start_metadata_cleanup(uv_work_t *req)
struct metadata_wc *wc = req->data;
check_dimension_metadata(wc);
cleanup_health_log();
+ (void) sqlite3_wal_checkpoint(db_meta, NULL);
worker_is_idle();
}
@@ -863,9 +871,125 @@ struct scan_metadata_payload {
uv_work_t request;
struct metadata_wc *wc;
struct completion *completion;
+ BUFFER *work_buffer;
uint32_t max_count;
};
+struct host_context_load_thread {
+ uv_thread_t thread;
+ RRDHOST *host;
+ bool busy;
+ bool finished;
+};
+
+static void restore_host_context(void *arg)
+{
+ struct host_context_load_thread *hclt = arg;
+ RRDHOST *host = hclt->host;
+
+ usec_t started_ut = now_monotonic_usec(); (void)started_ut;
+ rrdhost_load_rrdcontext_data(host);
+ usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
+
+#ifdef ENABLE_ACLK
+ aclk_queue_node_info(host, false);
+#endif
+
+ internal_error(true, "METADATA: 'host:%s' context load in %0.2f ms", rrdhost_hostname(host),
+ (double)(ended_ut - started_ut) / USEC_PER_MS);
+
+ __atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE);
+}
+
+// Callback after scan of hosts is done
+static void after_start_host_load_context(uv_work_t *req, int status __maybe_unused)
+{
+ struct scan_metadata_payload *data = req->data;
+ freez(data);
+}
+
+#define MAX_FIND_THREAD_RETRIES (10)
+
+static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait)
+{
+ for (size_t index = 0; index < max_thread_slots; index++) {
+ if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED)
+ || (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) {
+ int rc = uv_thread_join(&(hclt[index].thread));
+ if (rc)
+ error("Failed to join thread, rc = %d",rc);
+ __atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE);
+ __atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE);
+ }
+ }
+}
+
+static size_t find_available_thread_slot(struct host_context_load_thread *hclt, size_t max_thread_slots, size_t *found_index)
+{
+ size_t retries = MAX_FIND_THREAD_RETRIES;
+ while (retries--) {
+ size_t index = 0;
+ while (index < max_thread_slots) {
+ if (false == __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE)) {
+ *found_index = index;
+ return true;
+ }
+ index++;
+ }
+ sleep_usec(10 * USEC_PER_MS);
+ }
+ return false;
+}
+
+static void start_all_host_load_context(uv_work_t *req __maybe_unused)
+{
+ register_libuv_worker_jobs();
+
+ struct scan_metadata_payload *data = req->data;
+ UNUSED(data);
+
+ worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD);
+ usec_t started_ut = now_monotonic_usec(); (void)started_ut;
+
+ RRDHOST *host;
+
+ size_t max_threads = MIN(get_netdata_cpus() / 2, 6);
+ struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt));
+
+ size_t thread_index;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ if (rrdhost_flag_check(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS) ||
+ !rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
+ continue;
+
+ rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
+ internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host));
+
+ cleanup_finished_threads(hclt, max_threads, false);
+ bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
+
+ if (unlikely(!found_slot)) {
+ struct host_context_load_thread hclt_sync = {.host = host};
+ restore_host_context(&hclt_sync);
+ }
+ else {
+ __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
+ hclt[thread_index].host = host;
+ assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
+ }
+ }
+ dfe_done(host);
+
+ cleanup_finished_threads(hclt, max_threads, true);
+ freez(hclt);
+ usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
+ internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
+
+ worker_is_idle();
+}
+
// Callback after scan of hosts is done
static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
{
@@ -881,13 +1005,15 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
freez(data);
}
-static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_counter) {
+static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_transaction, BUFFER *work_buffer, size_t *query_counter) {
RRDSET *st;
int rc;
bool more_to_do = false;
uint32_t scan_count = 1;
- BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+
+ if (use_transaction)
+ (void)db_execute(db_meta, "BEGIN TRANSACTION;");
rrdset_foreach_reentrant(st, host) {
if (scan_count == max_count) {
@@ -900,27 +1026,16 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_
rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE);
scan_count++;
- check_and_update_chart_labels(st, work_buffer);
-
- rc = sql_store_chart(
- &st->chart_uuid,
- &st->rrdhost->host_uuid,
- string2str(st->parts.type),
- string2str(st->parts.id),
- string2str(st->parts.name),
- rrdset_family(st),
- rrdset_context(st),
- rrdset_title(st),
- rrdset_units(st),
- rrdset_plugin_name(st),
- rrdset_module_name(st),
- st->priority,
- st->update_every,
- st->chart_type,
- st->rrd_memory_mode,
- st->entries);
+ buffer_flush(work_buffer);
+ rc = check_and_update_chart_labels(st, work_buffer, query_counter);
+ if (unlikely(rc))
+ error_report("METADATA: 'host:%s': Failed to update labels for chart %s", rrdhost_hostname(host), rrdset_name(st));
+ else
+ (*query_counter)++;
+
+ rc = store_chart_metadata(st);
if (unlikely(rc))
- internal_error(true, "METADATA: Failed to store chart metadata %s", string2str(st->id));
+ error_report("METADATA: 'host:%s': Failed to store metadata for chart %s", rrdhost_hostname(host), rrdset_name(st));
}
RRDDIM *rd;
@@ -935,116 +1050,145 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_
else
rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
- rc = sql_store_dimension(
- &rd->metric_uuid,
- &rd->rrdset->chart_uuid,
- string2str(rd->id),
- string2str(rd->name),
- rd->multiplier,
- rd->divisor,
- rd->algorithm,
- rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN));
-
+ rc = store_dimension_metadata(rd);
if (unlikely(rc))
- error_report("METADATA: Failed to store dimension %s", string2str(rd->id));
+ error_report("METADATA: 'host:%s': Failed to dimension metadata for chart %s. dimension %s",
+ rrdhost_hostname(host), rrdset_name(st),
+ rrddim_name(rd));
}
}
rrddim_foreach_done(rd);
}
rrdset_foreach_done(st);
- buffer_free(work_buffer);
+ if (use_transaction)
+ (void)db_execute(db_meta, "COMMIT TRANSACTION;");
+
return more_to_do;
}
+static void store_host_and_system_info(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter)
+{
+ bool free_work_buffer = (NULL == work_buffer);
+
+ if (unlikely(free_work_buffer))
+ work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+
+ if (build_host_system_info_statements(host, work_buffer)) {
+ int rc = db_execute(db_meta, buffer_tostring(work_buffer));
+ if (unlikely(rc)) {
+ error_report("METADATA: 'host:%s': Failed to store host updated information in the database", rrdhost_hostname(host));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
+ }
+ else {
+ if (likely(query_counter))
+ (*query_counter)++;
+ }
+ }
+
+ if (unlikely(store_host_metadata(host))) {
+ error_report("METADATA: 'host:%s': Failed to store host info in the database", rrdhost_hostname(host));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
+ }
+ else {
+ if (likely(query_counter))
+ (*query_counter)++;
+ }
+
+ if (unlikely(free_work_buffer))
+ buffer_free(work_buffer);
+}
+
// Worker thread to scan hosts for pending metadata to store
static void start_metadata_hosts(uv_work_t *req __maybe_unused)
{
register_libuv_worker_jobs();
RRDHOST *host;
+ int transaction_started = 0;
struct scan_metadata_payload *data = req->data;
struct metadata_wc *wc = data->wc;
+ BUFFER *work_buffer = data->work_buffer;
usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut;
internal_error(true, "METADATA: checking all hosts...");
+ usec_t started_ut = now_monotonic_usec(); (void)started_ut;
bool run_again = false;
worker_is_busy(UV_EVENT_METADATA_STORE);
if (!data->max_count)
- db_execute("BEGIN TRANSACTION;");
+ transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;");
+
dfe_start_reentrant(rrdhost_root_index, host) {
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
continue;
size_t query_counter = 0; (void)query_counter;
- usec_t started_ut = now_monotonic_usec(); (void)started_ut;
rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE);
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) {
rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS);
+
int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid);
- if (likely(rc == SQLITE_OK)) {
- BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ if (likely(!rc)) {
+ query_counter++;
+
+ buffer_flush(work_buffer);
struct query_build tmp = {.sql = work_buffer, .count = 0};
uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
- db_execute(buffer_tostring(work_buffer));
- buffer_free(work_buffer);
- query_counter++;
+ rc = db_execute(db_meta, buffer_tostring(work_buffer));
+
+ if (unlikely(rc)) {
+ error_report("METADATA: 'host:%s': failed to update metadata host labels", rrdhost_hostname(host));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
+ }
+ else
+ query_counter++;
+ } else {
+ error_report("METADATA: 'host:%s': failed to delete old host labels", rrdhost_hostname(host));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
}
}
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) {
rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID);
uuid_t uuid;
-
+ int rc;
if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid)))
- store_claim_id(&host->host_uuid, &uuid);
+ rc = store_claim_id(&host->host_uuid, &uuid);
else
- store_claim_id(&host->host_uuid, NULL);
-
- query_counter++;
- }
-
- if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) {
- rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
-
- BUFFER *work_buffer = sql_store_host_system_info(host);
- if(work_buffer) {
- db_execute(buffer_tostring(work_buffer));
- buffer_free(work_buffer);
- query_counter++;
- }
+ rc = store_claim_id(&host->host_uuid, NULL);
- int rc = sql_store_host_info(host);
if (unlikely(rc))
- error_report("METADATA: 'host:%s': failed to store host info", string2str(host->hostname));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID | RRDHOST_FLAG_METADATA_UPDATE);
else
query_counter++;
}
+ if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
+ store_host_and_system_info(host, work_buffer, &query_counter);
+ }
- if (data->max_count)
- db_execute("BEGIN TRANSACTION;");
- if (unlikely(metadata_scan_host(host, data->max_count, &query_counter))) {
+ // For clarity
+ bool use_transaction = data->max_count;
+ if (unlikely(metadata_scan_host(host, data->max_count, use_transaction, work_buffer, &query_counter))) {
run_again = true;
rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host));
}
- if (data->max_count)
- db_execute("COMMIT TRANSACTION;");
-
usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms",
rrdhost_hostname(host), query_counter,
(double)(ended_ut - started_ut) / USEC_PER_MS);
}
dfe_done(host);
- if (!data->max_count)
- db_execute("COMMIT TRANSACTION;");
+
+ if (!data->max_count && transaction_started)
+ transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;");
usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
@@ -1059,7 +1203,6 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
static void metadata_event_loop(void *arg)
{
- service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
worker_register("METASYNC");
worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
worker_register_job_name(METADATA_DATABASE_TIMER, "timer");
@@ -1067,6 +1210,7 @@ static void metadata_event_loop(void *arg)
worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
+ worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models");
int ret;
uv_loop_t *loop;
@@ -1076,6 +1220,7 @@ static void metadata_event_loop(void *arg)
uv_work_t metadata_cleanup_worker;
uv_thread_set_name_np(wc->thread, "METASYNC");
+// service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
loop = wc->loop = mallocz(sizeof(uv_loop_t));
ret = uv_loop_init(loop);
if (ret) {
@@ -1112,11 +1257,12 @@ static void metadata_event_loop(void *arg)
int shutdown = 0;
wc->row_id = 0;
completion_mark_complete(&wc->init_complete);
+ BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ struct scan_metadata_payload *data;
while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) {
uuid_t *uuid;
RRDHOST *host = NULL;
- int rc;
worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
@@ -1145,6 +1291,11 @@ static void metadata_event_loop(void *arg)
case METADATA_DATABASE_TIMER:
break;
+ case METADATA_ML_LOAD_MODELS: {
+ RRDDIM *rd = (RRDDIM *) cmd.param[0];
+ ml_dimension_load_models(rd);
+ break;
+ }
case METADATA_DEL_DIMENSION:
uuid = (uuid_t *) cmd.param[0];
if (likely(dimension_can_be_deleted(uuid)))
@@ -1158,9 +1309,7 @@ static void metadata_event_loop(void *arg)
break;
case METADATA_ADD_HOST_INFO:
host = (RRDHOST *) cmd.param[0];
- rc = sql_store_host_info(host);
- if (unlikely(rc))
- error_report("Failed to store host info in the database for %s", string2str(host->hostname));
+ store_host_and_system_info(host, NULL, NULL);
break;
case METADATA_SCAN_HOSTS:
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS)))
@@ -1169,10 +1318,11 @@ static void metadata_event_loop(void *arg)
if (unittest_running)
break;
- struct scan_metadata_payload *data = mallocz(sizeof(*data));
+ data = mallocz(sizeof(*data));
data->request.data = data;
data->wc = wc;
data->completion = cmd.completion; // Completion by the worker
+ data->work_buffer = work_buffer;
if (unlikely(cmd.completion)) {
data->max_count = 0; // 0 will process all pending updates
@@ -1192,6 +1342,19 @@ static void metadata_event_loop(void *arg)
metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
}
break;
+ case METADATA_LOAD_HOST_CONTEXT:;
+ if (unittest_running)
+ break;
+
+ data = callocz(1,sizeof(*data));
+ data->request.data = data;
+ data->wc = wc;
+ if (unlikely(
+ uv_queue_work(loop,&data->request, start_all_host_load_context,
+ after_start_host_load_context))) {
+ freez(data);
+ }
+ break;
case METADATA_MAINTENANCE:
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP)))
break;
@@ -1220,21 +1383,14 @@ static void metadata_event_loop(void *arg)
if (!uv_timer_stop(&wc->timer_req))
uv_close((uv_handle_t *)&wc->timer_req, NULL);
- /*
- * uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour we need to be aware if this becomes
- * an issue in the future.
- */
uv_close((uv_handle_t *)&wc->async, NULL);
- uv_run(loop, UV_RUN_DEFAULT);
-
uv_cond_destroy(&wc->cmd_cond);
int rc;
-
do {
rc = uv_loop_close(loop);
} while (rc != UV_EBUSY);
+ buffer_free(work_buffer);
freez(loop);
worker_unregister();
@@ -1272,6 +1428,9 @@ void metadata_sync_shutdown(void)
void metadata_sync_shutdown_prepare(void)
{
+ if (unlikely(!metasync_worker.loop))
+ return;
+
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -1303,8 +1462,6 @@ void metadata_sync_init(void)
{
struct metadata_wc *wc = &metasync_worker;
- fatal_assert(0 == uv_mutex_init(&metadata_async_lock));
-
memset(wc, 0, sizeof(*wc));
metadata_init_cmd_queue(wc);
completion_init(&wc->init_complete);
@@ -1364,6 +1521,20 @@ void metaqueue_host_update_info(RRDHOST *host)
queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL);
}
+void metaqueue_ml_load_models(RRDDIM *rd)
+{
+ if (unlikely(!metasync_worker.loop))
+ return;
+ queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL);
+}
+
+void metadata_queue_load_host_context(RRDHOST *host)
+{
+ if (unlikely(!metasync_worker.loop))
+ return;
+ queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL);
+}
+
//
// unitests
//
@@ -1419,7 +1590,7 @@ static void *metadata_unittest_threads(void)
unittest_queue_metadata,
&tu);
}
- uv_async_send(&metasync_worker.async);
+ (void) uv_async_send(&metasync_worker.async);
sleep_usec(seconds_to_run * USEC_PER_SEC);
__atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED);
diff --git a/database/sqlite/sqlite_metadata.h b/database/sqlite/sqlite_metadata.h
index d578b7a8..6b0676ee 100644
--- a/database/sqlite/sqlite_metadata.h
+++ b/database/sqlite/sqlite_metadata.h
@@ -14,7 +14,9 @@ void metadata_sync_shutdown_prepare(void);
void metaqueue_delete_dimension_uuid(uuid_t *uuid);
void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid);
void metaqueue_host_update_info(RRDHOST *host);
+void metaqueue_ml_load_models(RRDDIM *rd);
void migrate_localhost(uuid_t *host_uuid);
+void metadata_queue_load_host_context(RRDHOST *host);
// UNIT TEST
int metadata_unittest(void);