summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_metadata.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/sqlite/sqlite_metadata.c354
1 files changed, 183 insertions, 171 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 143783163..636f51966 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -4,11 +4,12 @@
// SQL statements
-#define SQL_STORE_CLAIM_ID "INSERT INTO node_instance " \
- "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \
- "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;"
+#define SQL_STORE_CLAIM_ID \
+ "INSERT INTO node_instance " \
+ "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, UNIXEPOCH()) " \
+ "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id"
-#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;"
+#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid"
#define STORE_HOST_LABEL \
"INSERT INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES "
@@ -18,13 +19,13 @@
#define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())"
-#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;"
+#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid"
#define SQL_STORE_HOST_INFO \
"INSERT OR REPLACE INTO host (host_id, hostname, registry_hostname, update_every, os, timezone, tags, hops, " \
"memory_mode, abbrev_timezone, utc_offset, program_name, program_version, entries, health_enabled, last_connected) " \
"VALUES (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, " \
- "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected);"
+ "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected)"
#define SQL_STORE_CHART \
"INSERT INTO chart (chart_id, host_id, type, id, name, family, context, title, unit, plugin, module, priority, " \
@@ -51,11 +52,10 @@
"(@uuid, @name, @value, UNIXEPOCH())"
#define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \
- "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);"
-#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;"
-#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);"
+ "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0)"
+#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id"
+#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host)"
-#define METADATA_CMD_Q_MAX_SIZE (2048) // Max queue size; callers will block until there is room
#define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds
#define METADATA_MAINTENANCE_REPEAT (60) // Repeat if last run for dimensions, charts, labels needs more work
#define METADATA_HEALTH_LOG_INTERVAL (3600) // Repeat maintenance for health
@@ -81,10 +81,10 @@ enum metadata_opcode {
METADATA_ADD_HOST_INFO,
METADATA_SCAN_HOSTS,
METADATA_LOAD_HOST_CONTEXT,
+ METADATA_DELETE_HOST_CHART_LABELS,
METADATA_MAINTENANCE,
METADATA_SYNC_SHUTDOWN,
METADATA_UNITTEST,
- METADATA_ML_LOAD_MODELS,
// leave this last
// we need it to check for worker utilization
METADATA_MAX_ENUMERATIONS_DEFINED
@@ -98,14 +98,9 @@ struct metadata_cmd {
struct metadata_cmd *prev, *next;
};
-struct metadata_database_cmdqueue {
- struct metadata_cmd *cmd_base;
-};
-
typedef enum {
METADATA_FLAG_PROCESSING = (1 << 0), // store or cleanup
METADATA_FLAG_SHUTDOWN = (1 << 1), // Shutting down
- METADATA_FLAG_ML_LOADING = (1 << 2), // ML model load in progress
} METADATA_FLAG;
struct metadata_wc {
@@ -114,19 +109,20 @@ struct metadata_wc {
uv_async_t async;
uv_timer_t timer_req;
time_t metadata_check_after;
- volatile unsigned queue_size;
METADATA_FLAG flags;
- struct completion init_complete;
+ struct completion start_stop_complete;
struct completion *scan_complete;
/* FIFO command queue */
- uv_mutex_t cmd_mutex;
- struct metadata_database_cmdqueue cmd_queue;
+ SPINLOCK cmd_queue_lock;
+ struct metadata_cmd *cmd_base;
};
#define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag))
#define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST)
#define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST)
+struct metadata_wc metasync_worker = {.loop = NULL};
+
//
// For unittest
//
@@ -146,6 +142,33 @@ struct query_build {
char uuid_str[UUID_STR_LEN];
};
+#define SQL_DELETE_CHART_LABELS_BY_HOST \
+ "DELETE FROM chart_label WHERE chart_id in (SELECT chart_id FROM chart WHERE host_id = @host_id)"
+
+// Remove every chart label that belongs to a chart of the given host.
+// Failures are reported through error_report(); nothing is returned to the caller.
+static void delete_host_chart_labels(uuid_t *host_uuid)
+{
+    sqlite3_stmt *stmt = NULL;
+
+    if (unlikely(sqlite3_prepare_v2(db_meta, SQL_DELETE_CHART_LABELS_BY_HOST, -1, &stmt, 0) != SQLITE_OK)) {
+        error_report("Failed to prepare statement to delete chart labels by host");
+        return;
+    }
+
+    // Parameter 1 is @host_id; the statement is only stepped when the bind succeeded
+    if (unlikely(sqlite3_bind_blob(stmt, 1, host_uuid, sizeof(*host_uuid), SQLITE_STATIC) != SQLITE_OK))
+        error_report("Failed to bind host_id parameter to host chart labels");
+    else if (unlikely(sqlite3_step_monitored(stmt) != SQLITE_DONE))
+        error_report("Failed to execute command to remove host chart labels");
+
+    // Once prepared, the statement is always finalized, whether or not the delete ran
+    if (unlikely(sqlite3_finalize(stmt) != SQLITE_OK))
+        error_report("Failed to finalize statement to remove host chart labels");
+}
+
static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
struct query_build *lb = data;
if (unlikely(!lb->count))
@@ -168,8 +191,8 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value
return 1;
}
-#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;"
-#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;"
+#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id"
+#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id"
static void clean_old_chart_labels(RRDSET *st)
{
@@ -177,9 +200,9 @@ static void clean_old_chart_labels(RRDSET *st)
time_t first_time_s = rrdset_first_entry_s(st);
if (unlikely(!first_time_s))
- snprintfz(sql, 511,SQL_DELETE_CHART_LABEL);
+ snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_CHART_LABEL);
else
- snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s);
+ snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_CHART_LABEL_HISTORY, first_time_s);
int rc = exec_statement_with_uuid(sql, &st->chart_uuid);
if (unlikely(rc))
@@ -873,7 +896,7 @@ static void check_dimension_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_DIM_CHECK_INTERVAL;
}
- netdata_log_info(
+ internal_error(true,
"METADATA: Dimensions checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
@@ -940,7 +963,7 @@ static void check_chart_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_CHART_CHECK_INTERVAL;
}
- netdata_log_info(
+ internal_error(true,
"METADATA: Charts checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
@@ -1009,7 +1032,7 @@ static void check_label_metadata(struct metadata_wc *wc)
next_execution_t = now + METADATA_LABEL_CHECK_INTERVAL;
}
- netdata_log_info(
+ internal_error(true,
"METADATA: Chart labels checked %u, deleted %u. Checks will %s in %lld seconds",
total_checked,
total_deleted,
@@ -1059,21 +1082,15 @@ static void cleanup_health_log(struct metadata_wc *wc)
// EVENT LOOP STARTS HERE
//
-static void metadata_init_cmd_queue(struct metadata_wc *wc)
-{
- wc->cmd_queue.cmd_base = NULL;
- fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
-}
-
static void metadata_free_cmd_queue(struct metadata_wc *wc) // Unlink and free every command still present in the queue
{
- uv_mutex_lock(&wc->cmd_mutex);
- while(wc->cmd_queue.cmd_base) {
- struct metadata_cmd *t = wc->cmd_queue.cmd_base;
- DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
+ spinlock_lock(&wc->cmd_queue_lock); // the queue is guarded by a spinlock here (previously a uv mutex)
+ while(wc->cmd_base) { // keep removing the head until the list is empty
+ struct metadata_cmd *t = wc->cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next);
freez(t); // frees the queue node only; nothing referenced by the command is released here
}
- uv_mutex_unlock(&wc->cmd_mutex);
+ spinlock_unlock(&wc->cmd_queue_lock);
}
static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
@@ -1090,9 +1107,9 @@ static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
*t = *cmd;
t->prev = t->next = NULL;
- uv_mutex_lock(&wc->cmd_mutex);
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
- uv_mutex_unlock(&wc->cmd_mutex);
+ spinlock_lock(&wc->cmd_queue_lock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_base, t, prev, next);
+ spinlock_unlock(&wc->cmd_queue_lock);
wakeup_event_loop:
(void) uv_async_send(&wc->async);
@@ -1102,10 +1119,10 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
{
struct metadata_cmd ret;
- uv_mutex_lock(&wc->cmd_mutex);
- if(wc->cmd_queue.cmd_base) {
- struct metadata_cmd *t = wc->cmd_queue.cmd_base;
- DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
+ spinlock_lock(&wc->cmd_queue_lock);
+ if(wc->cmd_base) {
+ struct metadata_cmd *t = wc->cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next);
ret = *t;
freez(t);
}
@@ -1113,7 +1130,7 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
ret.opcode = METADATA_DATABASE_NOOP;
ret.completion = NULL;
}
- uv_mutex_unlock(&wc->cmd_mutex);
+ spinlock_unlock(&wc->cmd_queue_lock);
return ret;
}
@@ -1136,9 +1153,7 @@ static void timer_cb(uv_timer_t* handle)
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
- time_t now = now_realtime_sec();
-
- if (wc->metadata_check_after && wc->metadata_check_after < now) {
+ if (wc->metadata_check_after < now_realtime_sec()) {
cmd.opcode = METADATA_SCAN_HOSTS;
metadata_enq_cmd(wc, &cmd);
}
@@ -1158,10 +1173,10 @@ void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int
if (free_pages > (total_pages * threshold / 100)) {
int do_free_pages = (int) (free_pages * vacuum_pc / 100);
- netdata_log_info("%s: Freeing %d database pages", db_alias, do_free_pages);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s: Freeing %d database pages", db_alias, do_free_pages);
char sql[128];
- snprintfz(sql, 127, "PRAGMA incremental_vacuum(%d)", do_free_pages);
+ snprintfz(sql, sizeof(sql) - 1, "PRAGMA incremental_vacuum(%d)", do_free_pages);
(void) db_execute(database, sql);
}
}
@@ -1184,16 +1199,10 @@ void run_metadata_cleanup(struct metadata_wc *wc)
(void) sqlite3_wal_checkpoint(db_meta, NULL);
}
-struct ml_model_payload {
- uv_work_t request;
- struct metadata_wc *wc;
- Pvoid_t JudyL;
- size_t count;
-};
-
struct scan_metadata_payload {
uv_work_t request;
struct metadata_wc *wc;
+ void *data;
BUFFER *work_buffer;
uint32_t max_count;
};
@@ -1271,7 +1280,7 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)
register_libuv_worker_jobs();
struct scan_metadata_payload *data = req->data;
- UNUSED(data);
+ struct metadata_wc *wc = data->wc;
worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD);
usec_t started_ut = now_monotonic_usec(); (void)started_ut;
@@ -1279,6 +1288,9 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)
RRDHOST *host;
size_t max_threads = MIN(get_netdata_cpus() / 2, 6);
+ if (max_threads < 1)
+ max_threads = 1;
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Using %zu threads for context loading", max_threads);
struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt));
size_t thread_index;
@@ -1290,25 +1302,28 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)
rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host));
- cleanup_finished_threads(hclt, max_threads, false);
- bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
+ bool found_slot = false;
+ do {
+ if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+ break;
- if (unlikely(!found_slot)) {
- struct host_context_load_thread hclt_sync = {.host = host};
- restore_host_context(&hclt_sync);
- }
- else {
- __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
- hclt[thread_index].host = host;
- assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
- }
+ cleanup_finished_threads(hclt, max_threads, false);
+ found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
+ } while (!found_slot);
+
+ if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
+ break;
+
+ __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
+ hclt[thread_index].host = host;
+ fatal_assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
}
dfe_done(host);
cleanup_finished_threads(hclt, max_threads, true);
freez(hclt);
usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
- internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
worker_is_idle();
}
@@ -1335,6 +1350,10 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans
bool more_to_do = false;
uint32_t scan_count = 1;
+ sqlite3_stmt *ml_load_stmt = NULL;
+
+ bool load_ml_models = max_count;
+
if (use_transaction)
(void)db_execute(db_meta, "BEGIN TRANSACTION");
@@ -1379,6 +1398,14 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans
rrdhost_hostname(host), rrdset_name(st),
rrddim_name(rd));
}
+
+ if(rrddim_flag_check(rd, RRDDIM_FLAG_ML_MODEL_LOAD)) {
+ rrddim_flag_clear(rd, RRDDIM_FLAG_ML_MODEL_LOAD);
+ if (likely(load_ml_models))
+ (void) ml_dimension_load_models(rd, &ml_load_stmt);
+ }
+
+ worker_is_idle();
}
rrddim_foreach_done(rd);
}
@@ -1387,6 +1414,11 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans
if (use_transaction)
(void)db_execute(db_meta, "COMMIT TRANSACTION");
+ if (ml_load_stmt) {
+ sqlite3_finalize(ml_load_stmt);
+ ml_load_stmt = NULL;
+ }
+
return more_to_do;
}
@@ -1411,6 +1443,11 @@ static void store_host_and_system_info(RRDHOST *host, size_t *query_counter)
}
}
+struct host_chart_label_cleanup { // Batch of hosts whose chart labels must be deleted; built by the event loop and handed to the next host scan
+ Pvoid_t JudyL; // JudyL array: insertion sequence number -> pointer to a machine_guid string
+ Word_t count; // Number of entries inserted so far; pre-incremented to form the next JudyL index
+};
+
// Worker thread to scan hosts for pending metadata to store
static void start_metadata_hosts(uv_work_t *req __maybe_unused)
{
@@ -1427,11 +1464,33 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
internal_error(true, "METADATA: checking all hosts...");
usec_t started_ut = now_monotonic_usec(); (void)started_ut;
+ struct host_chart_label_cleanup *cl_cleanup_data = data->data;
+
+ if (cl_cleanup_data) {
+ Word_t Index = 0;
+ bool first = true;
+ Pvoid_t *PValue;
+ while ((PValue = JudyLFirstThenNext(cl_cleanup_data->JudyL, &Index, &first))) {
+ char *machine_guid = *PValue;
+
+ host = rrdhost_find_by_guid(machine_guid);
+ if (likely(!host)) {
+ uuid_t host_uuid;
+ if (!uuid_parse(machine_guid, host_uuid))
+ delete_host_chart_labels(&host_uuid);
+ }
+
+ freez(machine_guid);
+ }
+ JudyLFreeArray(&cl_cleanup_data->JudyL, PJE0);
+ freez(cl_cleanup_data);
+ }
+
bool run_again = false;
worker_is_busy(UV_EVENT_METADATA_STORE);
if (!data->max_count)
- transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;");
+ transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION");
dfe_start_reentrant(rrdhost_root_index, host) {
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
@@ -1501,7 +1560,7 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
dfe_done(host);
if (!data->max_count && transaction_started)
- transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;");
+ transaction_started = db_execute(db_meta, "COMMIT TRANSACTION");
usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
@@ -1516,42 +1575,6 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
worker_is_idle();
}
-// Callback after scan of hosts is done
-static void after_start_ml_model_load(uv_work_t *req, int status __maybe_unused)
-{
- struct ml_model_payload *ml_data = req->data;
- struct metadata_wc *wc = ml_data->wc;
- metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING);
- JudyLFreeArray(&ml_data->JudyL, PJE0);
- freez(ml_data);
-}
-
-static void start_ml_model_load(uv_work_t *req __maybe_unused)
-{
- register_libuv_worker_jobs();
-
- struct ml_model_payload *ml_data = req->data;
-
- worker_is_busy(UV_EVENT_METADATA_ML_LOAD);
-
- Pvoid_t *PValue;
- Word_t Index = 0;
- bool first = true;
- RRDDIM *rd;
- RRDDIM_ACQUIRED *rda;
- internal_error(true, "Batch ML load loader, %zu items", ml_data->count);
- while((PValue = JudyLFirstThenNext(ml_data->JudyL, &Index, &first))) {
- UNUSED(PValue);
- rda = (RRDDIM_ACQUIRED *) Index;
- rd = rrddim_acquired_to_rrddim(rda);
- ml_dimension_load_models(rd);
- rrddim_acquired_release(rda);
- }
- worker_is_idle();
-}
-
-
-
static void metadata_event_loop(void *arg)
{
worker_register("METASYNC");
@@ -1561,7 +1584,6 @@ static void metadata_event_loop(void *arg)
worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
- worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models");
int ret;
uv_loop_t *loop;
@@ -1593,7 +1615,7 @@ static void metadata_event_loop(void *arg)
wc->timer_req.data = wc;
fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS));
- netdata_log_info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Starting metadata sync thread");
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -1602,11 +1624,11 @@ static void metadata_event_loop(void *arg)
wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
int shutdown = 0;
- completion_mark_complete(&wc->init_complete);
+ completion_mark_complete(&wc->start_stop_complete);
BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
struct scan_metadata_payload *data;
+ struct host_chart_label_cleanup *cl_cleanup_data = NULL;
- struct ml_model_payload *ml_data = NULL;
while (shutdown == 0 || (wc->flags & METADATA_FLAG_PROCESSING)) {
uuid_t *uuid;
RRDHOST *host = NULL;
@@ -1633,43 +1655,10 @@ static void metadata_event_loop(void *arg)
if (likely(opcode != METADATA_DATABASE_NOOP))
worker_is_busy(opcode);
- // Have pending ML models to load?
- if (opcode != METADATA_ML_LOAD_MODELS && ml_data && ml_data->count) {
- static usec_t ml_submit_last = 0;
- usec_t now = now_monotonic_usec();
- if (!ml_submit_last)
- ml_submit_last = now;
-
- if (!metadata_flag_check(wc, METADATA_FLAG_ML_LOADING) && (now - ml_submit_last > 150 * USEC_PER_MS)) {
- metadata_flag_set(wc, METADATA_FLAG_ML_LOADING);
- if (unlikely(uv_queue_work(loop, &ml_data->request, start_ml_model_load, after_start_ml_model_load)))
- metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING);
- else {
- ml_submit_last = now;
- ml_data = NULL;
- }
- }
- }
-
switch (opcode) {
case METADATA_DATABASE_NOOP:
case METADATA_DATABASE_TIMER:
break;
-
- case METADATA_ML_LOAD_MODELS: {
- RRDDIM *rd = (RRDDIM *) cmd.param[0];
- RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(rd->rrdset, rrddim_id(rd));
- if (likely(rda)) {
- if (!ml_data) {
- ml_data = callocz(1,sizeof(*ml_data));
- ml_data->request.data = ml_data;
- ml_data->wc = wc;
- }
- JudyLIns(&ml_data->JudyL, (Word_t)rda, PJE0);
- ml_data->count++;
- }
- break;
- }
case METADATA_DEL_DIMENSION:
uuid = (uuid_t *) cmd.param[0];
if (likely(dimension_can_be_deleted(uuid, NULL, false)))
@@ -1695,7 +1684,9 @@ static void metadata_event_loop(void *arg)
data = mallocz(sizeof(*data));
data->request.data = data;
data->wc = wc;
+ data->data = cl_cleanup_data;
data->work_buffer = work_buffer;
+ cl_cleanup_data = NULL;
if (unlikely(cmd.completion)) {
data->max_count = 0; // 0 will process all pending updates
@@ -1711,6 +1702,7 @@ static void metadata_event_loop(void *arg)
after_metadata_hosts))) {
// Failed to launch worker -- let the event loop handle completion
cmd.completion = wc->scan_complete;
+ cl_cleanup_data = data->data;
freez(data);
metadata_flag_clear(wc, METADATA_FLAG_PROCESSING);
}
@@ -1728,6 +1720,15 @@ static void metadata_event_loop(void *arg)
freez(data);
}
break;
+ case METADATA_DELETE_HOST_CHART_LABELS:;
+ if (!cl_cleanup_data)
+ cl_cleanup_data = callocz(1,sizeof(*cl_cleanup_data));
+
+ Pvoid_t *PValue = JudyLIns(&cl_cleanup_data->JudyL, (Word_t) ++cl_cleanup_data->count, PJE0);
+ if (PValue)
+ *PValue = (void *) cmd.param[0];
+
+ break;
case METADATA_UNITTEST:;
struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0];
sleep_usec(1000); // processing takes 1ms
@@ -1755,10 +1756,12 @@ static void metadata_event_loop(void *arg)
freez(loop);
worker_unregister();
- netdata_log_info("METADATA: Shutting down event loop");
- completion_mark_complete(&wc->init_complete);
- completion_destroy(wc->scan_complete);
- freez(wc->scan_complete);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down metadata thread");
+ completion_mark_complete(&wc->start_stop_complete);
+ if (wc->scan_complete) {
+ completion_destroy(wc->scan_complete);
+ freez(wc->scan_complete);
+ }
metadata_free_cmd_queue(wc);
return;
@@ -1771,23 +1774,21 @@ error_after_loop_init:
worker_unregister();
}
-struct metadata_wc metasync_worker = {.loop = NULL};
-
void metadata_sync_shutdown(void) // Queue a shutdown command for the metadata thread and block until it is acknowledged
{
- completion_init(&metasync_worker.init_complete);
+ completion_init(&metasync_worker.start_stop_complete); // same completion that metadata_sync_init() waits on at startup
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
- netdata_log_info("METADATA: Sending a shutdown command");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Sending a shutdown command");
cmd.opcode = METADATA_SYNC_SHUTDOWN;
metadata_enq_cmd(&metasync_worker, &cmd);
/* wait for metadata thread to shut down */
- netdata_log_info("METADATA: Waiting for shutdown ACK");
- completion_wait_for(&metasync_worker.init_complete);
- completion_destroy(&metasync_worker.init_complete);
- netdata_log_info("METADATA: Shutdown complete");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Waiting for shutdown ACK");
+ completion_wait_for(&metasync_worker.start_stop_complete); // marked complete by metadata_event_loop() on its way out
+ completion_destroy(&metasync_worker.start_stop_complete);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Shutdown complete");
}
void metadata_sync_shutdown_prepare(void)
@@ -1804,20 +1805,20 @@ void metadata_sync_shutdown_prepare(void)
completion_init(compl);
__atomic_store_n(&wc->scan_complete, compl, __ATOMIC_RELAXED);
- netdata_log_info("METADATA: Sending a scan host command");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Sending a scan host command");
uint32_t max_wait_iterations = 2000;
while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_PROCESSING)) && max_wait_iterations--) {
if (max_wait_iterations == 1999)
- netdata_log_info("METADATA: Current worker is running; waiting to finish");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Current worker is running; waiting to finish");
sleep_usec(1000);
}
cmd.opcode = METADATA_SCAN_HOSTS;
metadata_enq_cmd(&metasync_worker, &cmd);
- netdata_log_info("METADATA: Waiting for host scan completion");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Waiting for host scan completion");
completion_wait_for(wc->scan_complete);
- netdata_log_info("METADATA: Host scan complete; can continue with shutdown");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Host scan complete; can continue with shutdown");
}
// -------------------------------------------------------------
@@ -1828,15 +1829,14 @@ void metadata_sync_init(void)
struct metadata_wc *wc = &metasync_worker;
memset(wc, 0, sizeof(*wc));
- metadata_init_cmd_queue(wc);
- completion_init(&wc->init_complete);
+ completion_init(&wc->start_stop_complete);
fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc));
- completion_wait_for(&wc->init_complete);
- completion_destroy(&wc->init_complete);
+ completion_wait_for(&wc->start_stop_complete);
+ completion_destroy(&wc->start_stop_complete);
- netdata_log_info("SQLite metadata sync initialization complete");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "SQLite metadata sync initialization complete");
}
@@ -1887,9 +1887,7 @@ void metaqueue_host_update_info(RRDHOST *host)
void metaqueue_ml_load_models(RRDDIM *rd)
{
- if (unlikely(!metasync_worker.loop))
- return;
- queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL);
+ rrddim_flag_set(rd, RRDDIM_FLAG_ML_MODEL_LOAD); // only marks the dimension; metadata_scan_host() clears the flag and, when its max_count is non-zero, loads the models
}
void metadata_queue_load_host_context(RRDHOST *host)
@@ -1897,8 +1895,22 @@ void metadata_queue_load_host_context(RRDHOST *host)
if (unlikely(!metasync_worker.loop))
return;
queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command to load host contexts");
}
+// Ask the metadata thread to delete the chart labels of the host identified by machine_guid.
+// machine_guid must be a heap-allocated (already strdup-ed) string; the caller gives up ownership:
+// it is freed here when the metadata event loop is not running, otherwise the metadata
+// worker frees it after processing the entry.
+void metadata_delete_host_chart_labels(char *machine_guid)
+{
+    if (unlikely(!metasync_worker.loop)) {
+        freez(machine_guid);
+        return;
+    }
+
+    // Log before queueing: once the command is queued the metadata thread owns
+    // machine_guid and may free it at any time, so it must not be read afterwards
+    nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command delete chart labels for host %s", machine_guid);
+
+    // Node machine guid is already strdup-ed
+    queue_metadata_cmd(METADATA_DELETE_HOST_CHART_LABELS, machine_guid, NULL);
+}
+
+
//
// unitests
//
@@ -1946,7 +1958,7 @@ static void *metadata_unittest_threads(void)
tu.join = 0;
for (int i = 0; i < threads_to_create; i++) {
char buf[100 + 1];
- snprintf(buf, 100, "META[%d]", i);
+ snprintf(buf, sizeof(buf) - 1, "META[%d]", i);
netdata_thread_create(
&threads[i],
buf,