diff options
Diffstat (limited to 'database/sqlite/sqlite_metadata.c')
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 354 |
1 files changed, 183 insertions, 171 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index 143783163..636f51966 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -4,11 +4,12 @@ // SQL statements -#define SQL_STORE_CLAIM_ID "INSERT INTO node_instance " \ - "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \ - "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;" +#define SQL_STORE_CLAIM_ID \ + "INSERT INTO node_instance " \ + "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, UNIXEPOCH()) " \ + "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id" -#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;" +#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid" #define STORE_HOST_LABEL \ "INSERT INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES " @@ -18,13 +19,13 @@ #define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())" -#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;" +#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid" #define SQL_STORE_HOST_INFO \ "INSERT OR REPLACE INTO host (host_id, hostname, registry_hostname, update_every, os, timezone, tags, hops, " \ "memory_mode, abbrev_timezone, utc_offset, program_name, program_version, entries, health_enabled, last_connected) " \ "VALUES (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, " \ - "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected);" + "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected)" #define SQL_STORE_CHART \ "INSERT INTO chart (chart_id, host_id, type, id, name, family, context, title, unit, plugin, module, priority, " \ @@ -51,11 +52,10 @@ "(@uuid, @name, @value, UNIXEPOCH())" #define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \ - "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);" -#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;" -#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);" + "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0)" +#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id" +#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host)" -#define METADATA_CMD_Q_MAX_SIZE (2048) // Max queue size; callers will block until there is room #define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds #define METADATA_MAINTENANCE_REPEAT (60) // Repeat if last run for dimensions, charts, labels needs more work #define METADATA_HEALTH_LOG_INTERVAL (3600) // Repeat maintenance for health @@ -81,10 +81,10 @@ enum metadata_opcode { METADATA_ADD_HOST_INFO, METADATA_SCAN_HOSTS, METADATA_LOAD_HOST_CONTEXT, + METADATA_DELETE_HOST_CHART_LABELS, METADATA_MAINTENANCE, METADATA_SYNC_SHUTDOWN, METADATA_UNITTEST, - METADATA_ML_LOAD_MODELS, // leave this last // we need it to check for worker utilization METADATA_MAX_ENUMERATIONS_DEFINED @@ -98,14 +98,9 @@ struct metadata_cmd { struct metadata_cmd *prev, *next; }; -struct metadata_database_cmdqueue { - struct metadata_cmd *cmd_base; -}; - typedef enum { METADATA_FLAG_PROCESSING = (1 << 0), // store or cleanup METADATA_FLAG_SHUTDOWN = (1 << 1), // Shutting down - METADATA_FLAG_ML_LOADING = (1 << 2), // ML model load in progress } METADATA_FLAG; struct metadata_wc { @@ -114,19 +109,20 @@ struct metadata_wc { uv_async_t async; uv_timer_t timer_req; time_t metadata_check_after; - volatile unsigned queue_size; METADATA_FLAG flags; - struct completion init_complete; + struct completion start_stop_complete; struct completion *scan_complete; /* FIFO command queue */ - uv_mutex_t cmd_mutex; - struct metadata_database_cmdqueue cmd_queue; + SPINLOCK cmd_queue_lock; + struct metadata_cmd *cmd_base; }; #define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag)) #define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST) #define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST) +struct metadata_wc metasync_worker = {.loop = NULL}; + // // For unittest // @@ -146,6 +142,33 @@ struct query_build { char uuid_str[UUID_STR_LEN]; }; +#define SQL_DELETE_CHART_LABELS_BY_HOST \ + "DELETE FROM chart_label WHERE chart_id in (SELECT chart_id FROM chart WHERE host_id = @host_id)" + +static void delete_host_chart_labels(uuid_t *host_uuid) +{ + sqlite3_stmt *res = NULL; + + int rc = sqlite3_prepare_v2(db_meta, SQL_DELETE_CHART_LABELS_BY_HOST, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to delete chart labels by host"); + return; + } + + rc = sqlite3_bind_blob(res, 1, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind host_id parameter to host chart labels"); + goto failed; + } + rc = sqlite3_step_monitored(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to execute command to remove host chart labels"); + +failed: + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("Failed to finalize statement to remove host chart labels"); +} + static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { struct query_build *lb = data; if (unlikely(!lb->count)) @@ -168,8 +191,8 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value return 1; } -#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;" -#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;" +#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id" +#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id" static void clean_old_chart_labels(RRDSET *st) { @@ -177,9 +200,9 @@ static void clean_old_chart_labels(RRDSET *st) time_t first_time_s = rrdset_first_entry_s(st); if (unlikely(!first_time_s)) - snprintfz(sql, 511,SQL_DELETE_CHART_LABEL); + snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_CHART_LABEL); else - snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s); + snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_CHART_LABEL_HISTORY, first_time_s); int rc = exec_statement_with_uuid(sql, &st->chart_uuid); if (unlikely(rc)) @@ -873,7 +896,7 @@ static void check_dimension_metadata(struct metadata_wc *wc) next_execution_t = now + METADATA_DIM_CHECK_INTERVAL; } - netdata_log_info( + internal_error(true, "METADATA: Dimensions checked %u, deleted %u. Checks will %s in %lld seconds", total_checked, total_deleted, @@ -940,7 +963,7 @@ static void check_chart_metadata(struct metadata_wc *wc) next_execution_t = now + METADATA_CHART_CHECK_INTERVAL; } - netdata_log_info( + internal_error(true, "METADATA: Charts checked %u, deleted %u. Checks will %s in %lld seconds", total_checked, total_deleted, @@ -1009,7 +1032,7 @@ static void check_label_metadata(struct metadata_wc *wc) next_execution_t = now + METADATA_LABEL_CHECK_INTERVAL; } - netdata_log_info( + internal_error(true, "METADATA: Chart labels checked %u, deleted %u. Checks will %s in %lld seconds", total_checked, total_deleted, @@ -1059,21 +1082,15 @@ static void cleanup_health_log(struct metadata_wc *wc) // EVENT LOOP STARTS HERE // -static void metadata_init_cmd_queue(struct metadata_wc *wc) -{ - wc->cmd_queue.cmd_base = NULL; - fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); -} - static void metadata_free_cmd_queue(struct metadata_wc *wc) { - uv_mutex_lock(&wc->cmd_mutex); - while(wc->cmd_queue.cmd_base) { - struct metadata_cmd *t = wc->cmd_queue.cmd_base; - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); + spinlock_lock(&wc->cmd_queue_lock); + while(wc->cmd_base) { + struct metadata_cmd *t = wc->cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next); freez(t); } - uv_mutex_unlock(&wc->cmd_mutex); + spinlock_unlock(&wc->cmd_queue_lock); } static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) @@ -1090,9 +1107,9 @@ static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) *t = *cmd; t->prev = t->next = NULL; - uv_mutex_lock(&wc->cmd_mutex); - DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); - uv_mutex_unlock(&wc->cmd_mutex); + spinlock_lock(&wc->cmd_queue_lock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_base, t, prev, next); + spinlock_unlock(&wc->cmd_queue_lock); wakeup_event_loop: (void) uv_async_send(&wc->async); @@ -1102,10 +1119,10 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) { struct metadata_cmd ret; - uv_mutex_lock(&wc->cmd_mutex); - if(wc->cmd_queue.cmd_base) { - struct metadata_cmd *t = wc->cmd_queue.cmd_base; - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); + spinlock_lock(&wc->cmd_queue_lock); + if(wc->cmd_base) { + struct metadata_cmd *t = wc->cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_base, t, prev, next); ret = *t; freez(t); } @@ -1113,7 +1130,7 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) ret.opcode = METADATA_DATABASE_NOOP; ret.completion = NULL; } - uv_mutex_unlock(&wc->cmd_mutex); + spinlock_unlock(&wc->cmd_queue_lock); return ret; } @@ -1136,9 +1153,7 @@ static void timer_cb(uv_timer_t* handle) struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - time_t now = now_realtime_sec(); - - if (wc->metadata_check_after && wc->metadata_check_after < now) { + if (wc->metadata_check_after < now_realtime_sec()) { cmd.opcode = METADATA_SCAN_HOSTS; metadata_enq_cmd(wc, &cmd); } @@ -1158,10 +1173,10 @@ void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int if (free_pages > (total_pages * threshold / 100)) { int do_free_pages = (int) (free_pages * vacuum_pc / 100); - netdata_log_info("%s: Freeing %d database pages", db_alias, do_free_pages); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s: Freeing %d database pages", db_alias, do_free_pages); char sql[128]; - snprintfz(sql, 127, "PRAGMA incremental_vacuum(%d)", do_free_pages); + snprintfz(sql, sizeof(sql) - 1, "PRAGMA incremental_vacuum(%d)", do_free_pages); (void) db_execute(database, sql); } } @@ -1184,16 +1199,10 @@ void run_metadata_cleanup(struct metadata_wc *wc) (void) sqlite3_wal_checkpoint(db_meta, NULL); } -struct ml_model_payload { - uv_work_t request; - struct metadata_wc *wc; - Pvoid_t JudyL; - size_t count; -}; - struct scan_metadata_payload { uv_work_t request; struct metadata_wc *wc; + void *data; BUFFER *work_buffer; uint32_t max_count; }; @@ -1271,7 +1280,7 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused) register_libuv_worker_jobs(); struct scan_metadata_payload *data = req->data; - UNUSED(data); + struct metadata_wc *wc = data->wc; worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD); usec_t started_ut = now_monotonic_usec(); (void)started_ut; @@ -1279,6 +1288,9 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused) RRDHOST *host; size_t max_threads = MIN(get_netdata_cpus() / 2, 6); + if (max_threads < 1) + max_threads = 1; + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Using %zu threads for context loading", max_threads); struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt)); size_t thread_index; @@ -1290,25 +1302,28 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused) rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS); internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host)); - cleanup_finished_threads(hclt, max_threads, false); - bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index); + bool found_slot = false; + do { + if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)) + break; - if (unlikely(!found_slot)) { - struct host_context_load_thread hclt_sync = {.host = host}; - restore_host_context(&hclt_sync); - } - else { - __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED); - hclt[thread_index].host = host; - assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index])); - } + cleanup_finished_threads(hclt, max_threads, false); + found_slot = find_available_thread_slot(hclt, max_threads, &thread_index); + } while (!found_slot); + + if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)) + break; + + __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED); + hclt[thread_index].host = host; + fatal_assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index])); } dfe_done(host); cleanup_finished_threads(hclt, max_threads, true); freez(hclt); usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; - internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS); worker_is_idle(); } @@ -1335,6 +1350,10 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans bool more_to_do = false; uint32_t scan_count = 1; + sqlite3_stmt *ml_load_stmt = NULL; + + bool load_ml_models = max_count; + if (use_transaction) (void)db_execute(db_meta, "BEGIN TRANSACTION"); @@ -1379,6 +1398,14 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans rrdhost_hostname(host), rrdset_name(st), rrddim_name(rd)); } + + if(rrddim_flag_check(rd, RRDDIM_FLAG_ML_MODEL_LOAD)) { + rrddim_flag_clear(rd, RRDDIM_FLAG_ML_MODEL_LOAD); + if (likely(load_ml_models)) + (void) ml_dimension_load_models(rd, &ml_load_stmt); + } + + worker_is_idle(); } rrddim_foreach_done(rd); } @@ -1387,6 +1414,11 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans if (use_transaction) (void)db_execute(db_meta, "COMMIT TRANSACTION"); + if (ml_load_stmt) { + sqlite3_finalize(ml_load_stmt); + ml_load_stmt = NULL; + } + return more_to_do; } @@ -1411,6 +1443,11 @@ static void store_host_and_system_info(RRDHOST *host, size_t *query_counter) } } +struct host_chart_label_cleanup { + Pvoid_t JudyL; + Word_t count; +}; + // Worker thread to scan hosts for pending metadata to store static void start_metadata_hosts(uv_work_t *req __maybe_unused) { @@ -1427,11 +1464,33 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) internal_error(true, "METADATA: checking all hosts..."); usec_t started_ut = now_monotonic_usec(); (void)started_ut; + struct host_chart_label_cleanup *cl_cleanup_data = data->data; + + if (cl_cleanup_data) { + Word_t Index = 0; + bool first = true; + Pvoid_t *PValue; + while ((PValue = JudyLFirstThenNext(cl_cleanup_data->JudyL, &Index, &first))) { + char *machine_guid = *PValue; + + host = rrdhost_find_by_guid(machine_guid); + if (likely(!host)) { + uuid_t host_uuid; + if (!uuid_parse(machine_guid, host_uuid)) + delete_host_chart_labels(&host_uuid); + } + + freez(machine_guid); + } + JudyLFreeArray(&cl_cleanup_data->JudyL, PJE0); + freez(cl_cleanup_data); + } + bool run_again = false; worker_is_busy(UV_EVENT_METADATA_STORE); if (!data->max_count) - transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;"); + transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION"); dfe_start_reentrant(rrdhost_root_index, host) { if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE)) @@ -1501,7 +1560,7 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) dfe_done(host); if (!data->max_count && transaction_started) - transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;"); + transaction_started = db_execute(db_meta, "COMMIT TRANSACTION"); usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut; internal_error(true, "METADATA: checking all hosts completed in %0.2f ms", @@ -1516,42 +1575,6 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) worker_is_idle(); } -// Callback after scan of hosts is done -static void after_start_ml_model_load(uv_work_t *req, int status __maybe_unused) -{ - struct ml_model_payload *ml_data = req->data; - struct metadata_wc *wc = ml_data->wc; - metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING); - JudyLFreeArray(&ml_data->JudyL, PJE0); - freez(ml_data); -} - -static void start_ml_model_load(uv_work_t *req __maybe_unused) -{ - register_libuv_worker_jobs(); - - struct ml_model_payload *ml_data = req->data; - - worker_is_busy(UV_EVENT_METADATA_ML_LOAD); - - Pvoid_t *PValue; - Word_t Index = 0; - bool first = true; - RRDDIM *rd; - RRDDIM_ACQUIRED *rda; - internal_error(true, "Batch ML load loader, %zu items", ml_data->count); - while((PValue = JudyLFirstThenNext(ml_data->JudyL, &Index, &first))) { - UNUSED(PValue); - rda = (RRDDIM_ACQUIRED *) Index; - rd = rrddim_acquired_to_rrddim(rda); - ml_dimension_load_models(rd); - rrddim_acquired_release(rda); - } - worker_is_idle(); -} - - - static void metadata_event_loop(void *arg) { worker_register("METASYNC"); @@ -1561,7 +1584,6 @@ static void metadata_event_loop(void *arg) worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id"); worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info"); worker_register_job_name(METADATA_MAINTENANCE, "maintenance"); - worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models"); int ret; uv_loop_t *loop; @@ -1593,7 +1615,7 @@ static void metadata_event_loop(void *arg) wc->timer_req.data = wc; fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS)); - netdata_log_info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Starting metadata sync thread"); struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -1602,11 +1624,11 @@ static void metadata_event_loop(void *arg) wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK; int shutdown = 0; - completion_mark_complete(&wc->init_complete); + completion_mark_complete(&wc->start_stop_complete); BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); struct scan_metadata_payload *data; + struct host_chart_label_cleanup *cl_cleanup_data = NULL; - struct ml_model_payload *ml_data = NULL; while (shutdown == 0 || (wc->flags & METADATA_FLAG_PROCESSING)) { uuid_t *uuid; RRDHOST *host = NULL; @@ -1633,43 +1655,10 @@ static void metadata_event_loop(void *arg) if (likely(opcode != METADATA_DATABASE_NOOP)) worker_is_busy(opcode); - // Have pending ML models to load? - if (opcode != METADATA_ML_LOAD_MODELS && ml_data && ml_data->count) { - static usec_t ml_submit_last = 0; - usec_t now = now_monotonic_usec(); - if (!ml_submit_last) - ml_submit_last = now; - - if (!metadata_flag_check(wc, METADATA_FLAG_ML_LOADING) && (now - ml_submit_last > 150 * USEC_PER_MS)) { - metadata_flag_set(wc, METADATA_FLAG_ML_LOADING); - if (unlikely(uv_queue_work(loop, &ml_data->request, start_ml_model_load, after_start_ml_model_load))) - metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING); - else { - ml_submit_last = now; - ml_data = NULL; - } - } - } - switch (opcode) { case METADATA_DATABASE_NOOP: case METADATA_DATABASE_TIMER: break; - - case METADATA_ML_LOAD_MODELS: { - RRDDIM *rd = (RRDDIM *) cmd.param[0]; - RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(rd->rrdset, rrddim_id(rd)); - if (likely(rda)) { - if (!ml_data) { - ml_data = callocz(1,sizeof(*ml_data)); - ml_data->request.data = ml_data; - ml_data->wc = wc; - } - JudyLIns(&ml_data->JudyL, (Word_t)rda, PJE0); - ml_data->count++; - } - break; - } case METADATA_DEL_DIMENSION: uuid = (uuid_t *) cmd.param[0]; if (likely(dimension_can_be_deleted(uuid, NULL, false))) @@ -1695,7 +1684,9 @@ static void metadata_event_loop(void *arg) data = mallocz(sizeof(*data)); data->request.data = data; data->wc = wc; + data->data = cl_cleanup_data; data->work_buffer = work_buffer; + cl_cleanup_data = NULL; if (unlikely(cmd.completion)) { data->max_count = 0; // 0 will process all pending updates @@ -1711,6 +1702,7 @@ static void metadata_event_loop(void *arg) after_metadata_hosts))) { // Failed to launch worker -- let the event loop handle completion cmd.completion = wc->scan_complete; + cl_cleanup_data = data->data; freez(data); metadata_flag_clear(wc, METADATA_FLAG_PROCESSING); } @@ -1728,6 +1720,15 @@ static void metadata_event_loop(void *arg) freez(data); } break; + case METADATA_DELETE_HOST_CHART_LABELS:; + if (!cl_cleanup_data) + cl_cleanup_data = callocz(1,sizeof(*cl_cleanup_data)); + + Pvoid_t *PValue = JudyLIns(&cl_cleanup_data->JudyL, (Word_t) ++cl_cleanup_data->count, PJE0); + if (PValue) + *PValue = (void *) cmd.param[0]; + + break; case METADATA_UNITTEST:; struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0]; sleep_usec(1000); // processing takes 1ms @@ -1755,10 +1756,12 @@ static void metadata_event_loop(void *arg) freez(loop); worker_unregister(); - netdata_log_info("METADATA: Shutting down event loop"); - completion_mark_complete(&wc->init_complete); - completion_destroy(wc->scan_complete); - freez(wc->scan_complete); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down metadata thread"); + completion_mark_complete(&wc->start_stop_complete); + if (wc->scan_complete) { + completion_destroy(wc->scan_complete); + freez(wc->scan_complete); + } metadata_free_cmd_queue(wc); return; @@ -1771,23 +1774,21 @@ error_after_loop_init: worker_unregister(); } -struct metadata_wc metasync_worker = {.loop = NULL}; - void metadata_sync_shutdown(void) { - completion_init(&metasync_worker.init_complete); + completion_init(&metasync_worker.start_stop_complete); struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - netdata_log_info("METADATA: Sending a shutdown command"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Sending a shutdown command"); cmd.opcode = METADATA_SYNC_SHUTDOWN; metadata_enq_cmd(&metasync_worker, &cmd); /* wait for metadata thread to shut down */ - netdata_log_info("METADATA: Waiting for shutdown ACK"); - completion_wait_for(&metasync_worker.init_complete); - completion_destroy(&metasync_worker.init_complete); - netdata_log_info("METADATA: Shutdown complete"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Waiting for shutdown ACK"); + completion_wait_for(&metasync_worker.start_stop_complete); + completion_destroy(&metasync_worker.start_stop_complete); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Shutdown complete"); } void metadata_sync_shutdown_prepare(void) @@ -1804,20 +1805,20 @@ void metadata_sync_shutdown_prepare(void) completion_init(compl); __atomic_store_n(&wc->scan_complete, compl, __ATOMIC_RELAXED); - netdata_log_info("METADATA: Sending a scan host command"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Sending a scan host command"); uint32_t max_wait_iterations = 2000; while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_PROCESSING)) && max_wait_iterations--) { if (max_wait_iterations == 1999) - netdata_log_info("METADATA: Current worker is running; waiting to finish"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Current worker is running; waiting to finish"); sleep_usec(1000); } cmd.opcode = METADATA_SCAN_HOSTS; metadata_enq_cmd(&metasync_worker, &cmd); - netdata_log_info("METADATA: Waiting for host scan completion"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Waiting for host scan completion"); completion_wait_for(wc->scan_complete); - netdata_log_info("METADATA: Host scan complete; can continue with shutdown"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "METADATA: Host scan complete; can continue with shutdown"); } // ------------------------------------------------------------- @@ -1828,15 +1829,14 @@ void metadata_sync_init(void) struct metadata_wc *wc = &metasync_worker; memset(wc, 0, sizeof(*wc)); - metadata_init_cmd_queue(wc); - completion_init(&wc->init_complete); + completion_init(&wc->start_stop_complete); fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc)); - completion_wait_for(&wc->init_complete); - completion_destroy(&wc->init_complete); + completion_wait_for(&wc->start_stop_complete); + completion_destroy(&wc->start_stop_complete); - netdata_log_info("SQLite metadata sync initialization complete"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "SQLite metadata sync initialization complete"); } @@ -1887,9 +1887,7 @@ void metaqueue_host_update_info(RRDHOST *host) void metaqueue_ml_load_models(RRDDIM *rd) { - if (unlikely(!metasync_worker.loop)) - return; - queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL); + rrddim_flag_set(rd, RRDDIM_FLAG_ML_MODEL_LOAD); } void metadata_queue_load_host_context(RRDHOST *host) @@ -1897,8 +1895,22 @@ void metadata_queue_load_host_context(RRDHOST *host) if (unlikely(!metasync_worker.loop)) return; queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command to load host contexts"); } +void metadata_delete_host_chart_labels(char *machine_guid) +{ + if (unlikely(!metasync_worker.loop)) { + freez(machine_guid); + return; + } + + // Node machine guid is already strdup-ed + queue_metadata_cmd(METADATA_DELETE_HOST_CHART_LABELS, machine_guid, NULL); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command delete chart labels for host %s", machine_guid); +} + + // // unitests // @@ -1946,7 +1958,7 @@ static void *metadata_unittest_threads(void) tu.join = 0; for (int i = 0; i < threads_to_create; i++) { char buf[100 + 1]; - snprintf(buf, 100, "META[%d]", i); + snprintf(buf, sizeof(buf) - 1, "META[%d]", i); netdata_thread_create( &threads[i], buf, |