summaryrefslogtreecommitdiffstats
path: root/database/sqlite
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/sqlite/sqlite_aclk.c332
-rw-r--r--database/sqlite/sqlite_aclk.h35
-rw-r--r--database/sqlite/sqlite_aclk_alert.c452
-rw-r--r--database/sqlite/sqlite_aclk_alert.h3
-rw-r--r--database/sqlite/sqlite_aclk_node.c130
-rw-r--r--database/sqlite/sqlite_context.c54
-rw-r--r--database/sqlite/sqlite_db_migration.c163
-rw-r--r--database/sqlite/sqlite_functions.c157
-rw-r--r--database/sqlite/sqlite_functions.h5
-rw-r--r--database/sqlite/sqlite_health.c479
-rw-r--r--database/sqlite/sqlite_health.h2
-rw-r--r--database/sqlite/sqlite_metadata.c354
-rw-r--r--database/sqlite/sqlite_metadata.h1
13 files changed, 1126 insertions, 1041 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 1298045c2..ac574879c 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -11,60 +11,46 @@ struct aclk_sync_config_s {
uv_timer_t timer_req;
time_t cleanup_after; // Start a cleanup after this timestamp
uv_async_t async;
- /* FIFO command queue */
- uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
bool initialized;
- volatile unsigned queue_size;
- struct aclk_database_cmdqueue cmd_queue;
+ SPINLOCK cmd_queue_lock;
+ struct aclk_database_cmd *cmd_base;
} aclk_sync_config = { 0 };
-
void sanity_check(void) {
// make sure the compiler will stop on misconfigurations
BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
}
-
-int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd)
+static struct aclk_database_cmd aclk_database_deq_cmd(void)
{
- unsigned queue_size;
+ struct aclk_database_cmd ret;
- /* wait for free space in queue */
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
- return 1;
+ spinlock_lock(&aclk_sync_config.cmd_queue_lock);
+ if(aclk_sync_config.cmd_base) {
+ struct aclk_database_cmd *t = aclk_sync_config.cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next);
+ ret = *t;
+ freez(t);
}
+ else {
+ ret.opcode = ACLK_DATABASE_NOOP;
+ ret.completion = NULL;
+ }
+ spinlock_unlock(&aclk_sync_config.cmd_queue_lock);
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
- aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.tail + 1 : 0;
- aclk_sync_config.queue_size = queue_size + 1;
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
- return 0;
+ return ret;
}
static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd)
{
- unsigned queue_size;
+ struct aclk_database_cmd *t = mallocz(sizeof(*t));
+ *t = *cmd;
+ t->prev = t->next = NULL;
+
+ spinlock_lock(&aclk_sync_config.cmd_queue_lock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next);
+ spinlock_unlock(&aclk_sync_config.cmd_queue_lock);
- /* wait for free space in queue */
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex);
- }
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
- aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.tail + 1 : 0;
- aclk_sync_config.queue_size = queue_size + 1;
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
-
- /* wake up event loop */
(void) uv_async_send(&aclk_sync_config.async);
}
@@ -84,6 +70,8 @@ enum {
IDX_PROGRAM_VERSION,
IDX_ENTRIES,
IDX_HEALTH_ENABLED,
+ IDX_LAST_CONNECTED,
+ IDX_IS_EPHEMERAL,
};
static int create_host_callback(void *data, int argc, char **argv, char **column)
@@ -92,9 +80,31 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
UNUSED(argc);
UNUSED(column);
+ time_t last_connected =
+ (time_t)(argv[IDX_LAST_CONNECTED] ? str2uint64_t(argv[IDX_LAST_CONNECTED], NULL) : 0);
+
+ if (!last_connected)
+ last_connected = now_realtime_sec();
+
+ time_t age = now_realtime_sec() - last_connected;
+ int is_ephemeral = 0;
+
+ if (argv[IDX_IS_EPHEMERAL])
+ is_ephemeral = str2i(argv[IDX_IS_EPHEMERAL]);
+
char guid[UUID_STR_LEN];
uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid);
+ if (is_ephemeral && age > rrdhost_free_ephemeral_time_s) {
+ netdata_log_info(
+ "Skipping ephemeral hostname \"%s\" with GUID \"%s\", age = %ld seconds (limit %ld seconds)",
+ (const char *)argv[IDX_HOSTNAME],
+ guid,
+ age,
+ rrdhost_free_ephemeral_time_s);
+ return 0;
+ }
+
struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
__atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
@@ -103,32 +113,48 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info);
RRDHOST *host = rrdhost_find_or_create(
- (const char *) argv[IDX_HOSTNAME]
- , (const char *) argv[IDX_REGISTRY]
- , guid
- , (const char *) argv[IDX_OS]
- , (const char *) argv[IDX_TIMEZONE]
- , (const char *) argv[IDX_ABBREV_TIMEZONE]
- , (int32_t) (argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0)
- , (const char *) argv[IDX_TAGS]
- , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown")
- , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown")
- , argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1
- , argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0
- , default_rrd_memory_mode
- , 0 // health
- , 0 // rrdpush enabled
- , NULL //destination
- , NULL // api key
- , NULL // send charts matching
- , false // rrdpush_enable_replication
- , 0 // rrdpush_seconds_to_replicate
- , 0 // rrdpush_replication_step
- , system_info
- , 1
- );
- if (likely(host))
+ (const char *)argv[IDX_HOSTNAME],
+ (const char *)argv[IDX_REGISTRY],
+ guid,
+ (const char *)argv[IDX_OS],
+ (const char *)argv[IDX_TIMEZONE],
+ (const char *)argv[IDX_ABBREV_TIMEZONE],
+ (int32_t)(argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET], NULL) : 0),
+ (const char *)argv[IDX_TAGS],
+ (const char *)(argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown"),
+ (const char *)(argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown"),
+ argv[IDX_UPDATE_EVERY] ? str2i(argv[IDX_UPDATE_EVERY]) : 1,
+ argv[IDX_ENTRIES] ? str2i(argv[IDX_ENTRIES]) : 0,
+ default_rrd_memory_mode,
+ 0 // health
+ ,
+ 0 // rrdpush enabled
+ ,
+ NULL //destination
+ ,
+ NULL // api key
+ ,
+ NULL // send charts matching
+ ,
+ false // rrdpush_enable_replication
+ ,
+ 0 // rrdpush_seconds_to_replicate
+ ,
+ 0 // rrdpush_replication_step
+ ,
+ system_info,
+ 1);
+
+ if (likely(host)) {
+ if (is_ephemeral)
+ rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST);
+
+ if (is_ephemeral)
+ host->child_disconnected_time = now_realtime_sec();
+
host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]);
+ host->last_connected = last_connected;
+ }
(*number_of_chidren)++;
@@ -136,43 +162,14 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
char node_str[UUID_STR_LEN] = "<none>";
if (likely(host->node_id))
uuid_unparse_lower(*host->node_id, node_str);
- internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str);
+ internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\" ephemeral=%d", rrdhost_hostname(host), host->machine_guid, node_str, is_ephemeral);
#endif
return 0;
}
#ifdef ENABLE_ACLK
-static struct aclk_database_cmd aclk_database_deq_cmd(void)
-{
- struct aclk_database_cmd ret;
- unsigned queue_size;
-
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- queue_size = aclk_sync_config.queue_size;
- if (queue_size == 0) {
- memset(&ret, 0, sizeof(ret));
- ret.opcode = ACLK_DATABASE_NOOP;
- ret.completion = NULL;
-
- } else {
- /* dequeue command */
- ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head];
- if (queue_size == 1) {
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
- } else {
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.head + 1 : 0;
- }
- aclk_sync_config.queue_size = queue_size - 1;
- /* wake up producers */
- uv_cond_signal(&aclk_sync_config.cmd_cond);
- }
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
- return ret;
-}
-
-#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
+#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id"
static int is_host_available(uuid_t *host_id)
{
sqlite3_stmt *res = NULL;
@@ -231,7 +228,7 @@ static void sql_delete_aclk_table_list(char *host_guid)
BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str);
+ "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index')", uuid_str);
rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -255,18 +252,63 @@ fail:
buffer_free(sql);
}
+// OPCODE: ACLK_DATABASE_NODE_UNREGISTER
+static void sql_unregister_node(char *machine_guid)
+{
+ int rc;
+ uuid_t host_uuid;
+
+ if (unlikely(!machine_guid))
+ return;
+
+ rc = uuid_parse(machine_guid, host_uuid);
+ if (rc) {
+ freez(machine_guid);
+ return;
+ }
+
+ sqlite3_stmt *res = NULL;
+
+ rc = sqlite3_prepare_v2(db_meta, "UPDATE node_instance SET node_id = NULL WHERE host_id = @host_id", -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to remove the host node id");
+ freez(machine_guid);
+ return;
+ }
+
+ rc = sqlite3_bind_blob(res, 1, &host_uuid, sizeof(host_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind host_id parameter to remove host node id");
+ goto skip;
+ }
+ rc = sqlite3_step_monitored(res);
+ if (unlikely(rc != SQLITE_DONE)) {
+ error_report("Failed to execute command to remove host node id");
+ } else {
+ // node: machine guid will be freed after processing
+ metadata_delete_host_chart_labels(machine_guid);
+ machine_guid = NULL;
+ }
+
+skip:
+ if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
+ error_report("Failed to finalize statement to remove host node id");
+ freez(machine_guid);
+}
+
+
static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused)
{
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_DELETE_HOST;
cmd.param[0] = strdupz((char *) argv[0]);
- aclk_database_enq_cmd_noblock(&cmd);
+ aclk_database_enq_cmd(&cmd);
return 0;
}
#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');"
+ "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table')"
static void sql_check_aclk_table_list(void)
{
@@ -278,19 +320,18 @@ static void sql_check_aclk_table_list(void)
}
}
-#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d;"
+#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d"
static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
- char sql[512];
- snprintfz(sql,511, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL);
+ char sql[ACLK_SYNC_QUERY_SIZE];
+ snprintfz(sql,sizeof(sql) - 1, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL);
if (unlikely(db_execute(db_meta, sql)))
error_report("Failed to clean stale ACLK alert entries");
return 0;
}
-
-#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');"
+#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table')"
static void sql_maint_aclk_sync_database_all(void)
{
@@ -304,7 +345,7 @@ static void sql_maint_aclk_sync_database_all(void)
static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
- char uuid_str[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str);
RRDHOST *host = rrdhost_find_by_guid(uuid_str);
@@ -332,18 +373,15 @@ static void timer_cb(uv_timer_t *handle)
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
- time_t now = now_realtime_sec();
-
- if (config->cleanup_after && config->cleanup_after < now) {
+ if (config->cleanup_after < now_realtime_sec()) {
cmd.opcode = ACLK_DATABASE_CLEANUP;
- if (!aclk_database_enq_cmd_noblock(&cmd))
- config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
+ aclk_database_enq_cmd(&cmd);
+ config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
}
if (aclk_connected) {
cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
- aclk_database_enq_cmd_noblock(&cmd);
-
+ aclk_database_enq_cmd(&cmd);
aclk_check_node_info_and_collectors();
}
}
@@ -414,12 +452,16 @@ static void aclk_synchronization(void *arg __maybe_unused)
case ACLK_DATABASE_NODE_STATE:;
RRDHOST *host = cmd.param[0];
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
- struct aclk_sync_host_config *ahc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *ahc = host->aclk_config;
if (unlikely(!ahc))
sql_create_aclk_table(host, &host->host_uuid, host->node_id);
- aclk_host_state_update(host, live);
+ aclk_host_state_update(host, live, 1);
break;
-// ALERTS
+ case ACLK_DATABASE_NODE_UNREGISTER:
+ sql_unregister_node(cmd.param[0]);
+
+ break;
+ // ALERTS
case ACLK_DATABASE_PUSH_ALERT_CONFIG:
aclk_push_alert_config_event(cmd.param[0], cmd.param[1]);
break;
@@ -444,8 +486,6 @@ static void aclk_synchronization(void *arg __maybe_unused)
uv_close((uv_handle_t *)&config->timer_req, NULL);
uv_close((uv_handle_t *)&config->async, NULL);
-// uv_close((uv_handle_t *)&config->async_exit, NULL);
- uv_cond_destroy(&config->cmd_cond);
(void) uv_loop_close(loop);
worker_unregister();
@@ -455,11 +495,7 @@ static void aclk_synchronization(void *arg __maybe_unused)
static void aclk_synchronization_init(void)
{
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
- aclk_sync_config.queue_size = 0;
- fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond));
- fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex));
-
+ memset(&aclk_sync_config, 0, sizeof(aclk_sync_config));
fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config));
}
#endif
@@ -469,8 +505,8 @@ static void aclk_synchronization_init(void)
void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused)
{
#ifdef ENABLE_ACLK
- char uuid_str[GUID_LEN + 1];
- char host_guid[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
+ char host_guid[UUID_STR_LEN];
int rc;
uuid_unparse_lower_fix(host_uuid, uuid_str);
@@ -478,37 +514,34 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may
char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, TABLE_ACLK_ALERT, uuid_str);
+ snprintfz(sql, sizeof(sql) - 1, TABLE_ACLK_ALERT, uuid_str);
rc = db_execute(db_meta, sql);
if (unlikely(rc))
error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid);
else {
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT, uuid_str, uuid_str);
+ snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT1, uuid_str, uuid_str);
rc = db_execute(db_meta, sql);
if (unlikely(rc))
- error_report("Failed to create ACLK alert table index for host %s", host ? string2str(host->hostname) : host_guid);
+ error_report(
+ "Failed to create ACLK alert table index 1 for host %s", host ? string2str(host->hostname) : host_guid);
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT1, uuid_str, uuid_str);
+ snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT2, uuid_str, uuid_str);
rc = db_execute(db_meta, sql);
if (unlikely(rc))
- error_report("Failed to create ACLK alert table index 1 for host %s", host ? string2str(host->hostname) : host_guid);
-
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE -1, INDEX_ACLK_ALERT2, uuid_str, uuid_str);
- rc = db_execute(db_meta, sql);
- if (unlikely(rc))
- error_report("Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid);
+ error_report(
+ "Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid);
}
- if (likely(host) && unlikely(host->aclk_sync_host_config))
+ if (likely(host) && unlikely(host->aclk_config))
return;
if (unlikely(!host))
return;
- struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config));
+ struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t));
if (node_id && !uuid_is_null(*node_id))
uuid_unparse_lower(*node_id, wc->node_id);
- host->aclk_sync_host_config = (void *)wc;
+ host->aclk_config = wc;
if (node_id && !host->node_id) {
host->node_id = mallocz(sizeof(*host->node_id));
uuid_copy(*host->node_id, *node_id);
@@ -522,12 +555,18 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may
#endif
}
-#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \
- "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \
- "program_version, entries, health_enabled FROM host WHERE hops >0;"
+#define SQL_FETCH_ALL_HOSTS \
+ "SELECT host_id, hostname, registry_hostname, update_every, os, " \
+ "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \
+ "program_version, entries, health_enabled, last_connected, " \
+ "(SELECT CASE WHEN hl.label_value = 'true' THEN 1 ELSE 0 END FROM " \
+ "host_label hl WHERE hl.host_id = h.host_id AND hl.label_key = '_is_ephemeral') " \
+ "FROM host h WHERE hops > 0"
+
+#define SQL_FETCH_ALL_INSTANCES \
+ "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \
+ "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL"
-#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \
- "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; "
void sql_aclk_sync_init(void)
{
char *err_msg = NULL;
@@ -622,3 +661,18 @@ void schedule_node_info_update(RRDHOST *host __maybe_unused)
aclk_database_enq_cmd(&cmd);
#endif
}
+
+#ifdef ENABLE_ACLK
+void unregister_node(const char *machine_guid)
+{
+ if (unlikely(!machine_guid))
+ return;
+
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_NODE_UNREGISTER;
+ cmd.param[0] = strdupz(machine_guid);
+ cmd.completion = NULL;
+ aclk_database_enq_cmd(&cmd);
+}
+#endif \ No newline at end of file
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 850ca434e..0db2647bf 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -5,14 +5,13 @@
#include "sqlite3.h"
-
#ifndef ACLK_MAX_CHART_BATCH
#define ACLK_MAX_CHART_BATCH (200)
#endif
#ifndef ACLK_MAX_CHART_BATCH_COUNT
#define ACLK_MAX_CHART_BATCH_COUNT (10)
#endif
-#define ACLK_MAX_ALERT_UPDATES (5)
+#define ACLK_MAX_ALERT_UPDATES "5"
#define ACLK_DATABASE_CLEANUP_FIRST (1200)
#define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
#define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400)
@@ -41,13 +40,13 @@ static inline int claimed()
return localhost->aclk_state.claimed_id != NULL;
}
-#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
- "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
- "unique(alert_unique_id));"
+#define TABLE_ACLK_ALERT \
+ "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
+ "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
+ "UNIQUE(alert_unique_id))"
-#define INDEX_ACLK_ALERT "CREATE INDEX IF NOT EXISTS aclk_alert_index_%s ON aclk_alert_%s (alert_unique_id);"
-#define INDEX_ACLK_ALERT1 "CREATE INDEX IF NOT EXISTS aclk_alert_index1_%s ON aclk_alert_%s (filtered_alert_unique_id);"
-#define INDEX_ACLK_ALERT2 "CREATE INDEX IF NOT EXISTS aclk_alert_index2_%s ON aclk_alert_%s (date_submitted);"
+#define INDEX_ACLK_ALERT1 "CREATE INDEX IF NOT EXISTS aclk_alert_index1_%s ON aclk_alert_%s (filtered_alert_unique_id)"
+#define INDEX_ACLK_ALERT2 "CREATE INDEX IF NOT EXISTS aclk_alert_index2_%s ON aclk_alert_%s (date_submitted)"
enum aclk_database_opcode {
ACLK_DATABASE_NOOP = 0,
@@ -60,6 +59,7 @@ enum aclk_database_opcode {
ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,
ACLK_DATABASE_QUEUE_REMOVED_ALERTS,
+ ACLK_DATABASE_NODE_UNREGISTER,
ACLK_DATABASE_TIMER,
// leave this last
@@ -71,16 +71,10 @@ struct aclk_database_cmd {
enum aclk_database_opcode opcode;
void *param[2];
struct completion *completion;
+ struct aclk_database_cmd *prev, *next;
};
-#define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024)
-
-struct aclk_database_cmdqueue {
- unsigned head, tail;
- struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE];
-};
-
-struct aclk_sync_host_config {
+typedef struct aclk_sync_cfg_t {
RRDHOST *host;
int alert_updates;
int alert_checkpoint_req;
@@ -92,17 +86,16 @@ struct aclk_sync_host_config {
char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested
uint64_t alerts_log_first_sequence_id;
uint64_t alerts_log_last_sequence_id;
-};
-
-extern sqlite3 *db_meta;
+} aclk_sync_cfg_t;
-int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd);
void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
void sql_aclk_sync_init(void);
void aclk_push_alert_config(const char *node_id, const char *config_hash);
void aclk_push_node_alert_snapshot(const char *node_id);
-void aclk_push_node_health_log(const char *node_id);
void aclk_push_node_removed_alerts(const char *node_id);
void schedule_node_info_update(RRDHOST *host);
+#ifdef ENABLE_ACLK
+void unregister_node(const char *machine_guid);
+#endif
#endif //NETDATA_SQLITE_ACLK_H
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index 60bf5dbdc..9bd060f96 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -13,16 +13,42 @@
sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \
})
-
#define SQL_UPDATE_FILTERED_ALERT \
- "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u, date_created = unixepoch() where filtered_alert_unique_id = %u"
+ "UPDATE aclk_alert_%s SET filtered_alert_unique_id = @new_alert, date_created = UNIXEPOCH() " \
+ "WHERE filtered_alert_unique_id = @old_alert"
-static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str)
+static void update_filtered(ALARM_ENTRY *ae, int64_t unique_id, char *uuid_str)
{
+ sqlite3_stmt *res = NULL;
+
char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id);
- sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL);
- ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
+ snprintfz(sql, sizeof(sql) - 1, SQL_UPDATE_FILTERED_ALERT, uuid_str);
+ int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement when trying to update_filtered");
+ return;
+ }
+
+ rc = sqlite3_bind_int64(res, 1, ae->unique_id);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind ae unique_id for update_filtered");
+ goto done;
+ }
+
+ rc = sqlite3_bind_int64(res, 2, unique_id);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind unique_id for update_filtered");
+ goto done;
+ }
+
+ rc = sqlite3_step_monitored(res);
+ if (likely(rc == SQLITE_DONE))
+ ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
+
+done:
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement when trying to update_filtered, rc = %d", rc);
}
#define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \
@@ -30,35 +56,35 @@ static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str)
"WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \
"AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL"
-static inline bool is_event_from_alert_variable_config(uint32_t unique_id, uuid_t *host_id)
+static inline bool is_event_from_alert_variable_config(int64_t unique_id, uuid_t *host_id)
{
sqlite3_stmt *res = NULL;
- int rc = 0;
- bool ret = false;
- rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0);
+ int rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to check for alert variables.");
return false;
}
- rc = sqlite3_bind_int(res, 1, (int) unique_id);
+ bool ret = false;
+
+ rc = sqlite3_bind_int64(res, 1, unique_id);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind unique_id for checking alert variable.");
- goto fail;
+ goto done;
}
rc = sqlite3_bind_blob(res, 2, host_id, sizeof(*host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for checking alert variable.");
- goto fail;
+ goto done;
}
rc = sqlite3_step_monitored(res);
if (likely(rc == SQLITE_ROW))
ret = true;
-fail:
+done:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc);
@@ -71,32 +97,25 @@ fail:
//decide if some events should be sent or not
#define SQL_SELECT_ALERT_BY_ID \
"SELECT hld.new_status, hl.config_hash_id, hld.unique_id FROM health_log hl, aclk_alert_%s aa, health_log_detail hld " \
- "WHERE hl.host_id = @host_id AND +hld.unique_id = aa.filtered_alert_unique_id " \
+ "WHERE hl.host_id = @host_id AND hld.unique_id = aa.filtered_alert_unique_id " \
"AND hld.alarm_id = @alarm_id AND hl.health_log_id = hld.health_log_id " \
- "ORDER BY hld.rowid DESC LIMIT 1;"
+ "ORDER BY hld.rowid DESC LIMIT 1"
static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
{
sqlite3_stmt *res = NULL;
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
-
- bool send = false;
if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED)
return 0;
- if (unlikely(uuid_is_null(ae->config_hash_id)))
+ if (unlikely(uuid_is_null(ae->config_hash_id) || !host->aclk_config))
return 0;
char sql[ACLK_SYNC_QUERY_SIZE];
- uuid_t config_hash_id;
- RRDCALC_STATUS status;
- uint32_t unique_id;
//get the previous sent event of this alarm_id
//base the search on the last filtered event
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_SELECT_ALERT_BY_ID, uuid_str);
+ snprintfz(sql, sizeof(sql) - 1, SQL_SELECT_ALERT_BY_ID, host->aclk_config->uuid_str);
int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -104,6 +123,8 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
return true;
}
+ bool send = false;
+
rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for checking should_send_to_cloud");
@@ -119,17 +140,18 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
rc = sqlite3_step_monitored(res);
if (likely(rc == SQLITE_ROW)) {
- status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
+ uuid_t config_hash_id;
+ RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
if (sqlite3_column_type(res, 1) != SQLITE_NULL)
uuid_copy(config_hash_id, *((uuid_t *)sqlite3_column_blob(res, 1)));
- unique_id = (uint32_t)sqlite3_column_int64(res, 2);
+ int64_t unique_id = sqlite3_column_int64(res, 2);
if (ae->new_status != (RRDCALC_STATUS)status || uuid_memcmp(&ae->config_hash_id, &config_hash_id))
send = true;
else
- update_filtered(ae, unique_id, uuid_str);
+ update_filtered(ae, unique_id, host->aclk_config->uuid_str);
} else
send = true;
@@ -143,13 +165,12 @@ done:
#define SQL_QUEUE_ALERT_TO_CLOUD \
"INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
- "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING;"
+ "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING"
void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter)
{
sqlite3_stmt *res_alert = NULL;
char sql[ACLK_SYNC_QUERY_SIZE];
- char uuid_str[UUID_STR_LEN];
if (!service_running(SERVICE_ACLK))
return;
@@ -163,8 +184,7 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter)
if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
return;
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str);
+ snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_ALERT_TO_CLOUD, host->aclk_config->uuid_str);
int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0);
if (unlikely(rc != SQLITE_OK)) {
@@ -172,18 +192,18 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter)
return;
}
- rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id);
+ rc = sqlite3_bind_int64(res_alert, 1, ae->unique_id);
if (unlikely(rc != SQLITE_OK))
- goto bind_fail;
+ goto done;
rc = execute_insert(res_alert);
if (unlikely(rc == SQLITE_DONE)) {
ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
} else
- error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc);
+ error_report("Failed to store alert event %"PRId64", rc = %d", ae->unique_id, rc);
-bind_fail:
+done:
if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
error_report("Failed to reset statement in store alert event, rc = %d", rc);
}
@@ -239,15 +259,13 @@ static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) {
}
-void aclk_push_alert_event(struct aclk_sync_host_config *wc)
+static void aclk_push_alert_event(struct aclk_sync_cfg_t *wc __maybe_unused)
{
-#ifndef ENABLE_ACLK
- UNUSED(wc);
-#else
+#ifdef ENABLE_ACLK
int rc;
if (unlikely(!wc->alert_updates)) {
- netdata_log_access(
+ nd_log(NDLS_ACCESS, NDLP_NOTICE,
"ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.",
wc->node_id,
wc->host ? rrdhost_hostname(wc->host) : "N/A");
@@ -265,23 +283,20 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
- int limit = ACLK_MAX_ALERT_UPDATES;
-
sqlite3_stmt *res = NULL;
buffer_sprintf(
sql,
- "select aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, "
+ "SELECT aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, "
" hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, "
" hl.chart, hl.exec, hl.recipient, ha.source, hl.units, hld.info, hld.exec_code, hld.new_status, "
" hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, "
" hld.alarm_event_id, hl.chart_name, hld.summary "
- " from health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld "
- " where hld.unique_id = aa.alert_unique_id and hl.config_hash_id = ha.hash_id and aa.date_submitted is null "
- " and hl.host_id = @host_id and hl.health_log_id = hld.health_log_id "
- " order by aa.sequence_id asc limit %d;",
- wc->uuid_str,
- limit);
+ " FROM health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld "
+ " WHERE hld.unique_id = aa.alert_unique_id AND hl.config_hash_id = ha.hash_id AND aa.date_submitted IS NULL "
+ " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id "
+ " ORDER BY aa.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES,
+ wc->uuid_str);
rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -292,13 +307,6 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
rc = db_execute(db_meta, buffer_tostring(sql_fix));
if (unlikely(rc))
error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
-
- else {
- buffer_flush(sql_fix);
- buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str);
- if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix))))
- error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
- }
buffer_free(sql_fix);
// Try again
@@ -315,10 +323,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
rc = sqlite3_bind_blob(res, 1, &wc->host->host_uuid, sizeof(wc->host->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for pushing alert event.");
- sqlite3_finalize(res);
- buffer_free(sql);
- freez(claim_id);
- return;
+ goto done;
}
uint64_t first_sequence_id = 0;
@@ -395,9 +400,13 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
if (first_sequence_id) {
buffer_flush(sql);
- buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
- "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
- wc->uuid_str, first_sequence_id, last_sequence_id);
+ buffer_sprintf(
+ sql,
+ "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
+ "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64,
+ wc->uuid_str,
+ first_sequence_id,
+ last_sequence_id);
if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host));
@@ -407,7 +416,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
} else {
if (wc->alerts_log_first_sequence_id)
- netdata_log_access(
+ nd_log(NDLS_ACCESS, NDLP_DEBUG,
"ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "",
wc->node_id,
wc->host ? rrdhost_hostname(wc->host) : "N/A",
@@ -417,6 +426,7 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
wc->alerts_log_last_sequence_id = 0;
}
+done:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc);
@@ -437,7 +447,7 @@ void aclk_push_alert_events_for_all_hosts(void)
rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (likely(wc))
aclk_push_alert_event(wc);
}
@@ -446,59 +456,54 @@ void aclk_push_alert_events_for_all_hosts(void)
void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
{
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
sqlite3_stmt *res = NULL;
int rc;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+
+ BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+
rw_spinlock_write_lock(&host->health_log.spinlock);
- buffer_sprintf(sql, "delete from aclk_alert_%s; ", uuid_str);
- if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) {
- rw_spinlock_write_unlock(&host->health_log.spinlock);
- buffer_free(sql);
- return;
- }
+ buffer_sprintf(sql, "DELETE FROM aclk_alert_%s", wc->uuid_str);
+ if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
+ goto skip;
buffer_flush(sql);
+
buffer_sprintf(
sql,
- "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) "
- "select hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id from health_log_detail hld, health_log hl "
- "where hld.new_status <> 0 and hld.new_status <> -2 and hl.health_log_id = hld.health_log_id and hl.config_hash_id is not null "
- "and hld.updated_by_id = 0 and hl.host_id = @host_id order by hld.unique_id asc on conflict (alert_unique_id) do nothing;",
- uuid_str);
+ "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) "
+ "SELECT hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id FROM health_log_detail hld, health_log hl "
+ "WHERE hld.new_status <> 0 AND hld.new_status <> -2 AND hl.health_log_id = hld.health_log_id AND hl.config_hash_id IS NOT NULL "
+ "AND hld.updated_by_id = 0 AND hl.host_id = @host_id ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING",
+ wc->uuid_str);
rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to queue existing alerts.");
- rw_spinlock_write_unlock(&host->health_log.spinlock);
- buffer_free(sql);
- return;
+ goto skip;
}
rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for when trying to queue existing alerts.");
- sqlite3_finalize(res);
- rw_spinlock_write_unlock(&host->health_log.spinlock);
- buffer_free(sql);
- return;
+ goto done;
}
rc = execute_insert(res);
if (unlikely(rc != SQLITE_DONE))
error_report("Failed to queue existing alerts, rc = %d", rc);
-
+ else
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+done:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement to queue existing alerts, rc = %d", rc);
+skip:
rw_spinlock_write_unlock(&host->health_log.spinlock);
-
buffer_free(sql);
- rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
}
void aclk_send_alarm_configuration(char *config_hash)
@@ -506,12 +511,12 @@ void aclk_send_alarm_configuration(char *config_hash)
if (unlikely(!config_hash))
return;
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = localhost->aclk_config;
if (unlikely(!wc))
return;
- netdata_log_access(
+ nd_log(NDLS_ACCESS, NDLP_DEBUG,
"ACLK REQ [%s (%s)]: Request to send alert config %s.",
wc->node_id,
wc->host ? rrdhost_hostname(wc->host) : "N/A",
@@ -524,43 +529,33 @@ void aclk_send_alarm_configuration(char *config_hash)
"SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
"module, charts, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
"options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
- "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id;"
+ "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id"
-int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
+void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
{
- int rc = 0;
-
#ifdef ENABLE_ACLK
-
- CHECK_SQLITE_CONNECTION(db_meta);
+ int rc;
sqlite3_stmt *res = NULL;
+ struct aclk_sync_cfg_t *wc;
- struct aclk_sync_host_config *wc = NULL;
RRDHOST *host = find_host_by_node_id(node_id);
- if (unlikely(!host)) {
+ if (unlikely(!host || !(wc = host->aclk_config))) {
freez(config_hash);
freez(node_id);
- return 1;
- }
-
- wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
- if (unlikely(!wc)) {
- freez(config_hash);
- freez(node_id);
- return 1;
+ return;
}
rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to fetch an alarm hash configuration");
- return 1;
+ return;
}
uuid_t hash_uuid;
if (uuid_parse(config_hash, hash_uuid))
- return 1;
+ return;
rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
@@ -632,13 +627,13 @@ int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash
}
if (likely(p_alarm_config.cfg_hash)) {
- netdata_log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
aclk_send_provide_alarm_cfg(&p_alarm_config);
freez(p_alarm_config.cfg_hash);
destroy_aclk_alarm_configuration(&alarm_config);
}
else
- netdata_log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
bind_fail:
rc = sqlite3_finalize(res);
@@ -648,7 +643,6 @@ bind_fail:
freez(config_hash);
freez(node_id);
#endif
- return rc;
}
@@ -660,51 +654,50 @@ void aclk_start_alert_streaming(char *node_id, bool resets)
if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return;
- RRDHOST *host = find_host_by_node_id(node_id);
-
- if (unlikely(!host))
- return;
-
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc;
- if (unlikely(!wc))
+ RRDHOST *host = find_host_by_node_id(node_id);
+ if (unlikely(!host || !(wc = host->aclk_config)))
return;
if (unlikely(!host->health.health_enabled)) {
- netdata_log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
return;
}
if (resets) {
- netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
sql_queue_existing_alerts_to_aclk(host);
} else
- netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
wc->alert_updates = 1;
wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
}
-#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
+#define SQL_QUEUE_REMOVE_ALERTS \
+ "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
"SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \
- "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \
- "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \
- "AND hl.config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \
- "AND hl.name || hl.chart NOT IN (select name || chart from health_log where name = hl.name and chart = hl.chart and alarm_id > hl.alarm_id and host_id = hl.host_id) " \
- "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING;"
+ "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \
+ "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \
+ "AND hl.config_hash_id NOT IN (SELECT hash_id FROM alert_hash WHERE warn IS NULL AND crit IS NULL) " \
+ "AND hl.name || hl.chart NOT IN (select name || chart FROM health_log WHERE name = hl.name AND " \
+ "chart = hl.chart AND alarm_id > hl.alarm_id AND host_id = hl.host_id) " \
+ "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING"
+
void sql_process_queue_removed_alerts_to_aclk(char *node_id)
{
- struct aclk_sync_host_config *wc;
+ struct aclk_sync_cfg_t *wc;
RRDHOST *host = find_host_by_node_id(node_id);
freez(node_id);
- if (unlikely(!host || !(wc = host->aclk_sync_host_config)))
+ if (unlikely(!host || !(wc = host->aclk_config)))
return;
char sql[ACLK_SYNC_QUERY_SIZE * 2];
sqlite3_stmt *res = NULL;
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE * 2 - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str);
+ snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str);
int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -715,33 +708,25 @@ void sql_process_queue_removed_alerts_to_aclk(char *node_id)
rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for when trying to queue remvoed alerts.");
- sqlite3_finalize(res);
- return;
+ goto skip;
}
rc = execute_insert(res);
- if (unlikely(rc != SQLITE_DONE)) {
- sqlite3_finalize(res);
- error_report("Failed to queue removed alerts, rc = %d", rc);
- return;
+ if (likely(rc == SQLITE_DONE)) {
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
+ rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ wc->alert_queue_removed = 0;
}
+skip:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement to queue removed alerts, rc = %d", rc);
-
- netdata_log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
-
- rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
- wc->alert_queue_removed = 0;
}
void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
{
- if (unlikely(!host->aclk_sync_host_config))
- return;
-
- if (!claimed() || !host->node_id)
+ if (unlikely(!host->aclk_config || !claimed() || !host->node_id))
return;
char node_id[UUID_STR_LEN];
@@ -753,32 +738,28 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
{
uuid_t node_uuid;
+
if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return;
+ struct aclk_sync_cfg_t *wc;
+
RRDHOST *host = find_host_by_node_id(node_id);
- if (unlikely(!host)) {
- netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
+ if (unlikely(!host || !(wc = host->aclk_config))) {
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
return;
}
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
-
- if (unlikely(!wc)) {
- netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
- return;
- }
-
- netdata_log_access(
+ nd_log(NDLS_ACCESS, NDLP_DEBUG,
"IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
node_id,
wc->host ? rrdhost_hostname(wc->host) : "N/A",
snapshot_uuid);
+
if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid))
return;
- __sync_synchronize();
+
wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
- __sync_synchronize();
aclk_push_node_alert_snapshot(node_id);
}
@@ -795,9 +776,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->chart = strdupz(ae_chart_id(ae));
alarm_log->name = strdupz(ae_name(ae));
- alarm_log->batch_id = 0;
- alarm_log->sequence_id = 0;
- alarm_log->when = (time_t)ae->when;
+ alarm_log->when = ae->when;
alarm_log->config_hash = strdupz((char *)config_hash_id);
@@ -812,7 +791,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
- alarm_log->delay = (int)ae->delay;
+ alarm_log->delay = ae->delay;
alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
alarm_log->last_repeat = (time_t)ae->last_repeat;
@@ -842,18 +821,18 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
#endif
#ifdef ENABLE_ACLK
-static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark)
+static bool have_recent_alarm(RRDHOST *host, int64_t alarm_id, int64_t mark)
{
ALARM_ENTRY *ae = host->health_log.alarms;
while (ae) {
if (ae->alarm_id == alarm_id && ae->unique_id >mark &&
(ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
- return 1;
+ return true;
ae = ae->next;
}
- return 0;
+ return false;
}
#endif
@@ -864,17 +843,17 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
RRDHOST *host = find_host_by_node_id(node_id);
if (unlikely(!host)) {
- netdata_log_access("AC [%s (N/A)]: Node id not found", node_id);
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", node_id);
freez(node_id);
return;
}
freez(node_id);
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
// we perhaps we don't need this for snapshots
if (unlikely(!wc->alert_updates)) {
- netdata_log_access(
+ nd_log(NDLS_ACCESS, NDLP_NOTICE,
"ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.",
wc->node_id,
wc->host ? rrdhost_hostname(wc->host) : "N/A");
@@ -888,11 +867,9 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
if (unlikely(!claim_id))
return;
- netdata_log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid);
uint32_t cnt = 0;
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
rw_spinlock_read_lock(&host->health_log.spinlock);
@@ -915,7 +892,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
}
if (cnt) {
- uint32_t chunk = 1, chunks = 0;
+ uint32_t chunks;
chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
ae = host->health_log.alarms;
@@ -926,15 +903,12 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
alarm_snap.claim_id = claim_id;
alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
alarm_snap.chunks = chunks;
- alarm_snap.chunk = chunk;
+ alarm_snap.chunk = 1;
alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
for (; ae; ae = ae->next) {
- if (likely(ae->updated_by_id))
- continue;
-
- if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
+ if (likely(ae->updated_by_id) || unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
continue;
if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
@@ -957,19 +931,9 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
if (cnt == ALARM_EVENTS_PER_CHUNK) {
aclk_send_alarm_snapshot(snapshot_proto);
-
cnt = 0;
-
- if (chunk < chunks) {
- chunk++;
-
- struct alarm_snapshot alarm_snap;
- alarm_snap.node_id = wc->node_id;
- alarm_snap.claim_id = claim_id;
- alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
- alarm_snap.chunks = chunks;
- alarm_snap.chunk = chunk;
-
+ if (alarm_snap.chunk < chunks) {
+ alarm_snap.chunk++;
snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
}
}
@@ -986,51 +950,70 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
#endif
}
-#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created + %d < UNIXEPOCH();"
+#define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created < UNIXEPOCH() - @period"
+
void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
{
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+ if (unlikely(!wc))
+ return;
char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_DELETE_ALERT_ENTRIES, uuid_str, MAX_REMOVED_PERIOD);
+ snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_ALERT_ENTRIES, wc->uuid_str);
- char *err_msg = NULL;
- int rc = sqlite3_exec_monitored(db_meta, sql, NULL, NULL, &err_msg);
+ sqlite3_stmt *res = NULL;
+ int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
- error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s\"", uuid_str, err_msg);
- sqlite3_free(err_msg);
+ error_report("Failed to prepare statement for cleaning stale ACLK alert entries.");
+ return;
}
+
+ rc = sqlite3_bind_int64(res, 1, MAX_REMOVED_PERIOD);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind MAX_REMOVED_PERIOD parameter.");
+ goto skip;
+ }
+
+ rc = sqlite3_step_monitored(res);
+ if (rc != SQLITE_DONE)
+ error_report("Failed to execute DELETE query for cleaning stale ACLK alert entries.");
+
+skip:
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement for cleaning stale ACLK alert entries.");
}
#define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \
"(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \
- "FROM aclk_alert_%s WHERE date_submitted IS NULL;"
+ "FROM aclk_alert_%s WHERE date_submitted IS NULL"
int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
{
- int rc;
- struct aclk_sync_host_config *wc = NULL;
- wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
+
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (!wc)
return 1;
proto_alert_status->alert_updates = wc->alert_updates;
char sql[ACLK_SYNC_QUERY_SIZE];
- sqlite3_stmt *res = NULL;
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str);
+ sqlite3_stmt *res = NULL;
+ snprintfz(sql, sizeof(sql) - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str);
- rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
+ int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to get alert log status from the database.");
return 1;
}
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
- proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
- proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
- proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
+ proto_alert_status->pending_min_sequence_id =
+ sqlite3_column_bytes(res, 0) > 0 ? (uint64_t)sqlite3_column_int64(res, 0) : 0;
+ proto_alert_status->pending_max_sequence_id =
+ sqlite3_column_bytes(res, 1) > 0 ? (uint64_t)sqlite3_column_int64(res, 1) : 0;
+ proto_alert_status->last_submitted_sequence_id =
+ sqlite3_column_bytes(res, 2) > 0 ? (uint64_t)sqlite3_column_int64(res, 2) : 0;
}
rc = sqlite3_finalize(res);
@@ -1045,21 +1028,15 @@ void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused)
if (unlikely(!node_id))
return;
- struct aclk_sync_host_config *wc = NULL;
+ struct aclk_sync_cfg_t *wc;
RRDHOST *host = find_host_by_node_id(node_id);
- if (unlikely(!host))
- return;
-
- wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
- if (unlikely(!wc)) {
- netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
- return;
+ if (unlikely(!host || !(wc = host->aclk_config)))
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
+ else {
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
+ wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
}
-
- netdata_log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
-
- wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
}
typedef struct active_alerts {
@@ -1068,15 +1045,14 @@ typedef struct active_alerts {
RRDCALC_STATUS status;
} active_alerts_t;
-static inline int compare_active_alerts(const void * a, const void * b) {
+static inline int compare_active_alerts(const void *a, const void *b)
+{
active_alerts_t *active_alerts_a = (active_alerts_t *)a;
active_alerts_t *active_alerts_b = (active_alerts_t *)b;
- if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) )
- {
- return strcmp(active_alerts_a->chart, active_alerts_b->chart);
- }
- else
+ if (!(strcmp(active_alerts_a->name, active_alerts_b->name))) {
+ return strcmp(active_alerts_a->chart, active_alerts_b->chart);
+ } else
return strcmp(active_alerts_a->name, active_alerts_b->name);
}
@@ -1084,16 +1060,16 @@ static inline int compare_active_alerts(const void * a, const void * b) {
void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
{
#ifdef ENABLE_ACLK
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (unlikely(!wc)) {
- netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
return;
}
if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) {
//postpone checkpoint send
- wc->alert_checkpoint_req+=3;
- netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
+ wc->alert_checkpoint_req += 3;
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
return;
}
@@ -1126,16 +1102,16 @@ void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
BUFFER *alarms_to_hash;
if (cnt) {
- qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
+ qsort(active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
alarms_to_hash = buffer_create(len, NULL);
- for (uint32_t i=0;i<cnt;i++) {
+ for (uint32_t i = 0; i < cnt; i++) {
buffer_strcat(alarms_to_hash, active_alerts[i].name);
buffer_strcat(alarms_to_hash, active_alerts[i].chart);
if (active_alerts[i].status == RRDCALC_STATUS_WARNING)
- buffer_strcat(alarms_to_hash, "W");
+ buffer_fast_strcat(alarms_to_hash, "W", 1);
else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL)
- buffer_strcat(alarms_to_hash, "C");
+ buffer_fast_strcat(alarms_to_hash, "C", 1);
}
} else {
alarms_to_hash = buffer_create(1, NULL);
@@ -1156,10 +1132,10 @@ void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
aclk_send_provide_alarm_checkpoint(&alarm_checkpoint);
freez(claim_id);
- netdata_log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
- } else {
- netdata_log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
- }
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
+ } else
+ nd_log(NDLS_ACCESS, NDLP_ERR, "ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
+
wc->alert_checkpoint_req = 0;
buffer_free(alarms_to_hash);
#endif
diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h
index c92aef083..cfb3468b9 100644
--- a/database/sqlite/sqlite_aclk_alert.h
+++ b/database/sqlite/sqlite_aclk_alert.h
@@ -15,9 +15,8 @@ struct proto_alert_status {
uint64_t last_submitted_sequence_id;
};
-void aclk_push_alert_event(struct aclk_sync_host_config *wc);
void aclk_send_alarm_configuration (char *config_hash);
-int aclk_push_alert_config_event(char *node_id, char *config_hash);
+void aclk_push_alert_config_event(char *node_id, char *config_hash);
void aclk_start_alert_streaming(char *node_id, bool resets);
void sql_queue_removed_alerts_to_aclk(RRDHOST *host);
void sql_process_queue_removed_alerts_to_aclk(char *node_id);
diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c
index 82927854a..dcc8c375c 100644
--- a/database/sqlite/sqlite_aclk_node.c
+++ b/database/sqlite/sqlite_aclk_node.c
@@ -7,17 +7,16 @@
#include "../../aclk/aclk_capas.h"
#ifdef ENABLE_ACLK
+
DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) {
RRDSET *st;
char name[500];
- rrdset_foreach_read(st, host) {
+ rrdset_foreach_read(st, host)
+ {
if (rrdset_is_available_for_viewers(st)) {
- struct collector_info col = {
- .plugin = rrdset_plugin_name(st),
- .module = rrdset_module_name(st)
- };
- snprintfz(name, 499, "%s:%s", col.plugin, col.module);
+ struct collector_info col = {.plugin = rrdset_plugin_name(st), .module = rrdset_module_name(st)};
+ snprintfz(name, sizeof(name) - 1, "%s:%s", col.plugin, col.module);
dictionary_set(dict, name, &col, sizeof(struct collector_info));
}
}
@@ -26,17 +25,9 @@ DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) {
return dict;
}
-static void build_node_collectors(char *node_id __maybe_unused)
+static void build_node_collectors(RRDHOST *host)
{
-
- RRDHOST *host = find_host_by_node_id(node_id);
-
- if (unlikely(!host))
- return;
-
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
- if (unlikely(!wc))
- return;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
struct update_node_collectors upd_node_collectors;
DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED);
@@ -50,45 +41,33 @@ static void build_node_collectors(char *node_id __maybe_unused)
dictionary_destroy(dict);
freez(upd_node_collectors.claim_id);
- netdata_log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", node_id, rrdhost_hostname(host));
-
- freez(node_id);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(host));
}
-static void build_node_info(char *node_id __maybe_unused)
+static void build_node_info(RRDHOST *host)
{
struct update_node_info node_info;
- RRDHOST *host = find_host_by_node_id(node_id);
-
- if (unlikely((!host))) {
- freez(node_id);
- return;
- }
-
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
-
- if (unlikely(!wc)) {
- freez(node_id);
- return;
- }
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
rrd_rdlock();
node_info.node_id = wc->node_id;
node_info.claim_id = get_agent_claimid();
node_info.machine_guid = host->machine_guid;
- node_info.child = (wc->host != localhost);
+ node_info.child = (host != localhost);
node_info.ml_info.ml_capable = ml_capable();
- node_info.ml_info.ml_enabled = ml_enabled(wc->host);
+ node_info.ml_info.ml_enabled = ml_enabled(host);
- node_info.node_instance_capabilities = aclk_get_node_instance_capas(wc->host);
+ node_info.node_instance_capabilities = aclk_get_node_instance_capas(host);
now_realtime_timeval(&node_info.updated_at);
char *host_version = NULL;
if (host != localhost) {
netdata_mutex_lock(&host->receiver_lock);
- host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : rrdhost_program_version(host));
+ host_version = strdupz(
+ host->receiver && host->receiver->program_version ? host->receiver->program_version :
+ rrdhost_program_version(host));
netdata_mutex_unlock(&host->receiver_lock);
}
@@ -112,10 +91,11 @@ static void build_node_info(char *node_id __maybe_unused)
node_info.data.machine_guid = host->machine_guid;
struct capability node_caps[] = {
- { .name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled },
- { .name = "mc", .version = host->system_info->mc_version ? host->system_info->mc_version : 0, .enabled = host->system_info->mc_version ? 1 : 0 },
- { .name = NULL, .version = 0, .enabled = 0 }
- };
+ {.name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled},
+ {.name = "mc",
+ .version = host->system_info->mc_version ? host->system_info->mc_version : 0,
+ .enabled = host->system_info->mc_version ? 1 : 0},
+ {.name = NULL, .version = 0, .enabled = 0}};
node_info.node_capabilities = node_caps;
node_info.data.ml_info.ml_capable = host->system_info->ml_capable;
@@ -124,7 +104,14 @@ static void build_node_info(char *node_id __maybe_unused)
node_info.data.host_labels_ptr = host->rrdlabels;
aclk_update_node_info(&node_info);
- netdata_log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), host->machine_guid, wc->host == localhost ? "parent" : "child");
+ nd_log(
+ NDLS_ACCESS,
+ NDLP_DEBUG,
+ "ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)",
+ wc->node_id,
+ rrdhost_hostname(host),
+ host->machine_guid,
+ host == localhost ? "parent" : "child");
rrd_unlock();
freez(node_info.claim_id);
@@ -132,10 +119,21 @@ static void build_node_info(char *node_id __maybe_unused)
freez(host_version);
wc->node_collectors_send = now_realtime_sec();
- freez(node_id);
-
}
+static bool host_is_replicating(RRDHOST *host)
+{
+ bool replicating = false;
+ RRDSET *st;
+ rrdset_foreach_reentrant(st, host) {
+ if (rrdset_is_replicating(st)) {
+ replicating = true;
+ break;
+ }
+ }
+ rrdset_foreach_done(st);
+ return replicating;
+}
void aclk_check_node_info_and_collectors(void)
{
@@ -144,35 +142,59 @@ void aclk_check_node_info_and_collectors(void)
if (unlikely(!aclk_connected))
return;
- size_t pending = 0;
- dfe_start_reentrant(rrdhost_root_index, host) {
+ size_t context_loading = 0;
+ size_t replicating = 0;
+ size_t context_pp = 0;
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ time_t now = now_realtime_sec();
+ dfe_start_reentrant(rrdhost_root_index, host)
+ {
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (unlikely(!wc))
continue;
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host));
- pending++;
+ context_loading++;
continue;
}
- if (wc->node_info_send_time && wc->node_info_send_time + 30 < now_realtime_sec()) {
+ if (unlikely(host_is_replicating(host))) {
+ internal_error(true, "ACLK SYNC: Host %s is still replicating", rrdhost_hostname(host));
+ replicating++;
+ continue;
+ }
+
+ bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue));
+
+ if (!pp_queue_empty && (wc->node_info_send_time || wc->node_collectors_send))
+ context_pp++;
+
+ if (pp_queue_empty && wc->node_info_send_time && wc->node_info_send_time + 30 < now) {
wc->node_info_send_time = 0;
- build_node_info(strdupz(wc->node_id));
+ build_node_info(host);
internal_error(true, "ACLK SYNC: Sending node info for %s", rrdhost_hostname(host));
}
- if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) {
- build_node_collectors(strdupz(wc->node_id));
+ if (pp_queue_empty && wc->node_collectors_send && wc->node_collectors_send + 30 < now) {
+ build_node_collectors(host);
internal_error(true, "ACLK SYNC: Sending collectors for %s", rrdhost_hostname(host));
wc->node_collectors_send = 0;
}
}
dfe_done(host);
- if(pending)
- netdata_log_info("ACLK: %zu nodes are pending for contexts to load, skipped sending node info for them", pending);
+ if (context_loading || replicating || context_pp) {
+ nd_log_limit_static_thread_var(erl, 10, 100 * USEC_PER_MS);
+ nd_log_limit(
+ &erl,
+ NDLS_DAEMON,
+ NDLP_INFO,
+ "%zu nodes loading contexts, %zu replicating data, %zu pending context post processing",
+ context_loading,
+ replicating,
+ context_pp);
+ }
}
#endif
diff --git a/database/sqlite/sqlite_context.c b/database/sqlite/sqlite_context.c
index d4b15e99d..26ed8a96a 100644
--- a/database/sqlite/sqlite_context.c
+++ b/database/sqlite/sqlite_context.c
@@ -7,16 +7,16 @@
#define DB_CONTEXT_METADATA_VERSION 1
const char *database_context_config[] = {
- "CREATE TABLE IF NOT EXISTS context (host_id BLOB, id TEXT NOT NULL, version INT NOT NULL, title TEXT NOT NULL, " \
+ "CREATE TABLE IF NOT EXISTS context (host_id BLOB, id TEXT NOT NULL, version INT NOT NULL, title TEXT NOT NULL, "
"chart_type TEXT NOT NULL, unit TEXT NOT NULL, priority INT NOT NULL, first_time_t INT NOT NULL, "
"last_time_t INT NOT NULL, deleted INT NOT NULL, "
- "family TEXT, PRIMARY KEY (host_id, id));",
+ "family TEXT, PRIMARY KEY (host_id, id))",
NULL
};
const char *database_context_cleanup[] = {
- "VACUUM;",
+ "VACUUM",
NULL
};
@@ -31,7 +31,7 @@ int sql_init_context_database(int memory)
int rc;
if (likely(!memory))
- snprintfz(sqlite_database, FILENAME_MAX, "%s/context-meta.db", netdata_configured_cache_dir);
+ snprintfz(sqlite_database, sizeof(sqlite_database) - 1, "%s/context-meta.db", netdata_configured_cache_dir);
else
strcpy(sqlite_database, ":memory:");
@@ -56,9 +56,9 @@ int sql_init_context_database(int memory)
return 1;
if (likely(!memory))
- snprintfz(buf, 1024, "ATTACH DATABASE \"%s/netdata-meta.db\" as meta;", netdata_configured_cache_dir);
+ snprintfz(buf, sizeof(buf) - 1, "ATTACH DATABASE \"%s/netdata-meta.db\" as meta", netdata_configured_cache_dir);
else
- snprintfz(buf, 1024, "ATTACH DATABASE ':memory:' as meta;");
+ snprintfz(buf, sizeof(buf) - 1, "ATTACH DATABASE ':memory:' as meta");
if(init_database_batch(db_context_meta, list)) return 1;
@@ -92,7 +92,7 @@ void sql_close_context_database(void)
// Fetching data
//
#define CTX_GET_CHART_LIST "SELECT c.chart_id, c.type||'.'||c.id, c.name, c.context, c.title, c.unit, c.priority, " \
- "c.update_every, c.chart_type, c.family FROM meta.chart c WHERE c.host_id = @host_id and c.chart_id is not null; "
+ "c.update_every, c.chart_type, c.family FROM chart c WHERE c.host_id = @host_id AND c.chart_id IS NOT NULL"
void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, void *), void *data)
{
@@ -105,7 +105,7 @@ void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, voi
}
if (unlikely(!res)) {
- rc = prepare_statement(db_context_meta, CTX_GET_CHART_LIST, &res);
+ rc = prepare_statement(db_meta, CTX_GET_CHART_LIST, &res);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to fetch chart list");
return;
@@ -141,14 +141,14 @@ skip_load:
// Dimension list
#define CTX_GET_DIMENSION_LIST "SELECT d.dim_id, d.id, d.name, CASE WHEN INSTR(d.options,\"hidden\") > 0 THEN 1 ELSE 0 END " \
- "FROM meta.dimension d WHERE d.chart_id = @id and d.dim_id is not null ORDER BY d.rowid ASC;"
+ "FROM dimension d WHERE d.chart_id = @id AND d.dim_id IS NOT NULL ORDER BY d.rowid ASC"
void ctx_get_dimension_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_DIMENSION_DATA *, void *), void *data)
{
int rc;
static __thread sqlite3_stmt *res = NULL;
if (unlikely(!res)) {
- rc = prepare_statement(db_context_meta, CTX_GET_DIMENSION_LIST, &res);
+ rc = prepare_statement(db_meta, CTX_GET_DIMENSION_LIST, &res);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to fetch chart dimension data");
return;
@@ -178,7 +178,8 @@ failed:
}
// LABEL LIST
-#define CTX_GET_LABEL_LIST "SELECT l.label_key, l.label_value, l.source_type FROM meta.chart_label l WHERE l.chart_id = @id;"
+#define CTX_GET_LABEL_LIST "SELECT l.label_key, l.label_value, l.source_type FROM meta.chart_label l WHERE l.chart_id = @id"
+
void ctx_get_label_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_CLABEL_DATA *, void *), void *data)
{
int rc;
@@ -215,7 +216,8 @@ failed:
// CONTEXT LIST
#define CTX_GET_CONTEXT_LIST "SELECT id, version, title, chart_type, unit, priority, first_time_t, " \
- "last_time_t, deleted, family FROM context c WHERE c.host_id = @host_id;"
+ "last_time_t, deleted, family FROM context c WHERE c.host_id = @host_id"
+
void ctx_get_context_list(uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEXT_DATA *, void *), void *data)
{
@@ -266,9 +268,10 @@ failed:
//
// Storing Data
//
-#define CTX_STORE_CONTEXT "INSERT OR REPLACE INTO context " \
- "(host_id, id, version, title, chart_type, unit, priority, first_time_t, last_time_t, deleted, family) " \
- "VALUES (@host_id, @context, @version, @title, @chart_type, @unit, @priority, @first_time_t, @last_time_t, @deleted, @family);"
+#define CTX_STORE_CONTEXT \
+ "INSERT OR REPLACE INTO context " \
+ "(host_id, id, version, title, chart_type, unit, priority, first_time_t, last_time_t, deleted, family) " \
+ "VALUES (@host_id, @context, @version, @title, @chart_type, @unit, @priority, @first_t, @last_t, @delete, @family)"
int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
{
@@ -292,7 +295,7 @@ int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
rc = bind_text_null(res, 2, context_data->id, 0);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind context to store details");
+ error_report("Failed to bind context to store context details");
goto skip_store;
}
@@ -304,19 +307,19 @@ int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
rc = bind_text_null(res, 4, context_data->title, 0);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind context to store details");
+ error_report("Failed to bind context to store context details");
goto skip_store;
}
rc = bind_text_null(res, 5, context_data->chart_type, 0);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind context to store details");
+ error_report("Failed to bind context to store context details");
goto skip_store;
}
rc = bind_text_null(res, 6, context_data->units, 0);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind context to store details");
+ error_report("Failed to bind context to store context details");
goto skip_store;
}
@@ -365,7 +368,7 @@ skip_store:
// Delete a context
-#define CTX_DELETE_CONTEXT "DELETE FROM context WHERE host_id = @host_id AND id = @context;"
+#define CTX_DELETE_CONTEXT "DELETE FROM context WHERE host_id = @host_id AND id = @context"
int ctx_delete_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
{
int rc, rc_stored = 1;
@@ -382,13 +385,13 @@ int ctx_delete_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
rc = sqlite3_bind_blob(res, 1, host_uuid, sizeof(*host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id to delete context data");
+ error_report("Failed to bind host_id for context data deletion");
goto skip_delete;
}
rc = sqlite3_bind_text(res, 2, context_data->id, -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind context id for data deletion");
+ error_report("Failed to bind context id for context data deletion");
goto skip_delete;
}
@@ -396,13 +399,6 @@ int ctx_delete_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
if (rc_stored != SQLITE_DONE)
error_report("Failed to delete context %s, rc = %d", context_data->id, rc_stored);
-#ifdef NETDATA_INTERNAL_CHECKS
- else {
- char host_uuid_str[UUID_STR_LEN];
- uuid_unparse_lower(*host_uuid, host_uuid_str);
- netdata_log_info("%s: Deleted context %s under host %s", __FUNCTION__, context_data->id, host_uuid_str);
- }
-#endif
skip_delete:
rc = sqlite3_finalize(res);
diff --git a/database/sqlite/sqlite_db_migration.c b/database/sqlite/sqlite_db_migration.c
index a011d0fef..29da6c249 100644
--- a/database/sqlite/sqlite_db_migration.c
+++ b/database/sqlite/sqlite_db_migration.c
@@ -7,7 +7,7 @@ static int return_int_cb(void *data, int argc, char **argv, char **column)
int *status = data;
UNUSED(argc);
UNUSED(column);
- *status = str2uint32_t(argv[0], NULL);
+ *status = (int) str2uint32_t(argv[0], NULL);
return 0;
}
@@ -18,7 +18,7 @@ static int get_auto_vaccum(sqlite3 *database)
int exists = 0;
- snprintf(sql, 127, "PRAGMA auto_vacuum");
+ snprintf(sql, sizeof(sql) - 1, "PRAGMA auto_vacuum");
int rc = sqlite3_exec_monitored(database, sql, return_int_cb, (void *) &exists, &err_msg);
if (rc != SQLITE_OK) {
@@ -35,7 +35,7 @@ int db_table_count(sqlite3 *database)
char sql[128];
int count = 0;
- snprintf(sql, 127, "select count(1) from sqlite_schema where type = 'table'");
+ snprintf(sql, sizeof(sql) - 1, "select count(1) from sqlite_schema where type = 'table'");
int rc = sqlite3_exec_monitored(database, sql, return_int_cb, (void *) &count, &err_msg);
if (rc != SQLITE_OK) {
netdata_log_info("Error checking database table count; %s", err_msg);
@@ -51,7 +51,7 @@ int table_exists_in_database(sqlite3 *database, const char *table)
int exists = 0;
- snprintf(sql, 127, "select 1 from sqlite_schema where type = 'table' and name = '%s';", table);
+ snprintf(sql, sizeof(sql) - 1, "select 1 from sqlite_schema where type = 'table' and name = '%s'", table);
int rc = sqlite3_exec_monitored(database, sql, return_int_cb, (void *) &exists, &err_msg);
if (rc != SQLITE_OK) {
@@ -69,7 +69,7 @@ static int column_exists_in_table(sqlite3 *database, const char *table, const ch
int exists = 0;
- snprintf(sql, 127, "SELECT 1 FROM pragma_table_info('%s') where name = '%s';", table, column);
+ snprintf(sql, sizeof(sql) - 1, "SELECT 1 FROM pragma_table_info('%s') where name = '%s'", table, column);
int rc = sqlite3_exec_monitored(database, sql, return_int_cb, (void *) &exists, &err_msg);
if (rc != SQLITE_OK) {
@@ -92,64 +92,64 @@ static int get_database_user_version(sqlite3 *database)
}
const char *database_migrate_v1_v2[] = {
- "ALTER TABLE host ADD hops INTEGER NOT NULL DEFAULT 0;",
+ "ALTER TABLE host ADD hops INTEGER NOT NULL DEFAULT 0",
NULL
};
const char *database_migrate_v2_v3[] = {
- "ALTER TABLE host ADD memory_mode INT NOT NULL DEFAULT 0;",
- "ALTER TABLE host ADD abbrev_timezone TEXT NOT NULL DEFAULT '';",
- "ALTER TABLE host ADD utc_offset INT NOT NULL DEFAULT 0;",
- "ALTER TABLE host ADD program_name TEXT NOT NULL DEFAULT 'unknown';",
- "ALTER TABLE host ADD program_version TEXT NOT NULL DEFAULT 'unknown';",
- "ALTER TABLE host ADD entries INT NOT NULL DEFAULT 0;",
- "ALTER TABLE host ADD health_enabled INT NOT NULL DEFAULT 0;",
+ "ALTER TABLE host ADD memory_mode INT NOT NULL DEFAULT 0",
+ "ALTER TABLE host ADD abbrev_timezone TEXT NOT NULL DEFAULT ''",
+ "ALTER TABLE host ADD utc_offset INT NOT NULL DEFAULT 0",
+ "ALTER TABLE host ADD program_name TEXT NOT NULL DEFAULT 'unknown'",
+ "ALTER TABLE host ADD program_version TEXT NOT NULL DEFAULT 'unknown'",
+ "ALTER TABLE host ADD entries INT NOT NULL DEFAULT 0",
+ "ALTER TABLE host ADD health_enabled INT NOT NULL DEFAULT 0",
NULL
};
const char *database_migrate_v4_v5[] = {
- "DROP TABLE IF EXISTS chart_active;",
- "DROP TABLE IF EXISTS dimension_active;",
- "DROP TABLE IF EXISTS chart_hash;",
- "DROP TABLE IF EXISTS chart_hash_map;",
- "DROP VIEW IF EXISTS v_chart_hash;",
+ "DROP TABLE IF EXISTS chart_active",
+ "DROP TABLE IF EXISTS dimension_active",
+ "DROP TABLE IF EXISTS chart_hash",
+ "DROP TABLE IF EXISTS chart_hash_map",
+ "DROP VIEW IF EXISTS v_chart_hash",
NULL
};
const char *database_migrate_v5_v6[] = {
- "DROP TRIGGER IF EXISTS tr_dim_del;",
- "DROP TABLE IF EXISTS dimension_delete;",
+ "DROP TRIGGER IF EXISTS tr_dim_del",
+ "DROP TABLE IF EXISTS dimension_delete",
NULL
};
const char *database_migrate_v9_v10[] = {
- "ALTER TABLE alert_hash ADD chart_labels TEXT;",
+ "ALTER TABLE alert_hash ADD chart_labels TEXT",
NULL
};
const char *database_migrate_v10_v11[] = {
- "ALTER TABLE health_log ADD chart_name TEXT;",
+ "ALTER TABLE health_log ADD chart_name TEXT",
NULL
};
const char *database_migrate_v11_v12[] = {
- "ALTER TABLE health_log_detail ADD summary TEXT;",
- "ALTER TABLE alert_hash ADD summary TEXT;",
+ "ALTER TABLE health_log_detail ADD summary TEXT",
+ "ALTER TABLE alert_hash ADD summary TEXT",
NULL
};
const char *database_migrate_v12_v13_detail[] = {
- "ALTER TABLE health_log_detail ADD summary TEXT;",
+ "ALTER TABLE health_log_detail ADD summary TEXT",
NULL
};
const char *database_migrate_v12_v13_hash[] = {
- "ALTER TABLE alert_hash ADD summary TEXT;",
+ "ALTER TABLE alert_hash ADD summary TEXT",
NULL
};
const char *database_migrate_v13_v14[] = {
- "ALTER TABLE host ADD last_connected INT NOT NULL DEFAULT 0;",
+ "ALTER TABLE host ADD last_connected INT NOT NULL DEFAULT 0",
NULL
};
@@ -173,7 +173,7 @@ static int do_migration_v3_v4(sqlite3 *database)
int rc;
sqlite3_stmt *res = NULL;
- snprintfz(sql, 255, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%';");
+ snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%'");
rc = sqlite3_prepare_v2(database, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to alter health_log tables");
@@ -183,7 +183,7 @@ static int do_migration_v3_v4(sqlite3 *database)
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
char *table = strdupz((char *) sqlite3_column_text(res, 0));
if (!column_exists_in_table(database, table, "chart_context")) {
- snprintfz(sql, 255, "ALTER TABLE %s ADD chart_context text", table);
+ snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE %s ADD chart_context text", table);
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
}
freez(table);
@@ -212,7 +212,7 @@ static int do_migration_v6_v7(sqlite3 *database)
int rc;
sqlite3_stmt *res = NULL;
- snprintfz(sql, 255, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'aclk_alert_%%';");
+ snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'aclk_alert_%%'");
rc = sqlite3_prepare_v2(database, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to alter aclk_alert tables");
@@ -222,9 +222,9 @@ static int do_migration_v6_v7(sqlite3 *database)
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
char *table = strdupz((char *) sqlite3_column_text(res, 0));
if (!column_exists_in_table(database, table, "filtered_alert_unique_id")) {
- snprintfz(sql, 255, "ALTER TABLE %s ADD filtered_alert_unique_id", table);
+ snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE %s ADD filtered_alert_unique_id", table);
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 255, "UPDATE %s SET filtered_alert_unique_id = alert_unique_id", table);
+ snprintfz(sql, sizeof(sql) - 1, "UPDATE %s SET filtered_alert_unique_id = alert_unique_id", table);
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
}
freez(table);
@@ -243,7 +243,7 @@ static int do_migration_v7_v8(sqlite3 *database)
int rc;
sqlite3_stmt *res = NULL;
- snprintfz(sql, 255, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%';");
+ snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%'");
rc = sqlite3_prepare_v2(database, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to alter health_log tables");
@@ -253,7 +253,7 @@ static int do_migration_v7_v8(sqlite3 *database)
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
char *table = strdupz((char *) sqlite3_column_text(res, 0));
if (!column_exists_in_table(database, table, "transition_id")) {
- snprintfz(sql, 255, "ALTER TABLE %s ADD transition_id blob", table);
+ snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE %s ADD transition_id blob", table);
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
}
freez(table);
@@ -273,38 +273,38 @@ static int do_migration_v8_v9(sqlite3 *database)
sqlite3_stmt *res = NULL;
//create the health_log table and it's index
- snprintfz(sql, 2047, "CREATE TABLE IF NOT EXISTS health_log (health_log_id INTEGER PRIMARY KEY, host_id blob, alarm_id int, " \
+ snprintfz(sql, sizeof(sql) - 1, "CREATE TABLE IF NOT EXISTS health_log (health_log_id INTEGER PRIMARY KEY, host_id blob, alarm_id int, " \
"config_hash_id blob, name text, chart text, family text, recipient text, units text, exec text, " \
- "chart_context text, last_transition_id blob, UNIQUE (host_id, alarm_id)) ;");
+ "chart_context text, last_transition_id blob, UNIQUE (host_id, alarm_id))");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
//TODO indexes
- snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_ind_1 ON health_log (host_id);");
+ snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_ind_1 ON health_log (host_id)");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 2047, "CREATE TABLE IF NOT EXISTS health_log_detail (health_log_id int, unique_id int, alarm_id int, alarm_event_id int, " \
+ snprintfz(sql, sizeof(sql) - 1, "CREATE TABLE IF NOT EXISTS health_log_detail (health_log_id int, unique_id int, alarm_id int, alarm_event_id int, " \
"updated_by_id int, updates_id int, when_key int, duration int, non_clear_duration int, " \
"flags int, exec_run_timestamp int, delay_up_to_timestamp int, " \
"info text, exec_code int, new_status real, old_status real, delay int, " \
- "new_value double, old_value double, last_repeat int, transition_id blob, global_id int, summary text, host_id blob);");
+ "new_value double, old_value double, last_repeat int, transition_id blob, global_id int, summary text, host_id blob)");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_d_ind_1 ON health_log_detail (unique_id);");
+ snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_d_ind_1 ON health_log_detail (unique_id)");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_d_ind_2 ON health_log_detail (global_id);");
+ snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_d_ind_2 ON health_log_detail (global_id)");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_d_ind_3 ON health_log_detail (transition_id);");
+ snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_d_ind_3 ON health_log_detail (transition_id)");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS health_log_d_ind_4 ON health_log_detail (health_log_id);");
+ snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS health_log_d_ind_4 ON health_log_detail (health_log_id)");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 2047, "ALTER TABLE alert_hash ADD source text;");
+ snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE alert_hash ADD source text");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 2047, "CREATE INDEX IF NOT EXISTS alert_hash_index ON alert_hash (hash_id);");
+ snprintfz(sql, sizeof(sql) - 1, "CREATE INDEX IF NOT EXISTS alert_hash_index ON alert_hash (hash_id)");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
- snprintfz(sql, 2047, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%' AND name <> 'health_log_detail';");
+ snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'health_log_%%' AND name <> 'health_log_detail'");
rc = sqlite3_prepare_v2(database, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to alter health_log tables");
@@ -332,7 +332,7 @@ static int do_migration_v8_v9(sqlite3 *database)
dfe_done(table);
dictionary_destroy(dict_tables);
- snprintfz(sql, 2047, "ALTER TABLE health_log_detail DROP COLUMN host_id;");
+ snprintfz(sql, sizeof(sql) - 1, "ALTER TABLE health_log_detail DROP COLUMN host_id");
sqlite3_exec_monitored(database, sql, 0, 0, NULL);
return 0;
@@ -353,7 +353,7 @@ static int do_migration_v10_v11(sqlite3 *database)
return 0;
}
-#define MIGR_11_12_UPD_HEALTH_LOG_DETAIL "UPDATE health_log_detail SET summary = (select name from health_log where health_log_id = health_log_detail.health_log_id);"
+#define MIGR_11_12_UPD_HEALTH_LOG_DETAIL "UPDATE health_log_detail SET summary = (select name from health_log where health_log_id = health_log_detail.health_log_id)"
static int do_migration_v11_v12(sqlite3 *database)
{
int rc = 0;
@@ -368,6 +368,68 @@ static int do_migration_v11_v12(sqlite3 *database)
return rc;
}
+static int do_migration_v14_v15(sqlite3 *database)
+{
+ char sql[256];
+
+ int rc;
+ sqlite3_stmt *res = NULL;
+ snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type = \"index\" AND name LIKE \"aclk_alert_index@_%%\" ESCAPE \"@\"");
+ rc = sqlite3_prepare_v2(database, sql, -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement to drop unused indices");
+ return 1;
+ }
+
+ BUFFER *wb = buffer_create(128, NULL);
+ size_t count = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+ buffer_sprintf(wb, "DROP INDEX IF EXISTS %s; ", (char *)sqlite3_column_text(res, 0));
+ count++;
+ }
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement when dropping unused indices, rc = %d", rc);
+
+ if (count)
+ (void) db_execute(database, buffer_tostring(wb));
+
+ buffer_free(wb);
+ return 0;
+}
+
+static int do_migration_v15_v16(sqlite3 *database)
+{
+ char sql[256];
+
+ int rc;
+ sqlite3_stmt *res = NULL;
+ snprintfz(sql, sizeof(sql) - 1, "SELECT name FROM sqlite_schema WHERE type = \"table\" AND name LIKE \"aclk_alert_%%\"");
+ rc = sqlite3_prepare_v2(database, sql, -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement to drop unused indices");
+ return 1;
+ }
+
+ BUFFER *wb = buffer_create(128, NULL);
+ size_t count = 0;
+ while (sqlite3_step_monitored(res) == SQLITE_ROW) {
+ buffer_sprintf(wb, "ANALYZE %s ; ", (char *)sqlite3_column_text(res, 0));
+ count++;
+ }
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement when running ANALYZE on aclk_alert_tables, rc = %d", rc);
+
+ if (count)
+ (void) db_execute(database, buffer_tostring(wb));
+
+ buffer_free(wb);
+ return 0;
+}
+
static int do_migration_v12_v13(sqlite3 *database)
{
int rc = 0;
@@ -425,7 +487,7 @@ static int migrate_database(sqlite3 *database, int target_version, char *db_name
int user_version = 0;
char *err_msg = NULL;
- int rc = sqlite3_exec_monitored(database, "PRAGMA user_version;", return_int_cb, (void *) &user_version, &err_msg);
+ int rc = sqlite3_exec_monitored(database, "PRAGMA user_version", return_int_cb, (void *) &user_version, &err_msg);
if (rc != SQLITE_OK) {
netdata_log_info("Error checking the %s database version; %s", db_name, err_msg);
sqlite3_free(err_msg);
@@ -446,7 +508,6 @@ static int migrate_database(sqlite3 *database, int target_version, char *db_name
}
}
return target_version;
-
}
DATABASE_FUNC_MIGRATION_LIST migration_action[] = {
@@ -464,6 +525,8 @@ DATABASE_FUNC_MIGRATION_LIST migration_action[] = {
{.name = "v11 to v12", .func = do_migration_v11_v12},
{.name = "v12 to v13", .func = do_migration_v12_v13},
{.name = "v13 to v14", .func = do_migration_v13_v14},
+ {.name = "v14 to v15", .func = do_migration_v14_v15},
+ {.name = "v15 to v16", .func = do_migration_v15_v16},
// the terminator of this array
{.name = NULL, .func = NULL}
};
diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c
index 393d6a238..e3537bf5a 100644
--- a/database/sqlite/sqlite_functions.c
+++ b/database/sqlite/sqlite_functions.c
@@ -4,7 +4,7 @@
#include "sqlite3recover.h"
#include "sqlite_db_migration.h"
-#define DB_METADATA_VERSION 14
+#define DB_METADATA_VERSION 16
const char *database_config[] = {
"CREATE TABLE IF NOT EXISTS host(host_id BLOB PRIMARY KEY, hostname TEXT NOT NULL, "
@@ -14,70 +14,77 @@ const char *database_config[] = {
"memory_mode INT DEFAULT 0, abbrev_timezone TEXT DEFAULT '', utc_offset INT NOT NULL DEFAULT 0,"
"program_name TEXT NOT NULL DEFAULT 'unknown', program_version TEXT NOT NULL DEFAULT 'unknown', "
"entries INT NOT NULL DEFAULT 0,"
- "health_enabled INT NOT NULL DEFAULT 0, last_connected INT NOT NULL DEFAULT 0);",
+ "health_enabled INT NOT NULL DEFAULT 0, last_connected INT NOT NULL DEFAULT 0)",
"CREATE TABLE IF NOT EXISTS chart(chart_id blob PRIMARY KEY, host_id blob, type text, id text, name text, "
"family text, context text, title text, unit text, plugin text, module text, priority int, update_every int, "
- "chart_type int, memory_mode int, history_entries);",
+ "chart_type int, memory_mode int, history_entries)",
+
"CREATE TABLE IF NOT EXISTS dimension(dim_id blob PRIMARY KEY, chart_id blob, id text, name text, "
- "multiplier int, divisor int , algorithm int, options text);",
+ "multiplier int, divisor int , algorithm int, options text)",
+
+ "CREATE TABLE IF NOT EXISTS metadata_migration(filename text, file_size, date_created int)",
+
+ "CREATE INDEX IF NOT EXISTS ind_d2 on dimension (chart_id)",
+
+ "CREATE INDEX IF NOT EXISTS ind_c3 on chart (host_id)",
- "CREATE TABLE IF NOT EXISTS metadata_migration(filename text, file_size, date_created int);",
- "CREATE INDEX IF NOT EXISTS ind_d2 on dimension (chart_id);",
- "CREATE INDEX IF NOT EXISTS ind_c3 on chart (host_id);",
"CREATE TABLE IF NOT EXISTS chart_label(chart_id blob, source_type int, label_key text, "
- "label_value text, date_created int, PRIMARY KEY (chart_id, label_key));",
- "CREATE TABLE IF NOT EXISTS node_instance (host_id blob PRIMARY KEY, claim_id, node_id, date_created);",
+ "label_value text, date_created int, PRIMARY KEY (chart_id, label_key))",
+
+ "CREATE TABLE IF NOT EXISTS node_instance (host_id blob PRIMARY KEY, claim_id, node_id, date_created)",
+
"CREATE TABLE IF NOT EXISTS alert_hash(hash_id blob PRIMARY KEY, date_updated int, alarm text, template text, "
"on_key text, class text, component text, type text, os text, hosts text, lookup text, "
"every text, units text, calc text, families text, plugin text, module text, charts text, green text, "
"red text, warn text, crit text, exec text, to_key text, info text, delay text, options text, "
"repeat text, host_labels text, p_db_lookup_dimensions text, p_db_lookup_method text, p_db_lookup_options int, "
- "p_db_lookup_after int, p_db_lookup_before int, p_update_every int, source text, chart_labels text, summary text);",
+ "p_db_lookup_after int, p_db_lookup_before int, p_update_every int, source text, chart_labels text, summary text)",
"CREATE TABLE IF NOT EXISTS host_info(host_id blob, system_key text NOT NULL, system_value text NOT NULL, "
- "date_created INT, PRIMARY KEY(host_id, system_key));",
+ "date_created INT, PRIMARY KEY(host_id, system_key))",
"CREATE TABLE IF NOT EXISTS host_label(host_id blob, source_type int, label_key text NOT NULL, "
- "label_value text NOT NULL, date_created INT, PRIMARY KEY (host_id, label_key));",
+ "label_value text NOT NULL, date_created INT, PRIMARY KEY (host_id, label_key))",
"CREATE TRIGGER IF NOT EXISTS ins_host AFTER INSERT ON host BEGIN INSERT INTO node_instance (host_id, date_created)"
- " SELECT new.host_id, unixepoch() WHERE new.host_id NOT IN (SELECT host_id FROM node_instance); END;",
+ " SELECT new.host_id, unixepoch() WHERE new.host_id NOT IN (SELECT host_id FROM node_instance); END",
"CREATE TABLE IF NOT EXISTS health_log (health_log_id INTEGER PRIMARY KEY, host_id blob, alarm_id int, "
"config_hash_id blob, name text, chart text, family text, recipient text, units text, exec text, "
- "chart_context text, last_transition_id blob, chart_name text, UNIQUE (host_id, alarm_id)) ;",
+ "chart_context text, last_transition_id blob, chart_name text, UNIQUE (host_id, alarm_id))",
- "CREATE INDEX IF NOT EXISTS health_log_ind_1 ON health_log (host_id);",
+ "CREATE INDEX IF NOT EXISTS health_log_ind_1 ON health_log (host_id)",
"CREATE TABLE IF NOT EXISTS health_log_detail (health_log_id int, unique_id int, alarm_id int, alarm_event_id int, "
"updated_by_id int, updates_id int, when_key int, duration int, non_clear_duration int, "
"flags int, exec_run_timestamp int, delay_up_to_timestamp int, "
"info text, exec_code int, new_status real, old_status real, delay int, "
- "new_value double, old_value double, last_repeat int, transition_id blob, global_id int, summary text);",
+ "new_value double, old_value double, last_repeat int, transition_id blob, global_id int, summary text)",
- "CREATE INDEX IF NOT EXISTS health_log_d_ind_2 ON health_log_detail (global_id);",
- "CREATE INDEX IF NOT EXISTS health_log_d_ind_3 ON health_log_detail (transition_id);",
- "CREATE INDEX IF NOT EXISTS health_log_d_ind_5 ON health_log_detail (health_log_id, unique_id DESC);",
+ "CREATE INDEX IF NOT EXISTS health_log_d_ind_2 ON health_log_detail (global_id)",
+ "CREATE INDEX IF NOT EXISTS health_log_d_ind_3 ON health_log_detail (transition_id)",
+ "CREATE INDEX IF NOT EXISTS health_log_d_ind_9 ON health_log_detail (unique_id DESC, health_log_id)",
"CREATE INDEX IF NOT EXISTS health_log_d_ind_6 on health_log_detail (health_log_id, when_key)",
- "CREATE INDEX IF NOT EXISTS health_log_d_ind_7 on health_log_detail (alarm_id);",
- "CREATE INDEX IF NOT EXISTS health_log_d_ind_8 on health_log_detail (new_status, updated_by_id);",
+ "CREATE INDEX IF NOT EXISTS health_log_d_ind_7 on health_log_detail (alarm_id)",
+ "CREATE INDEX IF NOT EXISTS health_log_d_ind_8 on health_log_detail (new_status, updated_by_id)",
NULL
};
const char *database_cleanup[] = {
- "DELETE FROM host WHERE host_id NOT IN (SELECT host_id FROM chart);",
- "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);",
- "DELETE FROM host_info WHERE host_id NOT IN (SELECT host_id FROM host);",
- "DELETE FROM host_label WHERE host_id NOT IN (SELECT host_id FROM host);",
- "DROP TRIGGER IF EXISTS tr_dim_del;",
- "DROP INDEX IF EXISTS ind_d1;",
- "DROP INDEX IF EXISTS ind_c1;",
- "DROP INDEX IF EXISTS ind_c2;",
- "DROP INDEX IF EXISTS alert_hash_index;",
- "DROP INDEX IF EXISTS health_log_d_ind_4;",
- "DROP INDEX IF EXISTS health_log_d_ind_1;",
+ "DELETE FROM host WHERE host_id NOT IN (SELECT host_id FROM chart)",
+ "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host)",
+ "DELETE FROM host_info WHERE host_id NOT IN (SELECT host_id FROM host)",
+ "DELETE FROM host_label WHERE host_id NOT IN (SELECT host_id FROM host)",
+ "DROP TRIGGER IF EXISTS tr_dim_del",
+ "DROP INDEX IF EXISTS ind_d1",
+ "DROP INDEX IF EXISTS ind_c1",
+ "DROP INDEX IF EXISTS ind_c2",
+ "DROP INDEX IF EXISTS alert_hash_index",
+ "DROP INDEX IF EXISTS health_log_d_ind_4",
+ "DROP INDEX IF EXISTS health_log_d_ind_1",
+ "DROP INDEX IF EXISTS health_log_d_ind_5",
NULL
};
@@ -202,42 +209,42 @@ int configure_sqlite_database(sqlite3 *database, int target_version)
// https://www.sqlite.org/pragma.html#pragma_auto_vacuum
// PRAGMA schema.auto_vacuum = 0 | NONE | 1 | FULL | 2 | INCREMENTAL;
- snprintfz(buf, 1024, "PRAGMA auto_vacuum=%s;", config_get(CONFIG_SECTION_SQLITE, "auto vacuum", "INCREMENTAL"));
+ snprintfz(buf, sizeof(buf) - 1, "PRAGMA auto_vacuum=%s", config_get(CONFIG_SECTION_SQLITE, "auto vacuum", "INCREMENTAL"));
if (init_database_batch(database, list))
return 1;
// https://www.sqlite.org/pragma.html#pragma_synchronous
// PRAGMA schema.synchronous = 0 | OFF | 1 | NORMAL | 2 | FULL | 3 | EXTRA;
- snprintfz(buf, 1024, "PRAGMA synchronous=%s;", config_get(CONFIG_SECTION_SQLITE, "synchronous", "NORMAL"));
+ snprintfz(buf, sizeof(buf) - 1, "PRAGMA synchronous=%s", config_get(CONFIG_SECTION_SQLITE, "synchronous", "NORMAL"));
if (init_database_batch(database, list))
return 1;
// https://www.sqlite.org/pragma.html#pragma_journal_mode
// PRAGMA schema.journal_mode = DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF
- snprintfz(buf, 1024, "PRAGMA journal_mode=%s;", config_get(CONFIG_SECTION_SQLITE, "journal mode", "WAL"));
+ snprintfz(buf, sizeof(buf) - 1, "PRAGMA journal_mode=%s", config_get(CONFIG_SECTION_SQLITE, "journal mode", "WAL"));
if (init_database_batch(database, list))
return 1;
// https://www.sqlite.org/pragma.html#pragma_temp_store
// PRAGMA temp_store = 0 | DEFAULT | 1 | FILE | 2 | MEMORY;
- snprintfz(buf, 1024, "PRAGMA temp_store=%s;", config_get(CONFIG_SECTION_SQLITE, "temp store", "MEMORY"));
+ snprintfz(buf, sizeof(buf) - 1, "PRAGMA temp_store=%s", config_get(CONFIG_SECTION_SQLITE, "temp store", "MEMORY"));
if (init_database_batch(database, list))
return 1;
// https://www.sqlite.org/pragma.html#pragma_journal_size_limit
// PRAGMA schema.journal_size_limit = N ;
- snprintfz(buf, 1024, "PRAGMA journal_size_limit=%lld;", config_get_number(CONFIG_SECTION_SQLITE, "journal size limit", 16777216));
+ snprintfz(buf, sizeof(buf) - 1, "PRAGMA journal_size_limit=%lld", config_get_number(CONFIG_SECTION_SQLITE, "journal size limit", 16777216));
if (init_database_batch(database, list))
return 1;
// https://www.sqlite.org/pragma.html#pragma_cache_size
// PRAGMA schema.cache_size = pages;
// PRAGMA schema.cache_size = -kibibytes;
- snprintfz(buf, 1024, "PRAGMA cache_size=%lld;", config_get_number(CONFIG_SECTION_SQLITE, "cache size", -2000));
+ snprintfz(buf, sizeof(buf) - 1, "PRAGMA cache_size=%lld", config_get_number(CONFIG_SECTION_SQLITE, "cache size", -2000));
if (init_database_batch(database, list))
return 1;
- snprintfz(buf, 1024, "PRAGMA user_version=%d;", target_version);
+ snprintfz(buf, sizeof(buf) - 1, "PRAGMA user_version=%d", target_version);
if (init_database_batch(database, list))
return 1;
@@ -384,13 +391,13 @@ int sql_init_database(db_check_action_type_t rebuild, int memory)
int rc;
if (likely(!memory)) {
- snprintfz(sqlite_database, FILENAME_MAX, "%s/.netdata-meta.db.recover", netdata_configured_cache_dir);
+ snprintfz(sqlite_database, sizeof(sqlite_database) - 1, "%s/.netdata-meta.db.recover", netdata_configured_cache_dir);
rc = unlink(sqlite_database);
snprintfz(sqlite_database, FILENAME_MAX, "%s/netdata-meta.db", netdata_configured_cache_dir);
if (rc == 0 || (rebuild & DB_CHECK_RECOVER)) {
char new_sqlite_database[FILENAME_MAX + 1];
- snprintfz(new_sqlite_database, FILENAME_MAX, "%s/netdata-meta-recover.db", netdata_configured_cache_dir);
+ snprintfz(new_sqlite_database, sizeof(new_sqlite_database) - 1, "%s/netdata-meta-recover.db", netdata_configured_cache_dir);
recover_database(sqlite_database, new_sqlite_database);
if (rebuild & DB_CHECK_RECOVER)
return 0;
@@ -410,7 +417,7 @@ int sql_init_database(db_check_action_type_t rebuild, int memory)
if (rebuild & DB_CHECK_RECLAIM_SPACE) {
netdata_log_info("Reclaiming space of %s", sqlite_database);
- rc = sqlite3_exec_monitored(db_meta, "VACUUM;", 0, 0, &err_msg);
+ rc = sqlite3_exec_monitored(db_meta, "VACUUM", 0, 0, &err_msg);
if (rc != SQLITE_OK) {
error_report("Failed to execute VACUUM rc = %d (%s)", rc, err_msg);
sqlite3_free(err_msg);
@@ -422,6 +429,20 @@ int sql_init_database(db_check_action_type_t rebuild, int memory)
return 1;
}
+ if (rebuild & DB_CHECK_ANALYZE) {
+ netdata_log_info("Running ANALYZE on %s", sqlite_database);
+ rc = sqlite3_exec_monitored(db_meta, "ANALYZE", 0, 0, &err_msg);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to execute ANALYZE rc = %d (%s)", rc, err_msg);
+ sqlite3_free(err_msg);
+ }
+ else {
+ (void) db_execute(db_meta, "select count(*) from sqlite_master limit 0");
+ (void) sqlite3_close(db_meta);
+ }
+ return 1;
+ }
+
netdata_log_info("SQLite database %s initialization", sqlite_database);
rc = sqlite3_create_function(db_meta, "u2h", 1, SQLITE_ANY | SQLITE_DETERMINISTIC, 0, sqlite_uuid_parse, 0, 0);
@@ -471,6 +492,9 @@ void sql_close_database(void)
add_stmt_to_list(NULL);
+ (void) db_execute(db_meta, "PRAGMA analysis_limit=10000");
+ (void) db_execute(db_meta, "PRAGMA optimize");
+
rc = sqlite3_close_v2(db_meta);
if (unlikely(rc != SQLITE_OK))
error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc));
@@ -511,22 +535,25 @@ int db_execute(sqlite3 *db, const char *cmd)
{
int rc;
int cnt = 0;
+
while (cnt < SQL_MAX_RETRY) {
char *err_msg;
rc = sqlite3_exec_monitored(db, cmd, 0, 0, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("Failed to execute '%s', rc = %d (%s) -- attempt %d", cmd, rc, err_msg, cnt);
- sqlite3_free(err_msg);
- if (likely(rc == SQLITE_BUSY || rc == SQLITE_LOCKED)) {
- usleep(SQLITE_INSERT_DELAY * USEC_PER_MS);
- }
- else
- break;
- }
- else
+ if (likely(rc == SQLITE_OK))
break;
++cnt;
+ error_report("Failed to execute '%s', rc = %d (%s) -- attempt %d", cmd, rc, err_msg, cnt);
+ sqlite3_free(err_msg);
+
+ if (likely(rc == SQLITE_BUSY || rc == SQLITE_LOCKED)) {
+ usleep(SQLITE_INSERT_DELAY * USEC_PER_MS);
+ continue;
+ }
+
+ if (rc == SQLITE_CORRUPT)
+ mark_database_to_recover(NULL, db);
+ break;
}
return (rc != SQLITE_OK);
}
@@ -542,7 +569,7 @@ static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id)
return;
}
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (unlikely(!host->node_id)) {
uuid_t *t = mallocz(sizeof(*host->node_id));
@@ -559,7 +586,7 @@ static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id)
uuid_unparse_lower(*node_id, wc->node_id);
}
-#define SQL_UPDATE_NODE_ID "update node_instance set node_id = @node_id where host_id = @host_id;"
+#define SQL_UPDATE_NODE_ID "UPDATE node_instance SET node_id = @node_id WHERE host_id = @host_id"
int update_node_id(uuid_t *host_id, uuid_t *node_id)
{
@@ -611,7 +638,7 @@ failed:
return rc - 1;
}
-#define SQL_SELECT_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id AND node_id IS NOT NULL;"
+#define SQL_SELECT_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id AND node_id IS NOT NULL"
int get_node_id(uuid_t *host_id, uuid_t *node_id)
{
@@ -647,8 +674,9 @@ failed:
return (rc == SQLITE_ROW) ? 0 : -1;
}
-#define SQL_INVALIDATE_NODE_INSTANCES "UPDATE node_instance SET node_id = NULL WHERE EXISTS " \
- "(SELECT host_id FROM node_instance WHERE host_id = @host_id AND (@claim_id IS NULL OR claim_id <> @claim_id));"
+#define SQL_INVALIDATE_NODE_INSTANCES \
+ "UPDATE node_instance SET node_id = NULL WHERE EXISTS " \
+ "(SELECT host_id FROM node_instance WHERE host_id = @host_id AND (@claim_id IS NULL OR claim_id <> @claim_id))"
void invalidate_node_instances(uuid_t *host_id, uuid_t *claim_id)
{
@@ -692,8 +720,9 @@ failed:
error_report("Failed to finalize the prepared statement when invalidating node instance information");
}
-#define SQL_GET_NODE_INSTANCE_LIST "SELECT ni.node_id, ni.host_id, h.hostname " \
- "FROM node_instance ni, host h WHERE ni.host_id = h.host_id AND h.hops >=0;"
+#define SQL_GET_NODE_INSTANCE_LIST \
+ "SELECT ni.node_id, ni.host_id, h.hostname " \
+ "FROM node_instance ni, host h WHERE ni.host_id = h.host_id AND h.hops >=0"
struct node_instance_list *get_node_list(void)
{
@@ -762,7 +791,7 @@ failed:
return node_list;
};
-#define SQL_GET_HOST_NODE_ID "select node_id from node_instance where host_id = @host_id;"
+#define SQL_GET_HOST_NODE_ID "SELECT node_id FROM node_instance WHERE host_id = @host_id"
void sql_load_node_id(RRDHOST *host)
{
@@ -801,7 +830,7 @@ failed:
};
-#define SELECT_HOST_INFO "SELECT system_key, system_value FROM host_info WHERE host_id = @host_id;"
+#define SELECT_HOST_INFO "SELECT system_key, system_value FROM host_info WHERE host_id = @host_id"
void sql_build_host_system_info(uuid_t *host_id, struct rrdhost_system_info *system_info)
{
@@ -832,7 +861,7 @@ skip:
}
#define SELECT_HOST_LABELS "SELECT label_key, label_value, source_type FROM host_label WHERE host_id = @host_id " \
- "AND label_key IS NOT NULL AND label_value IS NOT NULL;"
+ "AND label_key IS NOT NULL AND label_value IS NOT NULL"
RRDLABELS *sql_load_host_labels(uuid_t *host_id)
{
@@ -888,7 +917,7 @@ int sql_metadata_cache_stats(int op)
return count;
}
-#define SQL_DROP_TABLE "DROP table %s;"
+#define SQL_DROP_TABLE "DROP table %s"
void sql_drop_table(const char *table)
{
@@ -896,7 +925,7 @@ void sql_drop_table(const char *table)
return;
char wstr[255];
- snprintfz(wstr, 254, SQL_DROP_TABLE, table);
+ snprintfz(wstr, sizeof(wstr) - 1, SQL_DROP_TABLE, table);
int rc = sqlite3_exec_monitored(db_meta, wstr, 0, 0, NULL);
if (rc != SQLITE_OK) {
diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h
index 9cd1f7ad4..40b5af464 100644
--- a/database/sqlite/sqlite_functions.h
+++ b/database/sqlite/sqlite_functions.h
@@ -21,8 +21,9 @@ struct node_instance_list {
typedef enum db_check_action_type {
DB_CHECK_NONE = (1 << 0),
DB_CHECK_RECLAIM_SPACE = (1 << 1),
- DB_CHECK_CONT = (1 << 2),
- DB_CHECK_RECOVER = (1 << 3),
+ DB_CHECK_ANALYZE = (1 << 2),
+ DB_CHECK_CONT = (1 << 3),
+ DB_CHECK_RECOVER = (1 << 4),
} db_check_action_type_t;
#define SQL_MAX_RETRY (100)
diff --git a/database/sqlite/sqlite_health.c b/database/sqlite/sqlite_health.c
index 6fc6a2e64..7d79ff70b 100644
--- a/database/sqlite/sqlite_health.c
+++ b/database/sqlite/sqlite_health.c
@@ -95,18 +95,22 @@ failed:
/* Health related SQL queries
Inserts an entry in the table
*/
+
#define SQL_INSERT_HEALTH_LOG \
"INSERT INTO health_log (host_id, alarm_id, " \
- "config_hash_id, name, chart, exec, recipient, units, chart_context, last_transition_id, chart_name) " \
- "VALUES (?,?,?,?,?,?,?,?,?,?,?) " \
- "ON CONFLICT (host_id, alarm_id) DO UPDATE SET last_transition_id = excluded.last_transition_id, " \
- "chart_name = excluded.chart_name RETURNING health_log_id; "
-
-#define SQL_INSERT_HEALTH_LOG_DETAIL \
- "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, " \
- "updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, " \
+ "config_hash_id, name, chart, exec, recipient, units, chart_context, last_transition_id, chart_name) " \
+ "VALUES (@host_id,@alarm_id, @config_hash_id,@name,@chart,@exec,@recipient,@units,@chart_context," \
+ "@last_transition_id,@chart_name) ON CONFLICT (host_id, alarm_id) DO UPDATE " \
+ "SET last_transition_id = excluded.last_transition_id, chart_name = excluded.chart_name RETURNING health_log_id"
+
+#define SQL_INSERT_HEALTH_LOG_DETAIL \
+ "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, " \
+ "updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, " \
"info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \
- "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,@global_id,?); "
+ "VALUES (@health_log_id,@unique_id,@alarm_id,@alarm_event_id,@updated_by_id,@updates_id,@when_key,@duration," \
+ "@non_clear_duration,@flags,@exec_run_timestamp,@delay_up_to_timestamp, @info,@exec_code,@new_status,@old_status," \
+ "@delay,@new_value,@old_value,@last_repeat,@transition_id,@global_id,@summary)"
+
static void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) {
sqlite3_stmt *res = NULL;
int rc;
@@ -353,7 +357,6 @@ static void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) {
}
ae->flags |= HEALTH_ENTRY_FLAG_SAVED;
- host->health.health_log_entries_written++;
failed:
if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
@@ -374,48 +377,6 @@ void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae)
}
}
-/* Health related SQL queries
- Get a count of rows from health log table
-*/
-#define SQL_COUNT_HEALTH_LOG_DETAIL "SELECT count(1) FROM health_log_detail hld, health_log hl " \
- "where hl.host_id = @host_id and hl.health_log_id = hld.health_log_id"
-
-static int sql_health_alarm_log_count(RRDHOST *host) {
- sqlite3_stmt *res = NULL;
- int rc;
-
- if (unlikely(!db_meta)) {
- if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
- error_report("Database has not been initialized");
- return -1;
- }
-
- int entries_in_db = -1;
-
- rc = sqlite3_prepare_v2(db_meta, SQL_COUNT_HEALTH_LOG_DETAIL, -1, &res, 0);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement to count health log entries from db");
- goto done;
- }
-
- rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id for SQL_COUNT_HEALTH_LOG.");
- goto done;
- }
-
- rc = sqlite3_step_monitored(res);
- if (likely(rc == SQLITE_ROW))
- entries_in_db = (int) sqlite3_column_int64(res, 0);
-
-done:
- rc = sqlite3_finalize(res);
- if (unlikely(rc != SQLITE_OK))
- error_report("Failed to finalize the prepared statement to count health log entries from db");
-
- return entries_in_db;
-}
-
/*
*
* Health related SQL queries
@@ -423,19 +384,22 @@ done:
*
*/
-#define SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED "DELETE FROM health_log_detail WHERE health_log_id IN " \
- "(SELECT health_log_id FROM health_log WHERE host_id = @host_id) AND when_key < unixepoch() - @history " \
- "AND updated_by_id <> 0 AND transition_id NOT IN " \
- "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id);"
-
-#define SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(guid) "DELETE from health_log_detail WHERE unique_id NOT IN " \
- "(SELECT filtered_alert_unique_id FROM aclk_alert_%s) " \
- "AND unique_id IN (SELECT hld.unique_id FROM health_log hl, health_log_detail hld WHERE " \
- "hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id) " \
- "AND health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id) " \
- "AND when_key < unixepoch() - @history " \
- "AND updated_by_id <> 0 AND transition_id NOT IN " \
- "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id);", guid
+#define SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED \
+ "DELETE FROM health_log_detail WHERE health_log_id IN " \
+ "(SELECT health_log_id FROM health_log WHERE host_id = @host_id) AND when_key < UNIXEPOCH() - @history " \
+ "AND updated_by_id <> 0 AND transition_id NOT IN " \
+ "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)"
+
+#define SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(guid) \
+ "DELETE from health_log_detail WHERE unique_id NOT IN " \
+ "(SELECT filtered_alert_unique_id FROM aclk_alert_%s) " \
+ "AND unique_id IN (SELECT hld.unique_id FROM health_log hl, health_log_detail hld WHERE " \
+ "hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id) " \
+ "AND health_log_id IN (SELECT health_log_id FROM health_log WHERE host_id = @host_id) " \
+ "AND when_key < unixepoch() - @history " \
+ "AND updated_by_id <> 0 AND transition_id NOT IN " \
+ "(SELECT last_transition_id FROM health_log hl WHERE hl.host_id = @host_id)", \
+ guid
void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) {
sqlite3_stmt *res = NULL;
@@ -450,14 +414,14 @@ void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) {
char uuid_str[UUID_STR_LEN];
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- snprintfz(command, MAX_HEALTH_SQL_SIZE, "aclk_alert_%s", uuid_str);
+ snprintfz(command, sizeof(command) - 1, "aclk_alert_%s", uuid_str);
bool aclk_table_exists = table_exists_in_database(db_meta, command);
char *sql = SQL_CLEANUP_HEALTH_LOG_DETAIL_NOT_CLAIMED;
if (claimed && aclk_table_exists) {
- snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(uuid_str));
+ snprintfz(command, sizeof(command) - 1, SQL_CLEANUP_HEALTH_LOG_DETAIL_CLAIMED(uuid_str));
sql = command;
}
@@ -483,10 +447,6 @@ void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed) {
if (unlikely(rc != SQLITE_DONE))
error_report("Failed to cleanup health log detail table, rc = %d", rc);
- int rows = sql_health_alarm_log_count(host);
- if (rows >= 0)
- host->health.health_log_entries_written = rows;
-
if (aclk_table_exists)
sql_aclk_alert_clean_dead_entries(host);
@@ -497,17 +457,25 @@ done:
}
#define SQL_INJECT_REMOVED \
- "insert into health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, " \
+ "INSERT INTO health_log_detail (health_log_id, unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, " \
"duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, " \
"delay, new_value, old_value, last_repeat, transition_id, global_id, summary) " \
- "select health_log_id, ?1, ?2, ?3, 0, ?4, unixepoch(), 0, 0, flags, exec_run_timestamp, unixepoch(), info, exec_code, -2, " \
- "new_status, delay, NULL, new_value, 0, ?5, now_usec(0), summary from health_log_detail where unique_id = ?6 and transition_id = ?7;"
-
-#define SQL_INJECT_REMOVED_UPDATE_DETAIL "update health_log_detail set flags = flags | ?1, updated_by_id = ?2 where unique_id = ?3 and transition_id = ?4;"
-
-#define SQL_INJECT_REMOVED_UPDATE_LOG "update health_log set last_transition_id = ?1 where alarm_id = ?2 and last_transition_id = ?3 and host_id = ?4;"
-
-void sql_inject_removed_status(RRDHOST *host, uint32_t alarm_id, uint32_t alarm_event_id, uint32_t unique_id, uint32_t max_unique_id, uuid_t *prev_transition_id)
+ "SELECT health_log_id, ?1, ?2, ?3, 0, ?4, UNIXEPOCH(), 0, 0, flags, exec_run_timestamp, UNIXEPOCH(), info, exec_code, -2, " \
+ "new_status, delay, NULL, new_value, 0, ?5, NOW_USEC(0), summary FROM health_log_detail WHERE unique_id = ?6 AND transition_id = ?7"
+
+#define SQL_INJECT_REMOVED_UPDATE_DETAIL \
+ "UPDATE health_log_detail SET flags = flags | ?1, updated_by_id = ?2 WHERE unique_id = ?3 AND transition_id = ?4"
+
+#define SQL_INJECT_REMOVED_UPDATE_LOG \
+ "UPDATE health_log SET last_transition_id = ?1 WHERE alarm_id = ?2 AND last_transition_id = ?3 AND host_id = ?4"
+
+void sql_inject_removed_status(
+ RRDHOST *host,
+ uint32_t alarm_id,
+ uint32_t alarm_event_id,
+ uint32_t unique_id,
+ uint32_t max_unique_id,
+ uuid_t *prev_transition_id)
{
int rc;
@@ -737,13 +705,14 @@ void sql_check_removed_alerts_state(RRDHOST *host)
/* Health related SQL queries
Load from the health log table
*/
-#define SQL_LOAD_HEALTH_LOG "SELECT hld.unique_id, hld.alarm_id, hld.alarm_event_id, hl.config_hash_id, hld.updated_by_id, " \
- "hld.updates_id, hld.when_key, hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, " \
- "hld.delay_up_to_timestamp, hl.name, hl.chart, hl.exec, hl.recipient, ah.source, hl.units, " \
- "hld.info, hld.exec_code, hld.new_status, hld.old_status, hld.delay, hld.new_value, hld.old_value, " \
- "hld.last_repeat, ah.class, ah.component, ah.type, hl.chart_context, hld.transition_id, hld.global_id, hl.chart_name, hld.summary " \
- "FROM health_log hl, alert_hash ah, health_log_detail hld " \
- "WHERE hl.config_hash_id = ah.hash_id and hl.host_id = @host_id and hl.last_transition_id = hld.transition_id;"
+#define SQL_LOAD_HEALTH_LOG \
+ "SELECT hld.unique_id, hld.alarm_id, hld.alarm_event_id, hl.config_hash_id, hld.updated_by_id, " \
+ "hld.updates_id, hld.when_key, hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, " \
+ "hld.delay_up_to_timestamp, hl.name, hl.chart, hl.exec, hl.recipient, ah.source, hl.units, " \
+ "hld.info, hld.exec_code, hld.new_status, hld.old_status, hld.delay, hld.new_value, hld.old_value, " \
+ "hld.last_repeat, ah.class, ah.component, ah.type, hl.chart_context, hld.transition_id, hld.global_id, " \
+ "hl.chart_name, hld.summary FROM health_log hl, alert_hash ah, health_log_detail hld " \
+ "WHERE hl.config_hash_id = ah.hash_id and hl.host_id = @host_id and hl.last_transition_id = hld.transition_id"
void sql_health_alarm_log_load(RRDHOST *host)
{
@@ -751,8 +720,6 @@ void sql_health_alarm_log_load(RRDHOST *host)
int ret;
ssize_t errored = 0, loaded = 0;
- host->health.health_log_entries_written = 0;
-
if (unlikely(!db_meta)) {
if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
error_report("HEALTH [%s]: Database has not been initialized", rrdhost_hostname(host));
@@ -914,27 +881,28 @@ void sql_health_alarm_log_load(RRDHOST *host)
if (unlikely(!host->health_log.next_alarm_id || host->health_log.next_alarm_id <= host->health_max_alarm_id))
host->health_log.next_alarm_id = host->health_max_alarm_id + 1;
- netdata_log_health("[%s]: Table health_log, loaded %zd alarm entries, errors in %zd entries.", rrdhost_hostname(host), loaded, errored);
+ nd_log(NDLS_DAEMON, errored ? NDLP_WARNING : NDLP_DEBUG,
+ "[%s]: Table health_log, loaded %zd alarm entries, errors in %zd entries.",
+ rrdhost_hostname(host), loaded, errored);
ret = sqlite3_finalize(res);
if (unlikely(ret != SQLITE_OK))
error_report("Failed to finalize the health log read statement");
-
- int rows = sql_health_alarm_log_count(host);
-
- if (rows >= 0)
- host->health.health_log_entries_written = rows;
}
/*
* Store an alert config hash in the database
*/
-#define SQL_STORE_ALERT_CONFIG_HASH "insert or replace into alert_hash (hash_id, date_updated, alarm, template, " \
- "on_key, class, component, type, os, hosts, lookup, every, units, calc, plugin, module, " \
- "charts, green, red, warn, crit, exec, to_key, info, delay, options, repeat, host_labels, " \
- "p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after, " \
- "p_db_lookup_before, p_update_every, source, chart_labels, summary) values (?1,unixepoch(),?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12," \
- "?13,?14,?15,?16,?17,?18,?19,?20,?21,?22,?23,?24,?25,?26,?27,?28,?29,?30,?31,?32,?33,?34,?35,?36);"
+#define SQL_STORE_ALERT_CONFIG_HASH \
+ "insert or replace into alert_hash (hash_id, date_updated, alarm, template, " \
+ "on_key, class, component, type, os, hosts, lookup, every, units, calc, plugin, module, " \
+ "charts, green, red, warn, crit, exec, to_key, info, delay, options, repeat, host_labels, " \
+ "p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after, " \
+ "p_db_lookup_before, p_update_every, source, chart_labels, summary) values (@hash_id,UNIXEPOCH(),@alarm,@template," \
+ "@on_key,@class,@component,@type,@os,@hosts,@lookup,@every,@units,@calc,@plugin,@module," \
+ "@charts,@green,@red,@warn,@crit,@exec,@to_key,@info,@delay,@options,@repeat,@host_labels," \
+ "@p_db_lookup_dimensions,@p_db_lookup_method,@p_db_lookup_options,@p_db_lookup_after," \
+ "@p_db_lookup_before,@p_update_every,@source,@chart_labels,@summary)"
int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg)
{
@@ -1212,7 +1180,7 @@ int alert_hash_and_store_config(
#define SQL_SELECT_HEALTH_LAST_EXECUTED_EVENT \
"SELECT hld.new_status FROM health_log hl, health_log_detail hld " \
"WHERE hl.host_id = @host_id AND hl.alarm_id = @alarm_id AND hld.unique_id != @unique_id AND hld.flags & @flags " \
- "AND hl.health_log_id = hld.health_log_id ORDER BY hld.unique_id DESC LIMIT 1;"
+ "AND hl.health_log_id = hld.health_log_id ORDER BY hld.unique_id DESC LIMIT 1"
int sql_health_get_last_executed_event(RRDHOST *host, ALARM_ENTRY *ae, RRDCALC_STATUS *last_executed_status)
{
@@ -1270,191 +1238,162 @@ done:
"hl.units, hld.info, hld.exec_code, hld.new_status, hld.old_status, hld.delay, hld.new_value, hld.old_value, " \
"hld.last_repeat, ah.class, ah.component, ah.type, hl.chart_context, hld.transition_id, hld.summary " \
"FROM health_log hl, alert_hash ah, health_log_detail hld WHERE hl.config_hash_id = ah.hash_id and " \
- "hl.health_log_id = hld.health_log_id and hl.host_id = @host_id "
+ "hl.health_log_id = hld.health_log_id and hl.host_id = @host_id AND hld.unique_id > @after "
-void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, uint32_t after, char *chart) {
+void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, time_t after, const char *chart)
+{
+ unsigned int max = host->health_log.max;
- buffer_strcat(wb, "[");
+ static __thread sqlite3_stmt *stmt_no_chart = NULL;
+ static __thread sqlite3_stmt *stmt_with_chart = NULL;
- unsigned int max = host->health_log.max;
- unsigned int count = 0;
+ sqlite3_stmt **active_stmt;
+ sqlite3_stmt *stmt_query;
- sqlite3_stmt *res = NULL;
- int rc;
+ int rc;
- BUFFER *command = buffer_create(MAX_HEALTH_SQL_SIZE, NULL);
- buffer_sprintf(command, SQL_SELECT_HEALTH_LOG);
+ active_stmt = chart ? &stmt_with_chart : &stmt_no_chart;
- if (chart) {
- char chart_sql[MAX_HEALTH_SQL_SIZE + 1];
- snprintfz(chart_sql, MAX_HEALTH_SQL_SIZE, "AND hl.chart = '%s' ", chart);
- buffer_strcat(command, chart_sql);
- }
+ if (!*active_stmt) {
- if (after) {
- char after_sql[MAX_HEALTH_SQL_SIZE + 1];
- snprintfz(after_sql, MAX_HEALTH_SQL_SIZE, "AND hld.unique_id > %u ", after);
- buffer_strcat(command, after_sql);
- }
+ BUFFER *command = buffer_create(MAX_HEALTH_SQL_SIZE, NULL);
+ buffer_sprintf(command, SQL_SELECT_HEALTH_LOG);
- {
- char limit_sql[MAX_HEALTH_SQL_SIZE + 1];
- snprintfz(limit_sql, MAX_HEALTH_SQL_SIZE, "ORDER BY hld.unique_id DESC LIMIT %u ", max);
- buffer_strcat(command, limit_sql);
- }
+ if (chart)
+ buffer_strcat(command, " AND hl.chart = @chart ");
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(command), -1, &res, 0);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement SQL_SELECT_HEALTH_LOG");
- buffer_free(command);
- return;
- }
+ buffer_strcat(command, " ORDER BY hld.unique_id DESC LIMIT @limit");
- rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id for SQL_SELECT_HEALTH_LOG.");
- sqlite3_finalize(res);
- buffer_free(command);
- return;
- }
+ rc = prepare_statement(db_meta, buffer_tostring(command), active_stmt);
+ buffer_free(command);
- while (sqlite3_step(res) == SQLITE_ROW) {
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement SQL_SELECT_HEALTH_LOG");
+ return;
+ }
+ }
- char old_value_string[100 + 1];
- char new_value_string[100 + 1];
+ stmt_query = *active_stmt;
- char config_hash_id[UUID_STR_LEN];
- uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), config_hash_id);
+ int param = 0;
+ rc = sqlite3_bind_blob(stmt_query, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind host_id for SQL_SELECT_HEALTH_LOG.");
+ goto finish;
+ }
- char transition_id[UUID_STR_LEN] = {0};
- if (sqlite3_column_type(res, 30) != SQLITE_NULL)
- uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 30)), transition_id);
-
- char *edit_command = sqlite3_column_bytes(res, 16) > 0 ? health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) : strdupz("UNKNOWN=0=UNKNOWN");
-
- if (count)
- buffer_sprintf(wb, ",");
-
- count++;
-
- buffer_sprintf(
- wb,
- "\n\t{\n"
- "\t\t\"hostname\": \"%s\",\n"
- "\t\t\"utc_offset\": %d,\n"
- "\t\t\"timezone\": \"%s\",\n"
- "\t\t\"unique_id\": %u,\n"
- "\t\t\"alarm_id\": %u,\n"
- "\t\t\"alarm_event_id\": %u,\n"
- "\t\t\"config_hash_id\": \"%s\",\n"
- "\t\t\"transition_id\": \"%s\",\n"
- "\t\t\"name\": \"%s\",\n"
- "\t\t\"chart\": \"%s\",\n"
- "\t\t\"context\": \"%s\",\n"
- "\t\t\"class\": \"%s\",\n"
- "\t\t\"component\": \"%s\",\n"
- "\t\t\"type\": \"%s\",\n"
- "\t\t\"processed\": %s,\n"
- "\t\t\"updated\": %s,\n"
- "\t\t\"exec_run\": %lu,\n"
- "\t\t\"exec_failed\": %s,\n"
- "\t\t\"exec\": \"%s\",\n"
- "\t\t\"recipient\": \"%s\",\n"
- "\t\t\"exec_code\": %d,\n"
- "\t\t\"source\": \"%s\",\n"
- "\t\t\"command\": \"%s\",\n"
- "\t\t\"units\": \"%s\",\n"
- "\t\t\"when\": %lu,\n"
- "\t\t\"duration\": %lu,\n"
- "\t\t\"non_clear_duration\": %lu,\n"
- "\t\t\"status\": \"%s\",\n"
- "\t\t\"old_status\": \"%s\",\n"
- "\t\t\"delay\": %d,\n"
- "\t\t\"delay_up_to_timestamp\": %lu,\n"
- "\t\t\"updated_by_id\": %u,\n"
- "\t\t\"updates_id\": %u,\n"
- "\t\t\"value_string\": \"%s\",\n"
- "\t\t\"old_value_string\": \"%s\",\n"
- "\t\t\"last_repeat\": %lu,\n"
- "\t\t\"silenced\": \"%s\",\n",
- rrdhost_hostname(host),
- host->utc_offset,
- rrdhost_abbrev_timezone(host),
- (unsigned int) sqlite3_column_int64(res, 0),
- (unsigned int) sqlite3_column_int64(res, 1),
- (unsigned int) sqlite3_column_int64(res, 2),
- config_hash_id,
- transition_id,
- sqlite3_column_text(res, 12),
- sqlite3_column_text(res, 13),
- sqlite3_column_text(res, 29),
- sqlite3_column_text(res, 26) ? (const char *) sqlite3_column_text(res, 26) : (char *) "Unknown",
- sqlite3_column_text(res, 27) ? (const char *) sqlite3_column_text(res, 27) : (char *) "Unknown",
- sqlite3_column_text(res, 28) ? (const char *) sqlite3_column_text(res, 28) : (char *) "Unknown",
- (sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_PROCESSED)?"true":"false",
- (sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_UPDATED)?"true":"false",
- (long unsigned int)sqlite3_column_int64(res, 10),
- (sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_EXEC_FAILED)?"true":"false",
- sqlite3_column_text(res, 14) ? (const char *) sqlite3_column_text(res, 14) : string2str(host->health.health_default_exec),
- sqlite3_column_text(res, 15) ? (const char *) sqlite3_column_text(res, 15) : string2str(host->health.health_default_recipient),
- sqlite3_column_int(res, 19),
- sqlite3_column_text(res, 16) ? (const char *) sqlite3_column_text(res, 16) : (char *) "Unknown",
- edit_command,
- sqlite3_column_text(res, 17),
- (long unsigned int)sqlite3_column_int64(res, 6),
- (long unsigned int)sqlite3_column_int64(res, 7),
- (long unsigned int)sqlite3_column_int64(res, 8),
- rrdcalc_status2string(sqlite3_column_int(res, 20)),
- rrdcalc_status2string(sqlite3_column_int(res, 21)),
- sqlite3_column_int(res, 22),
- (long unsigned int)sqlite3_column_int64(res, 11),
- (unsigned int)sqlite3_column_int64(res, 4),
- (unsigned int)sqlite3_column_int64(res, 5),
- sqlite3_column_type(res, 23) == SQLITE_NULL ? "-" : format_value_and_unit(new_value_string, 100, sqlite3_column_double(res, 23), (char *) sqlite3_column_text(res, 17), -1),
- sqlite3_column_type(res, 24) == SQLITE_NULL ? "-" : format_value_and_unit(old_value_string, 100, sqlite3_column_double(res, 24), (char *) sqlite3_column_text(res, 17), -1),
- (long unsigned int)sqlite3_column_int64(res, 25),
- (sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_SILENCED)?"true":"false");
-
- health_string2json(wb, "\t\t", "summary", (char *) sqlite3_column_text(res, 31), ",\n");
- health_string2json(wb, "\t\t", "info", (char *) sqlite3_column_text(res, 18), ",\n");
-
- if(unlikely(sqlite3_column_int64(res, 9) & HEALTH_ENTRY_FLAG_NO_CLEAR_NOTIFICATION)) {
- buffer_strcat(wb, "\t\t\"no_clear_notification\": true,\n");
- }
+ rc = sqlite3_bind_int64(stmt_query, ++param, after);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind after for SQL_SELECT_HEALTH_LOG.");
+ goto finish;
+ }
- buffer_strcat(wb, "\t\t\"value\":");
- if (sqlite3_column_type(res, 23) == SQLITE_NULL)
- buffer_strcat(wb, "null");
- else
- buffer_print_netdata_double(wb, sqlite3_column_double(res, 23));
- buffer_strcat(wb, ",\n");
+ if (chart) {
+ rc = sqlite3_bind_text(stmt_query, ++param, chart, -1, SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind after for SQL_SELECT_HEALTH_LOG.");
+ goto finish;
+ }
+ }
- buffer_strcat(wb, "\t\t\"old_value\":");
- if (sqlite3_column_type(res, 24) == SQLITE_NULL)
- buffer_strcat(wb, "null");
+ rc = sqlite3_bind_int64(stmt_query, ++param, max);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind max lines for SQL_SELECT_HEALTH_LOG.");
+ goto finish;
+ }
+
+ buffer_json_initialize(wb, "\"", "\"", 0, false, BUFFER_JSON_OPTIONS_DEFAULT);
+ buffer_json_member_add_array(wb, NULL);
+
+ while (sqlite3_step(stmt_query) == SQLITE_ROW) {
+ char old_value_string[100 + 1];
+ char new_value_string[100 + 1];
+
+ char config_hash_id[UUID_STR_LEN];
+ uuid_unparse_lower(*((uuid_t *)sqlite3_column_blob(stmt_query, 3)), config_hash_id);
+
+ char transition_id[UUID_STR_LEN] = {0};
+ if (sqlite3_column_type(stmt_query, 30) != SQLITE_NULL)
+ uuid_unparse_lower(*((uuid_t *)sqlite3_column_blob(stmt_query, 30)), transition_id);
+
+ char *edit_command = sqlite3_column_bytes(stmt_query, 16) > 0 ?
+ health_edit_command_from_source((char *)sqlite3_column_text(stmt_query, 16)) :
+ strdupz("UNKNOWN=0=UNKNOWN");
+
+ buffer_json_add_array_item_object(wb); // this node
+
+ buffer_json_member_add_string_or_empty(wb, "hostname", rrdhost_hostname(host));
+ buffer_json_member_add_int64(wb, "utc_offset", (int64_t)host->utc_offset);
+ buffer_json_member_add_string_or_empty(wb, "timezone", rrdhost_abbrev_timezone(host));
+ buffer_json_member_add_int64(wb, "unique_id", (int64_t) sqlite3_column_int64(stmt_query, 0));
+ buffer_json_member_add_int64(wb, "alarm_id", (int64_t) sqlite3_column_int64(stmt_query, 1));
+ buffer_json_member_add_int64(wb, "alarm_event_id", (int64_t) sqlite3_column_int64(stmt_query, 2));
+ buffer_json_member_add_string_or_empty(wb, "config_hash_id", config_hash_id);
+ buffer_json_member_add_string_or_empty(wb, "transition_id", transition_id);
+ buffer_json_member_add_string_or_empty(wb, "name", (const char *) sqlite3_column_text(stmt_query, 12));
+ buffer_json_member_add_string_or_empty(wb, "chart", (const char *) sqlite3_column_text(stmt_query, 13));
+ buffer_json_member_add_string_or_empty(wb, "context", (const char *) sqlite3_column_text(stmt_query, 29));
+ buffer_json_member_add_string_or_empty(wb, "class", sqlite3_column_text(stmt_query, 26) ? (const char *) sqlite3_column_text(stmt_query, 26) : (char *) "Unknown");
+ buffer_json_member_add_string_or_empty(wb, "component", sqlite3_column_text(stmt_query, 27) ? (const char *) sqlite3_column_text(stmt_query, 27) : (char *) "Unknown");
+ buffer_json_member_add_string_or_empty(wb, "type", sqlite3_column_text(stmt_query, 28) ? (const char *) sqlite3_column_text(stmt_query, 28) : (char *) "Unknown");
+ buffer_json_member_add_boolean(wb, "processed", (sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_PROCESSED));
+ buffer_json_member_add_boolean(wb, "updated", (sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_UPDATED));
+ buffer_json_member_add_int64(wb, "exec_run", (int64_t)sqlite3_column_int64(stmt_query, 10));
+ buffer_json_member_add_boolean(wb, "exec_failed", (sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_EXEC_FAILED));
+ buffer_json_member_add_string_or_empty(wb, "exec", sqlite3_column_text(stmt_query, 14) ? (const char *) sqlite3_column_text(stmt_query, 14) : string2str(host->health.health_default_exec));
+ buffer_json_member_add_string_or_empty(wb, "recipient", sqlite3_column_text(stmt_query, 15) ? (const char *) sqlite3_column_text(stmt_query, 15) : string2str(host->health.health_default_recipient));
+ buffer_json_member_add_int64(wb, "exec_code", sqlite3_column_int(stmt_query, 19));
+ buffer_json_member_add_string_or_empty(wb, "source", sqlite3_column_text(stmt_query, 16) ? (const char *) sqlite3_column_text(stmt_query, 16) : (char *) "Unknown");
+ buffer_json_member_add_string_or_empty(wb, "command", edit_command);
+ buffer_json_member_add_string_or_empty(wb, "units", (const char *) sqlite3_column_text(stmt_query, 17));
+ buffer_json_member_add_int64(wb, "when", (int64_t)sqlite3_column_int64(stmt_query, 6));
+ buffer_json_member_add_int64(wb, "duration", (int64_t)sqlite3_column_int64(stmt_query, 7));
+ buffer_json_member_add_int64(wb, "non_clear_duration", (int64_t)sqlite3_column_int64(stmt_query, 8));
+ buffer_json_member_add_string_or_empty(wb, "status", rrdcalc_status2string(sqlite3_column_int(stmt_query, 20)));
+ buffer_json_member_add_string_or_empty(wb, "old_status", rrdcalc_status2string(sqlite3_column_int(stmt_query, 21)));
+ buffer_json_member_add_int64(wb, "delay", sqlite3_column_int(stmt_query, 22));
+ buffer_json_member_add_int64(wb, "delay_up_to_timestamp",(int64_t)sqlite3_column_int64(stmt_query, 11));
+ buffer_json_member_add_int64(wb, "updated_by_id", (unsigned int)sqlite3_column_int64(stmt_query, 4));
+ buffer_json_member_add_int64(wb, "updates_id", (unsigned int)sqlite3_column_int64(stmt_query, 5));
+ buffer_json_member_add_string_or_empty(wb, "value_string", sqlite3_column_type(stmt_query, 23) == SQLITE_NULL ? "-" :
+ format_value_and_unit(new_value_string, 100, sqlite3_column_double(stmt_query, 23), (char *) sqlite3_column_text(stmt_query, 17), -1));
+ buffer_json_member_add_string_or_empty(wb, "old_value_string", sqlite3_column_type(stmt_query, 24) == SQLITE_NULL ? "-" :
+ format_value_and_unit(old_value_string, 100, sqlite3_column_double(stmt_query, 24), (char *) sqlite3_column_text(stmt_query, 17), -1));
+ buffer_json_member_add_int64(wb, "last_repeat", (int64_t)sqlite3_column_int64(stmt_query, 25));
+ buffer_json_member_add_boolean(wb, "silenced", (sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_SILENCED));
+ buffer_json_member_add_string_or_empty(wb, "summary", (const char *) sqlite3_column_text(stmt_query, 31));
+ buffer_json_member_add_string_or_empty(wb, "info", (const char *) sqlite3_column_text(stmt_query, 18));
+ buffer_json_member_add_boolean(wb, "no_clear_notification",(sqlite3_column_int64(stmt_query, 9) & HEALTH_ENTRY_FLAG_NO_CLEAR_NOTIFICATION));
+
+ if (sqlite3_column_type(stmt_query, 23) == SQLITE_NULL)
+ buffer_json_member_add_string(wb, "value", NULL);
else
- buffer_print_netdata_double(wb, sqlite3_column_double(res, 24));
- buffer_strcat(wb, "\n");
+ buffer_json_member_add_double(wb, "value", sqlite3_column_double(stmt_query, 23));
- buffer_strcat(wb, "\t}");
+ if (sqlite3_column_type(stmt_query, 24) == SQLITE_NULL)
+ buffer_json_member_add_string(wb, "old_value", NULL);
+ else
+ buffer_json_member_add_double(wb, "old_value", sqlite3_column_double(stmt_query, 23));
freez(edit_command);
+
+ buffer_json_object_close(wb);
}
- buffer_strcat(wb, "\n]");
+ buffer_json_array_close(wb);
+ buffer_json_finalize(wb);
- rc = sqlite3_finalize(res);
- if (unlikely(rc != SQLITE_OK))
- error_report("Failed to finalize statement for SQL_SELECT_HEALTH_LOG");
-
- buffer_free(command);
+finish:
+ rc = sqlite3_reset(stmt_query);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement for SQL_SELECT_HEALTH_LOG");
}
-#define SQL_COPY_HEALTH_LOG(table) "INSERT OR IGNORE INTO health_log (host_id, alarm_id, config_hash_id, name, chart, family, exec, recipient, units, chart_context) SELECT ?1, alarm_id, config_hash_id, name, chart, family, exec, recipient, units, chart_context from %s;", table
-#define SQL_COPY_HEALTH_LOG_DETAIL(table) "INSERT INTO health_log_detail (unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, global_id, host_id) SELECT unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, now_usec(1), ?1 from %s;", table
-#define SQL_UPDATE_HEALTH_LOG_DETAIL_TRANSITION_ID "update health_log_detail set transition_id = uuid_random() where transition_id is null;"
-#define SQL_UPDATE_HEALTH_LOG_DETAIL_HEALTH_LOG_ID "update health_log_detail set health_log_id = (select health_log_id from health_log where host_id = ?1 and alarm_id = health_log_detail.alarm_id) where health_log_id is null and host_id = ?2;"
-#define SQL_UPDATE_HEALTH_LOG_LAST_TRANSITION_ID "update health_log set last_transition_id = (select transition_id from health_log_detail where health_log_id = health_log.health_log_id and alarm_id = health_log.alarm_id group by (alarm_id) having max(alarm_event_id)) where host_id = ?1;"
+#define SQL_COPY_HEALTH_LOG(table) "INSERT OR IGNORE INTO health_log (host_id, alarm_id, config_hash_id, name, chart, family, exec, recipient, units, chart_context) SELECT ?1, alarm_id, config_hash_id, name, chart, family, exec, recipient, units, chart_context from %s", table
+#define SQL_COPY_HEALTH_LOG_DETAIL(table) "INSERT INTO health_log_detail (unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, global_id, host_id) SELECT unique_id, alarm_id, alarm_event_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, transition_id, now_usec(1), ?1 from %s", table
+#define SQL_UPDATE_HEALTH_LOG_DETAIL_TRANSITION_ID "update health_log_detail set transition_id = uuid_random() where transition_id is null"
+#define SQL_UPDATE_HEALTH_LOG_DETAIL_HEALTH_LOG_ID "update health_log_detail set health_log_id = (select health_log_id from health_log where host_id = ?1 and alarm_id = health_log_detail.alarm_id) where health_log_id is null and host_id = ?2"
+#define SQL_UPDATE_HEALTH_LOG_LAST_TRANSITION_ID "update health_log set last_transition_id = (select transition_id from health_log_detail where health_log_id = health_log.health_log_id and alarm_id = health_log.alarm_id group by (alarm_id) having max(alarm_event_id)) where host_id = ?1"
int health_migrate_old_health_log_table(char *table) {
if (!table)
return 0;
@@ -1476,7 +1415,7 @@ int health_migrate_old_health_log_table(char *table) {
int rc;
char command[MAX_HEALTH_SQL_SIZE + 1];
sqlite3_stmt *res = NULL;
- snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_COPY_HEALTH_LOG(table));
+ snprintfz(command, sizeof(command) - 1, SQL_COPY_HEALTH_LOG(table));
rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to prepare statement to copy health log, rc = %d", rc);
@@ -1503,7 +1442,7 @@ int health_migrate_old_health_log_table(char *table) {
}
//detail
- snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_COPY_HEALTH_LOG_DETAIL(table));
+ snprintfz(command, sizeof(command) - 1, SQL_COPY_HEALTH_LOG_DETAIL(table));
rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to prepare statement to copy health log detail, rc = %d", rc);
@@ -1913,12 +1852,12 @@ void sql_alert_transitions(
goto run_query;
}
- snprintfz(sql, 511, SQL_BUILD_ALERT_TRANSITION, nodes);
+ snprintfz(sql, sizeof(sql) - 1, SQL_BUILD_ALERT_TRANSITION, nodes);
rc = db_execute(db_meta, sql);
if (rc)
return;
- snprintfz(sql, 511, SQL_POPULATE_TEMP_ALERT_TRANSITION_TABLE, nodes);
+ snprintfz(sql, sizeof(sql) - 1, SQL_POPULATE_TEMP_ALERT_TRANSITION_TABLE, nodes);
// Prepare statement to add things
rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
@@ -2046,7 +1985,7 @@ done:
done_only_drop:
if (likely(!transition)) {
- (void)snprintfz(sql, 511, "DROP TABLE IF EXISTS v_%p", nodes);
+ (void)snprintfz(sql, sizeof(sql) - 1, "DROP TABLE IF EXISTS v_%p", nodes);
(void)db_execute(db_meta, sql);
buffer_free(command);
}
@@ -2078,12 +2017,12 @@ int sql_get_alert_configuration(
if (unlikely(!configs))
return added;
- snprintfz(sql, 511, SQL_BUILD_CONFIG_TARGET_LIST, configs);
+ snprintfz(sql, sizeof(sql) - 1, SQL_BUILD_CONFIG_TARGET_LIST, configs);
rc = db_execute(db_meta, sql);
if (rc)
return added;
- snprintfz(sql, 511, SQL_POPULATE_TEMP_CONFIG_TARGET_TABLE, configs);
+ snprintfz(sql, sizeof(sql) - 1, SQL_POPULATE_TEMP_CONFIG_TARGET_TABLE, configs);
// Prepare statement to add things
rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
@@ -2180,7 +2119,7 @@ int sql_get_alert_configuration(
error_report("Failed to finalize statement for sql_get_alert_configuration");
fail_only_drop:
- (void)snprintfz(sql, 511, "DROP TABLE IF EXISTS c_%p", configs);
+ (void)snprintfz(sql, sizeof(sql) - 1, "DROP TABLE IF EXISTS c_%p", configs);
(void)db_execute(db_meta, sql);
buffer_free(command);
return added;
diff --git a/database/sqlite/sqlite_health.h b/database/sqlite/sqlite_health.h
index e21912368..5549b7525 100644
--- a/database/sqlite/sqlite_health.h
+++ b/database/sqlite/sqlite_health.h
@@ -13,7 +13,7 @@ void sql_health_alarm_log_cleanup(RRDHOST *host, bool claimed);
int alert_hash_and_store_config(uuid_t hash_id, struct alert_config *cfg, int store_hash);
void sql_aclk_alert_clean_dead_entries(RRDHOST *host);
int sql_health_get_last_executed_event(RRDHOST *host, ALARM_ENTRY *ae, RRDCALC_STATUS *last_executed_status);
-void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, uint32_t after, char *chart);
+void sql_health_alarm_log2json(RRDHOST *host, BUFFER *wb, time_t after, const char *chart);
int health_migrate_old_health_log_table(char *table);
uint32_t sql_get_alarm_id(RRDHOST *host, STRING *chart, STRING *name, uint32_t *next_event_id, uuid_t *config_hash_id);
uint32_t sql_get_alarm_id_check_zero_hash(RRDHOST *host, STRING *chart, STRING *name, uint32_t *next_event_id, uuid_t *config_hash_id);
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 143783163..636f51966 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -4,11 +4,12 @@
// SQL statements
-#define SQL_STORE_CLAIM_ID "INSERT INTO node_instance " \
- "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \
- "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;"
+#define SQL_STORE_CLAIM_ID \
+ "INSERT INTO node_instance " \
+ "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, UNIXEPOCH()) " \
+ "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id"
-#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;"
+#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid"
#define STORE_HOST_LABEL \
"INSERT INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES "
@@ -18,13 +19,13 @@
#define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())"
-#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;"
+#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid"
#define SQL_STORE_HOST_INFO \
"INSERT OR REPLACE INTO host (host_id, hostname, registry_hostname, update_every, os, timezone, tags, hops, " \
"memory_mode, abbrev_timezone, utc_offset, program_name, program_version, entries, health_enabled, last_connected) " \
"VALUES (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, " \
- "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected);"
+ "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected)"
#define SQL_STORE_CHART \
"INSERT INTO chart (chart_id, host_id, type, id, name, family, context, title, unit, plugin, module, priority, " \
@@ -51,11 +52,10 @@
"(@uuid, @name, @value, UNIXEPOCH())"
#define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \
- "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);"
-#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;"
-#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);"
+ "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0)"
+#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id"
+#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host)"
-#define METADATA_CMD_Q_MAX_SIZE (2048) // Max queue size; callers will block until there is room
#define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds
#define METADATA_MAINTENANCE_REPEAT (60) // Repeat if last run for dimensions, charts, labels needs more work
#define METADATA_HEALTH_LOG_INTERVAL (3600) // Repeat maintenance for health
@@ -81,10 +81,10 @@ enum metadata_opcode {
METADATA_ADD_HOST_INFO,
METADATA_SCAN_HOSTS,
METADATA_LOAD_HOST_CONTEXT,
+ METADATA_DELETE_HOST_CHART_LABELS,
METADATA_MAINTENANCE,
METADATA_SYNC_SHUTDOWN,
METADATA_UNITTEST,
- METADATA_ML_LOAD_MODELS,
// leave this last
// we need it to check for worker utilization
METADATA_MAX_ENUMERATIONS_DEFINED
@@ -98,14 +98,9 @@ struct metadata_cmd {
struct metadata_cmd *prev, *next;
};
-struct metadata_database_cmdqueue {
- struct metadata_cmd *cmd_base;
-};
-
typedef enum {
METADATA_FLAG_PROCESSING = (1 << 0), // store or cleanup
METADATA_FLAG_SHUTDOWN = (1 << 1), // Shutting down
- METADATA_FLAG_ML_LOADING = (1 << 2), // ML model load in progress
} METADATA_FLAG;
struct metadata_wc {
@@ -114,19 +109,20 @@ struct metadata_wc {
uv_async_t async;
uv_timer_t timer_req;
time_t metadata_check_after;
- volatile unsigned queue_size;
METADATA_FLAG flags;
- struct completion init_complete;
+ struct completion start_stop_complete;
struct completion *scan_complete;
/* FIFO command queue */
- uv_mutex_t cmd_mutex;
- struct metadata_database_cmdqueue cmd_queue;
+ SPINLOCK cmd_queue_lock;
+ struct metadata_cmd *cmd_base;
};
#define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag))
#define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST)
#define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST)
+struct metadata_wc metasync_worker = {.loop = NULL};
+
//
// For unittest
//
@@ -146,6 +142,33 @@ struct query_build {
char uuid_str[UUID_STR_LEN];
};
+#define SQL_DELETE_CHART_LABELS_BY_HOST \
+ "DELETE FROM chart_label WHERE chart_id in (SELECT chart_id FROM chart WHERE host_id = @host_id)"
+
+static void delete_host_chart_labels(uuid_t *host_uuid)
+{
+ sqlite3_stmt *res = NULL;
+
+ int rc = sqlite3_prepare_v2(db_meta, SQL_DELETE_CHART_LABELS_BY_HOST, -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to delete chart labels by host");
+ return;
+ }
+
+ rc = sqlite3_bind_blob(res, 1, host_uuid, sizeof(*host_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind host_id parameter to host chart labels");
+ goto failed;
+ }
+ rc = sqlite3_step_monitored(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to execute command to remove host chart labels");
+
+failed:
+ if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
+ error_report("Failed to finalize statement to remove host chart labels");
+}
+
static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
struct query_build *lb = data;
if (unlikely(!lb->count))
@@ -168,8 +191,8 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value
return 1;
}
-#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;"
-#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;"
+#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id"
+#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id"
static void clean_old_chart_labels(RRDSET *st)
{
@@ -177,9 +200,9 @@ static void clean_old_chart_labels(RRDSET *st)
time_t first_time_s = rrdset_first_entry_s(st);
if (unlikely(!first_time_s))
- snprintfz(sql, 511,SQL_DELETE_CHART_LABEL);
+ snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_CHART_LABEL);
else
- snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s);
+ snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_CHART_LABEL_HISTORY, first_time_s);
int rc = exec_statement_with_uuid(sql, &st->chart_uuid);
if (unlikely(rc))
@@ -873,7 +896,7 @@ static void check_dimension_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_DIM_CHECK_INTERVAL;
}
- netdata_log_info(
+ internal_error(true,
"METADATA: Dimensions checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
@@ -940,7 +963,7 @@ static void check_chart_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_CHART_CHECK_INTERVAL;
}
- netdata_log_info(
+ internal_error(true,
"METADATA: Charts checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
@@ -1009,7 +1032,7 @@ static void check_label_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_LABEL_CHECK_INTERVAL;
}
- netdata_log_info(
+ internal_error(true,
"METADATA: Chart labels checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
@@ -1059,21 +1082,15 @@ static void cleanup_health_log(struct metadata_wc *wc)
// EVENT LOOP STARTS HERE
//
-static void metadata_init_cmd_queue(struct metadata_wc *wc)
-{
- wc->cmd_queue.cmd_base = NULL;
- fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
-}
-
static void metadata_free_cmd_queue(struct metadata_wc *wc)
{
- uv_mutex_lock(&wc->cmd_mutex);
- while(wc->cmd_queue.cmd_base) {
- struct metadata_cmd *t = wc->cmd_queue.cmd_base;
- DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
+ spinlock_lock(&wc->cmd_queue_lock);
+ while(wc->cmd_base) {
+ struct metadata_cmd *t = wc->cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next);
freez(t);
}
- uv_mutex_unlock(&wc->cmd_mutex);
+ spinlock_unlock(&wc->cmd_queue_lock);
}
static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
@@ -1090,9 +1107,9 @@ static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
*t = *cmd;
t->prev = t->next = NULL;
- uv_mutex_lock(&wc->cmd_mutex);
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
- uv_mutex_unlock(&wc->cmd_mutex);
+ spinlock_lock(&wc->cmd_queue_lock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_base, t, prev, next);
+ spinlock_unlock(&wc->cmd_queue_lock);
wakeup_event_loop:
(void) uv_async_send(&wc->async);
@@ -1102,10 +1119,10 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
{
struct metadata_cmd ret;
- uv_mutex_lock(&wc->cmd_mutex);
- if(wc->cmd_queue.cmd_base) {
- struct metadata_cmd *t = wc->cmd_queue.cmd_base;
- DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
+ spinlock_lock(&wc->cmd_queue_lock);
+ if(wc->cmd_base) {
+ struct metadata_cmd *t = wc->cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next);
ret = *t;
freez(t);
}
@@ -1113,7 +1130,7 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
ret.opcode = METADATA_DATABASE_NOOP;
ret.completion = NULL;
}
- uv_mutex_unlock(&wc->cmd_mutex);
+ spinlock_unlock(&wc->cmd_queue_lock);
return ret;
}
@@ -1136,9 +1153,7 @@ static void timer_cb(uv_timer_t* handle)
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
- time_t now = now_realtime_sec();
-
- if (wc->metadata_check_after && wc->metadata_check_after < now) {
+ if (wc->metadata_check_after < now_realtime_sec()) {
cmd.opcode = METADATA_SCAN_HOSTS;
metadata_enq_cmd(wc, &cmd);
}
@@ -1158,10 +1173,10 @@ void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int
if (free_pages > (total_pages * threshold / 100)) {
int do_free_pages = (int) (free_pages * vacuum_pc / 100);
- netdata_log_info("%s: Freeing %d database pages", db_alias, do_free_pages);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s: Freeing %d database pages", db_alias, do_free_pages);
char sql[128];
- snprintfz(sql, 127, "PRAGMA incremental_vacuum(%d)", do_free_pages);
+ snprintfz(sql, sizeof(sql) - 1, "PRAGMA incremental_vacuum(%d)", do_free_pages);
(void) db_execute(database, sql);
}
}
@@ -1184,16 +1199,10 @@ void run_metadata_cleanup(struct metadata_wc *wc)
(void) sqlite3_wal_checkpoint(db_meta, NULL);
}
-struct ml_model_payload {
- uv_work_t request;
- struct metadata_wc *wc;
- Pvoid_t JudyL;
- size_t count;
-};
-
struct scan_metadata_payload {
uv_work_t request;
struct metadata_wc *wc;
+ void *data;
BUFFER *work_buffer;
uint32_t max_count;
};
@@ -1271,7 +1280,7 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)
register_libuv_worker_jobs();
struct scan_metadata_payload *data = req->data;
- UNUSED(data);
+ struct metadata_wc *wc = data->wc;
worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD);
usec_t started_ut = now_monotonic_usec(); (void)started_ut;
@@ -1279,6 +1288,9 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)
RRDHOST *host;
size_t max_threads = MIN(get_netdata_cpus() / 2, 6);
+ if (max_threads < 1)
+ max_threads = 1;
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Using %zu threads for context loading", max_threads);
struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt));
size_t thread_index;
@@ -1290,25 +1302,28 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)
rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host));
- cleanup_finished_threads(hclt, max_threads, false);
- bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
+ bool found_slot = false;
+ do {
+ if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+ break;
- if (unlikely(!found_slot)) {
- struct host_context_load_thread hclt_sync = {.host = host};
- restore_host_context(&hclt_sync);
- }
- else {
- __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
- hclt[thread_index].host = host;
- assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
- }
+ cleanup_finished_threads(hclt, max_threads, false);
+ found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
+ } while (!found_slot);
+
+ if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+ break;
+
+ __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
+ hclt[thread_index].host = host;
+ fatal_assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
}
dfe_done(host);
cleanup_finished_threads(hclt, max_threads, true);
freez(hclt);
usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
- internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
worker_is_idle();
}
@@ -1335,6 +1350,10 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans
bool more_to_do = false;
uint32_t scan_count = 1;
+ sqlite3_stmt *ml_load_stmt = NULL;
+
+ bool load_ml_models = max_count;
+
if (use_transaction)
(void)db_execute(db_meta, "BEGIN TRANSACTION");
@@ -1379,6 +1398,14 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans
rrdhost_hostname(host), rrdset_name(st),
rrddim_name(rd));
}
+
+ if(rrddim_flag_check(rd, RRDDIM_FLAG_ML_MODEL_LOAD)) {
+ rrddim_flag_clear(rd, RRDDIM_FLAG_ML_MODEL_LOAD);
+ if (likely(load_ml_models))
+ (void) ml_dimension_load_models(rd, &ml_load_stmt);
+ }
+
+ worker_is_idle();
}
rrddim_foreach_done(rd);
}
@@ -1387,6 +1414,11 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans
if (use_transaction)
(void)db_execute(db_meta, "COMMIT TRANSACTION");
+ if (ml_load_stmt) {
+ sqlite3_finalize(ml_load_stmt);
+ ml_load_stmt = NULL;
+ }
+
return more_to_do;
}
@@ -1411,6 +1443,11 @@ static void store_host_and_system_info(RRDHOST *host, size_t *query_counter)
}
}
+struct host_chart_label_cleanup {
+ Pvoid_t JudyL;
+ Word_t count;
+};
+
// Worker thread to scan hosts for pending metadata to store
static void start_metadata_hosts(uv_work_t *req __maybe_unused)
{
@@ -1427,11 +1464,33 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
internal_error(true, "METADATA: checking all hosts...");
usec_t started_ut = now_monotonic_usec(); (void)started_ut;
+ struct host_chart_label_cleanup *cl_cleanup_data = data->data;
+
+ if (cl_cleanup_data) {
+ Word_t Index = 0;
+ bool first = true;
+ Pvoid_t *PValue;
+ while ((PValue = JudyLFirstThenNext(cl_cleanup_data->JudyL, &Index, &first))) {
+ char *machine_guid = *PValue;
+
+ host = rrdhost_find_by_guid(machine_guid);
+ if (likely(!host)) {
+ uuid_t host_uuid;
+ if (!uuid_parse(machine_guid, host_uuid))
+ delete_host_chart_labels(&host_uuid);
+ }
+
+ freez(machine_guid);
+ }
+ JudyLFreeArray(&cl_cleanup_data->JudyL, PJE0);
+ freez(cl_cleanup_data);
+ }
+
bool run_again = false;
worker_is_busy(UV_EVENT_METADATA_STORE);
if (!data->max_count)
- transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;");
+ transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION");
dfe_start_reentrant(rrdhost_root_index, host) {
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
@@ -1501,7 +1560,7 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
dfe_done(host);
if (!data->max_count && transaction_started)
- transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;");
+ transaction_started = db_execute(db_meta, "COMMIT TRANSACTION");
usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
@@ -1516,42 +1575,6 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
worker_is_idle();
}
-// Callback after scan of hosts is done
-static void after_start_ml_model_load(uv_work_t *req, int status __maybe_unused)
-{
- struct ml_model_payload *ml_data = req->data;
- struct metadata_wc *wc = ml_data->wc;
- metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING);
- JudyLFreeArray(&ml_data->JudyL, PJE0);
- freez(ml_data);
-}
-
-static void start_ml_model_load(uv_work_t *req __maybe_unused)
-{
- register_libuv_worker_jobs();
-
- struct ml_model_payload *ml_data = req->data;
-
- worker_is_busy(UV_EVENT_METADATA_ML_LOAD);
-
- Pvoid_t *PValue;
- Word_t Index = 0;
- bool first = true;
- RRDDIM *rd;
- RRDDIM_ACQUIRED *rda;
- internal_error(true, "Batch ML load loader, %zu items", ml_data->count);
- while((PValue = JudyLFirstThenNext(ml_data->JudyL, &Index, &first))) {
- UNUSED(PValue);
- rda = (RRDDIM_ACQUIRED *) Index;
- rd = rrddim_acquired_to_rrddim(rda);
- ml_dimension_load_models(rd);
- rrddim_acquired_release(rda);
- }
- worker_is_idle();
-}
-
-
-
static void metadata_event_loop(void *arg)
{
worker_register("METASYNC");
@@ -1561,7 +1584,6 @@ static void metadata_event_loop(void *arg)
worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
- worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models");
int ret;
uv_loop_t *loop;
@@ -1593,7 +1615,7 @@ static void metadata_event_loop(void *arg)
wc->timer_req.data = wc;
fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS));
- netdata_log_info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Starting metadata sync thread");
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -1602,11 +1624,11 @@ static void metadata_event_loop(void *arg)
wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
int shutdown = 0;
- completion_mark_complete(&wc->init_complete);
+ completion_mark_complete(&wc->start_stop_complete);
BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
struct scan_metadata_payload *data;
+ struct host_chart_label_cleanup *cl_cleanup_data = NULL;
- struct ml_model_payload *ml_data = NULL;
while (shutdown == 0 || (wc->flags & METADATA_FLAG_PROCESSING)) {
uuid_t *uuid;
RRDHOST *host = NULL;
@@ -1633,43 +1655,10 @@ static void metadata_event_loop(void *arg)
if (likely(opcode != METADATA_DATABASE_NOOP))
worker_is_busy(opcode);
- // Have pending ML models to load?
- if (opcode != METADATA_ML_LOAD_MODELS && ml_data && ml_data->count) {
- static usec_t ml_submit_last = 0;
- usec_t now = now_monotonic_usec();
- if (!ml_submit_last)
- ml_submit_last = now;
-
- if (!metadata_flag_check(wc, METADATA_FLAG_ML_LOADING) && (now - ml_submit_last > 150 * USEC_PER_MS)) {
- metadata_flag_set(wc, METADATA_FLAG_ML_LOADING);
- if (unlikely(uv_queue_work(loop, &ml_data->request, start_ml_model_load, after_start_ml_model_load)))
- metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING);
- else {
- ml_submit_last = now;
- ml_data = NULL;
- }
- }
- }
-
switch (opcode) {
case METADATA_DATABASE_NOOP:
case METADATA_DATABASE_TIMER:
break;
-
- case METADATA_ML_LOAD_MODELS: {
- RRDDIM *rd = (RRDDIM *) cmd.param[0];
- RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(rd->rrdset, rrddim_id(rd));
- if (likely(rda)) {
- if (!ml_data) {
- ml_data = callocz(1,sizeof(*ml_data));
- ml_data->request.data = ml_data;
- ml_data->wc = wc;
- }
- JudyLIns(&ml_data->JudyL, (Word_t)rda, PJE0);
- ml_data->count++;
- }
- break;
- }
case METADATA_DEL_DIMENSION:
uuid = (uuid_t *) cmd.param[0];
if (likely(dimension_can_be_deleted(uuid, NULL, false)))
@@ -1695,7 +1684,9 @@ static void metadata_event_loop(void *arg)
data = mallocz(sizeof(*data));
data->request.data = data;
data->wc = wc;
+ data->data = cl_cleanup_data;
data->work_buffer = work_buffer;
+ cl_cleanup_data = NULL;
if (unlikely(cmd.completion)) {
data->max_count = 0; // 0 will process all pending updates
@@ -1711,6 +1702,7 @@ static void metadata_event_loop(void *arg)
after_metadata_hosts))) {
// Failed to launch worker -- let the event loop handle completion
cmd.completion = wc->scan_complete;
+ cl_cleanup_data = data->data;
freez(data);
metadata_flag_clear(wc, METADATA_FLAG_PROCESSING);
}
@@ -1728,6 +1720,15 @@ static void metadata_event_loop(void *arg)
freez(data);
}
break;
+ case METADATA_DELETE_HOST_CHART_LABELS:;
+ if (!cl_cleanup_data)
+ cl_cleanup_data = callocz(1,sizeof(*cl_cleanup_data));
+
+ Pvoid_t *PValue = JudyLIns(&cl_cleanup_data->JudyL, (Word_t) ++cl_cleanup_data->count, PJE0);
+ if (PValue)
+ *PValue = (void *) cmd.param[0];
+
+ break;
case METADATA_UNITTEST:;
struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0];
sleep_usec(1000); // processing takes 1ms
@@ -1755,10 +1756,12 @@ static void metadata_event_loop(void *arg)
freez(loop);
worker_unregister();
- netdata_log_info("METADATA: Shutting down event loop");
- completion_mark_complete(&wc->init_complete);
- completion_destroy(wc->scan_complete);
- freez(wc->scan_complete);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down metadata thread");
+ completion_mark_complete(&wc->start_stop_complete);
+ if (wc->scan_complete) {
+ completion_destroy(wc->scan_complete);
+ freez(wc->scan_complete);
+ }
metadata_free_cmd_queue(wc);
return;
@@ -1771,23 +1774,21 @@ error_after_loop_init:
worker_unregister();
}
-struct metadata_wc metasync_worker = {.loop = NULL};
-
void metadata_sync_shutdown(void)
{
- completion_init(&metasync_worker.init_complete);
+ completion_init(&metasync_worker.start_stop_complete);
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
- netdata_log_info("METADATA: Sending a shutdown command");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Sending a shutdown command");
cmd.opcode = METADATA_SYNC_SHUTDOWN;
metadata_enq_cmd(&metasync_worker, &cmd);
/* wait for metadata thread to shut down */
- netdata_log_info("METADATA: Waiting for shutdown ACK");
- completion_wait_for(&metasync_worker.init_complete);
- completion_destroy(&metasync_worker.init_complete);
- netdata_log_info("METADATA: Shutdown complete");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Waiting for shutdown ACK");
+ completion_wait_for(&metasync_worker.start_stop_complete);
+ completion_destroy(&metasync_worker.start_stop_complete);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Shutdown complete");
}
void metadata_sync_shutdown_prepare(void)
@@ -1804,20 +1805,20 @@ void metadata_sync_shutdown_prepare(void)
completion_init(compl);
__atomic_store_n(&wc->scan_complete, compl, __ATOMIC_RELAXED);
- netdata_log_info("METADATA: Sending a scan host command");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Sending a scan host command");
uint32_t max_wait_iterations = 2000;
while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_PROCESSING)) && max_wait_iterations--) {
if (max_wait_iterations == 1999)
- netdata_log_info("METADATA: Current worker is running; waiting to finish");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Current worker is running; waiting to finish");
sleep_usec(1000);
}
cmd.opcode = METADATA_SCAN_HOSTS;
metadata_enq_cmd(&metasync_worker, &cmd);
- netdata_log_info("METADATA: Waiting for host scan completion");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Waiting for host scan completion");
completion_wait_for(wc->scan_complete);
- netdata_log_info("METADATA: Host scan complete; can continue with shutdown");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Host scan complete; can continue with shutdown");
}
// -------------------------------------------------------------
@@ -1828,15 +1829,14 @@ void metadata_sync_init(void)
struct metadata_wc *wc = &metasync_worker;
memset(wc, 0, sizeof(*wc));
- metadata_init_cmd_queue(wc);
- completion_init(&wc->init_complete);
+ completion_init(&wc->start_stop_complete);
fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc));
- completion_wait_for(&wc->init_complete);
- completion_destroy(&wc->init_complete);
+ completion_wait_for(&wc->start_stop_complete);
+ completion_destroy(&wc->start_stop_complete);
- netdata_log_info("SQLite metadata sync initialization complete");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "SQLite metadata sync initialization complete");
}
@@ -1887,9 +1887,7 @@ void metaqueue_host_update_info(RRDHOST *host)
void metaqueue_ml_load_models(RRDDIM *rd)
{
- if (unlikely(!metasync_worker.loop))
- return;
- queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL);
+ rrddim_flag_set(rd, RRDDIM_FLAG_ML_MODEL_LOAD);
}
void metadata_queue_load_host_context(RRDHOST *host)
@@ -1897,8 +1895,22 @@ void metadata_queue_load_host_context(RRDHOST *host)
if (unlikely(!metasync_worker.loop))
return;
queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command to load host contexts");
}
+void metadata_delete_host_chart_labels(char *machine_guid)
+{
+ if (unlikely(!metasync_worker.loop)) {
+ freez(machine_guid);
+ return;
+ }
+
+ // Node machine guid is already strdup-ed
+ queue_metadata_cmd(METADATA_DELETE_HOST_CHART_LABELS, machine_guid, NULL);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command delete chart labels for host %s", machine_guid);
+}
+
+
//
// unitests
//
@@ -1946,7 +1958,7 @@ static void *metadata_unittest_threads(void)
tu.join = 0;
for (int i = 0; i < threads_to_create; i++) {
char buf[100 + 1];
- snprintf(buf, 100, "META[%d]", i);
+ snprintf(buf, sizeof(buf) - 1, "META[%d]", i);
netdata_thread_create(
&threads[i],
buf,
diff --git a/database/sqlite/sqlite_metadata.h b/database/sqlite/sqlite_metadata.h
index f75a9ab00..6860cfedf 100644
--- a/database/sqlite/sqlite_metadata.h
+++ b/database/sqlite/sqlite_metadata.h
@@ -17,6 +17,7 @@ void metaqueue_host_update_info(RRDHOST *host);
void metaqueue_ml_load_models(RRDDIM *rd);
void migrate_localhost(uuid_t *host_uuid);
void metadata_queue_load_host_context(RRDHOST *host);
+void metadata_delete_host_chart_labels(char *machine_guid);
void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int vacuum_pc);
// UNIT TEST