diff options
Diffstat (limited to 'database/sqlite/sqlite_metadata.c')
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 790 |
1 files changed, 567 insertions, 223 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index 697772bf5..143783163 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -11,52 +11,68 @@ #define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;" #define STORE_HOST_LABEL \ - "INSERT OR REPLACE INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES " + "INSERT INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES " #define STORE_CHART_LABEL \ - "INSERT OR REPLACE INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES " + "INSERT INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES " #define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())" #define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;" -#define SQL_STORE_HOST_INFO "INSERT OR REPLACE INTO host " \ - "(host_id, hostname, registry_hostname, update_every, os, timezone," \ - "tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, program_version," \ - "entries, health_enabled) " \ - "values (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, @memory_mode, " \ - "@abbrev_timezone, @utc_offset, @program_name, @program_version, " \ - "@entries, @health_enabled);" - -#define SQL_STORE_CHART "insert or replace into chart (chart_id, host_id, type, id, " \ - "name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , " \ - "history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);" - -#define SQL_STORE_DIMENSION "INSERT OR REPLACE INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) " \ - "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options);" +#define SQL_STORE_HOST_INFO \ + "INSERT OR REPLACE INTO host (host_id, hostname, registry_hostname, update_every, os, timezone, tags, hops, " \ + "memory_mode, abbrev_timezone, utc_offset, program_name, program_version, entries, health_enabled, last_connected) " \ + "VALUES (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, " \ + "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected);" + +#define SQL_STORE_CHART \ + "INSERT INTO chart (chart_id, host_id, type, id, name, family, context, title, unit, plugin, module, priority, " \ + "update_every, chart_type, memory_mode, history_entries) " \ + "values (@chart_id, @host_id, @type, @id, @name, @family, @context, @title, @unit, @plugin, @module, @priority, " \ + "@update_every, @chart_type, @memory_mode, @history_entries) " \ + "ON CONFLICT(chart_id) DO UPDATE SET type=excluded.type, id=excluded.id, name=excluded.name, " \ + "family=excluded.family, context=excluded.context, title=excluded.title, unit=excluded.unit, " \ + "plugin=excluded.plugin, module=excluded.module, priority=excluded.priority, update_every=excluded.update_every, " \ + "chart_type=excluded.chart_type, memory_mode = excluded.memory_mode, history_entries = excluded.history_entries" + +#define SQL_STORE_DIMENSION \ + "INSERT INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) " \ + "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options) " \ + "ON CONFLICT(dim_id) DO UPDATE SET id=excluded.id, name=excluded.name, multiplier=excluded.multiplier, " \ + "divisor=excluded.divisor, algorithm=excluded.algorithm, options=excluded.options" #define SELECT_DIMENSION_LIST "SELECT dim_id, rowid FROM dimension WHERE rowid > @row_id" +#define SELECT_CHART_LIST "SELECT chart_id, rowid FROM chart WHERE rowid > @row_id" +#define SELECT_CHART_LABEL_LIST "SELECT chart_id, rowid FROM chart_label WHERE rowid > @row_id" -#define SQL_STORE_HOST_SYSTEM_INFO_VALUES "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES " \ - "(@uuid, @name, @value, unixepoch())" +#define SQL_STORE_HOST_SYSTEM_INFO_VALUES \ + "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES " \ + "(@uuid, @name, @value, UNIXEPOCH())" #define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \ "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);" #define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;" #define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);" -#define METADATA_CMD_Q_MAX_SIZE (1024) // Max queue size; callers will block until there is room +#define METADATA_CMD_Q_MAX_SIZE (2048) // Max queue size; callers will block until there is room #define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds -#define METADATA_MAINTENANCE_RETRY (60) // Retry run if already running or last run did actual work -#define METADATA_MAINTENANCE_INTERVAL (3600) // Repeat maintenance after latest successful +#define METADATA_MAINTENANCE_REPEAT (60) // Repeat if last run for dimensions, charts, labels needs more work +#define METADATA_HEALTH_LOG_INTERVAL (3600) // Repeat maintenance for health +#define METADATA_DIM_CHECK_INTERVAL (3600) // Repeat maintenance for dimensions +#define METADATA_CHART_CHECK_INTERVAL (3600) // Repeat maintenance for charts +#define METADATA_LABEL_CHECK_INTERVAL (3600) // Repeat maintenance for labels +#define METADATA_RUNTIME_THRESHOLD (5) // Run time threshold for cleanup task #define METADATA_HOST_CHECK_FIRST_CHECK (5) // First check for pending metadata #define METADATA_HOST_CHECK_INTERVAL (30) // Repeat check for pending metadata #define METADATA_HOST_CHECK_IMMEDIATE (5) // Repeat immediate run because we have more metadata to write - #define MAX_METADATA_CLEANUP (500) // Maximum metadata write operations (e.g deletes before retrying) #define METADATA_MAX_BATCH_SIZE (512) // Maximum commands to execute before running the event loop +#define DATABASE_FREE_PAGES_THRESHOLD_PC (5) // Percentage of free pages to trigger vacuum +#define DATABASE_FREE_PAGES_VACUUM_PC (10) // Percentage of free pages to vacuum + enum metadata_opcode { METADATA_DATABASE_NOOP = 0, METADATA_DATABASE_TIMER, @@ -79,35 +95,31 @@ struct metadata_cmd { enum metadata_opcode opcode; struct completion *completion; const void *param[MAX_PARAM_LIST]; + struct metadata_cmd *prev, *next; }; struct metadata_database_cmdqueue { - unsigned head, tail; - struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE]; + struct metadata_cmd *cmd_base; }; typedef enum { - METADATA_FLAG_CLEANUP = (1 << 0), // Cleanup is running - METADATA_FLAG_SCANNING_HOSTS = (1 << 1), // Scanning of hosts in worker thread - METADATA_FLAG_SHUTDOWN = (1 << 2), // Shutting down + METADATA_FLAG_PROCESSING = (1 << 0), // store or cleanup + METADATA_FLAG_SHUTDOWN = (1 << 1), // Shutting down + METADATA_FLAG_ML_LOADING = (1 << 2), // ML model load in progress } METADATA_FLAG; -#define METADATA_WORKER_BUSY (METADATA_FLAG_CLEANUP | METADATA_FLAG_SCANNING_HOSTS) - struct metadata_wc { uv_thread_t thread; uv_loop_t *loop; uv_async_t async; uv_timer_t timer_req; - time_t check_metadata_after; - time_t check_hosts_after; + time_t metadata_check_after; volatile unsigned queue_size; METADATA_FLAG flags; - uint64_t row_id; struct completion init_complete; + struct completion *scan_complete; /* FIFO command queue */ uv_mutex_t cmd_mutex; - uv_cond_t cmd_cond; struct metadata_database_cmdqueue cmd_queue; }; @@ -140,7 +152,7 @@ static int host_label_store_to_sql_callback(const char *name, const char *value, buffer_sprintf(lb->sql, STORE_HOST_LABEL); else buffer_strcat(lb->sql, ", "); - buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int)ls & ~(RRDLABEL_FLAG_INTERNAL), name, value); + buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int) (ls & ~(RRDLABEL_FLAG_INTERNAL)), name, value); lb->count++; return 1; } @@ -151,7 +163,7 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value buffer_sprintf(lb->sql, STORE_CHART_LABEL); else buffer_strcat(lb->sql, ", "); - buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, ls, name, value); + buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int) (ls & ~(RRDLABEL_FLAG_INTERNAL)), name, value); lb->count++; return 1; } @@ -177,7 +189,7 @@ static void clean_old_chart_labels(RRDSET *st) static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter) { size_t old_version = st->rrdlabels_last_saved_version; - size_t new_version = dictionary_version(st->rrdlabels); + size_t new_version = rrdlabels_version(st->rrdlabels); if (new_version == old_version) return 0; @@ -185,6 +197,7 @@ static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t struct query_build tmp = {.sql = work_buffer, .count = 0}; uuid_unparse_lower(st->chart_uuid, tmp.uuid_str); rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp); + buffer_strcat(work_buffer, " ON CONFLICT (chart_id, label_key) DO UPDATE SET source_type = excluded.source_type, label_value=excluded.label_value, date_created=UNIXEPOCH()"); int rc = db_execute(db_meta, buffer_tostring(work_buffer)); if (likely(!rc)) { st->rrdlabels_last_saved_version = new_version; @@ -252,7 +265,7 @@ failed: return rc != SQLITE_DONE; } -static void delete_dimension_uuid(uuid_t *dimension_uuid) +static void delete_dimension_uuid(uuid_t *dimension_uuid, sqlite3_stmt **action_res __maybe_unused, bool flag __maybe_unused) { static __thread sqlite3_stmt *res = NULL; int rc; @@ -265,7 +278,7 @@ static void delete_dimension_uuid(uuid_t *dimension_uuid) } } - rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto skip_execution; @@ -286,13 +299,6 @@ static int store_host_metadata(RRDHOST *host) static __thread sqlite3_stmt *res = NULL; int rc, param = 0; - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - if (unlikely((!res))) { rc = prepare_statement(db_meta, SQL_STORE_HOST_INFO, &res); if (unlikely(rc != SQLITE_OK)) { @@ -361,6 +367,10 @@ static int store_host_metadata(RRDHOST *host) if (unlikely(rc != SQLITE_OK)) goto bind_fail; + rc = sqlite3_bind_int64(res, ++param, (sqlite3_int64) host->last_connected); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + int store_rc = sqlite3_step_monitored(res); if (unlikely(store_rc != SQLITE_DONE)) error_report("Failed to store host %s, rc = %d", rrdhost_hostname(host), rc); @@ -474,13 +484,6 @@ static int store_chart_metadata(RRDSET *st) static __thread sqlite3_stmt *res = NULL; int rc, param = 0, store_rc = 0; - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - if (unlikely(!res)) { rc = prepare_statement(db_meta, SQL_STORE_CHART, &res); if (unlikely(rc != SQLITE_OK)) { @@ -583,13 +586,6 @@ static int store_dimension_metadata(RRDDIM *rd) static __thread sqlite3_stmt *res = NULL; int rc, param = 0; - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - if (unlikely(!res)) { rc = prepare_statement(db_meta, SQL_STORE_DIMENSION, &res); if (unlikely(rc != SQLITE_OK)) { @@ -650,7 +646,7 @@ bind_fail: return 1; } -static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused) +static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused, sqlite3_stmt **res __maybe_unused, bool flag __maybe_unused) { #ifdef ENABLE_DBENGINE if(dbengine_enabled) { @@ -675,8 +671,173 @@ static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused) #endif } +int get_pragma_value(sqlite3 *database, const char *sql) +{ + sqlite3_stmt *res = NULL; + int rc = sqlite3_prepare_v2(database, sql, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) + return -1; + + int result = -1; + rc = sqlite3_step_monitored(res); + if (likely(rc == SQLITE_ROW)) + result = sqlite3_column_int(res, 0); + + rc = sqlite3_finalize(res); + (void) rc; + + return result; +} + + +int get_free_page_count(sqlite3 *database) +{ + return get_pragma_value(database, "PRAGMA freelist_count"); +} + +int get_database_page_count(sqlite3 *database) +{ + return get_pragma_value(database, "PRAGMA page_count"); +} + +static bool run_cleanup_loop( + sqlite3_stmt *res, + struct metadata_wc *wc, + bool (*check_cb)(uuid_t *, sqlite3_stmt **, bool), + void (*action_cb)(uuid_t *, sqlite3_stmt **, bool), + uint32_t *total_checked, + uint32_t *total_deleted, + uint64_t *row_id, + sqlite3_stmt **check_stmt, + sqlite3_stmt **action_stmt, + bool check_flag, + bool action_flag) +{ + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + return true; + + int rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) *row_id); + if (unlikely(rc != SQLITE_OK)) + return true; + + time_t start_running = now_monotonic_sec(); + bool time_expired = false; + while (!time_expired && sqlite3_step_monitored(res) == SQLITE_ROW && + (*total_deleted < MAX_METADATA_CLEANUP && *total_checked < MAX_METADATA_CLEANUP)) { + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + break; + + *row_id = sqlite3_column_int64(res, 1); + rc = check_cb((uuid_t *)sqlite3_column_blob(res, 0), check_stmt, check_flag); + + if (rc == true) { + action_cb((uuid_t *)sqlite3_column_blob(res, 0), action_stmt, action_flag); + (*total_deleted)++; + } + + (*total_checked)++; + time_expired = ((now_monotonic_sec() - start_running) > METADATA_RUNTIME_THRESHOLD); + } + return time_expired || (*total_checked == MAX_METADATA_CLEANUP) || (*total_deleted == MAX_METADATA_CLEANUP); +} + + +#define SQL_CHECK_CHART_EXISTENCE_IN_DIMENSION "SELECT count(1) FROM dimension WHERE chart_id = @chart_id" +#define SQL_CHECK_CHART_EXISTENCE_IN_CHART "SELECT count(1) FROM chart WHERE chart_id = @chart_id" + +static bool chart_can_be_deleted(uuid_t *chart_uuid, sqlite3_stmt **check_res, bool check_in_dimension) +{ + int rc, result = 1; + sqlite3_stmt *res = check_res ? *check_res : NULL; + + if (!res) { + if (check_in_dimension) + rc = sqlite3_prepare_v2(db_meta, SQL_CHECK_CHART_EXISTENCE_IN_DIMENSION, -1, &res, 0); + else + rc = sqlite3_prepare_v2(db_meta, SQL_CHECK_CHART_EXISTENCE_IN_CHART, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to check for chart existence, rc = %d", rc); + return 0; + } + if (check_res) + *check_res = res; + } + + rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind chart uuid parameter, rc = %d", rc); + goto skip; + } + + rc = sqlite3_step_monitored(res); + if (likely(rc == SQLITE_ROW)) + result = sqlite3_column_int(res, 0); + +skip: + if (check_res) + rc = sqlite3_reset(res); + else + rc = sqlite3_finalize(res); + + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to %s statement that checks chart uuid existence rc = %d", check_res ? "reset" : "finalize", rc); + return result == 0; +} + +#define SQL_DELETE_CHART_BY_UUID "DELETE FROM chart WHERE chart_id = @chart_id" +#define SQL_DELETE_CHART_LABEL_BY_UUID "DELETE FROM chart_label WHERE chart_id = @chart_id" + +static void delete_chart_uuid(uuid_t *chart_uuid, sqlite3_stmt **action_res, bool label_only) +{ + int rc; + sqlite3_stmt *res = action_res ? *action_res : NULL; + + if (!res) { + if (label_only) + rc = sqlite3_prepare_v2(db_meta, SQL_DELETE_CHART_LABEL_BY_UUID, -1, &res, 0); + else + rc = sqlite3_prepare_v2(db_meta, SQL_DELETE_CHART_BY_UUID, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to check for chart existence, rc = %d", rc); + return; + } + if (action_res) + *action_res = res; + } + + rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind chart uuid parameter, rc = %d", rc); + goto skip; + } + + rc = sqlite3_step_monitored(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to delete a chart uuid from the %s table, rc = %d", label_only ? "labels" : "chart", rc); + +skip: + if (action_res) + rc = sqlite3_reset(res); + else + rc = sqlite3_finalize(res); + + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to %s statement that deletes a chart uuid rc = %d", action_res ? "reset" : "finalize", rc); +} + static void check_dimension_metadata(struct metadata_wc *wc) { + static time_t next_execution_t = 0; + static uint64_t last_row_id = 0; + + time_t now = now_realtime_sec(); + + if (!next_execution_t) + next_execution_t = now + METADATA_MAINTENANCE_FIRST_CHECK; + + if (next_execution_t && next_execution_t > now) + return; + int rc; sqlite3_stmt *res = NULL; @@ -686,54 +847,212 @@ static void check_dimension_metadata(struct metadata_wc *wc) return; } - rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) wc->row_id); + uint32_t total_checked = 0; + uint32_t total_deleted = 0; + + internal_error(true, "METADATA: Checking dimensions starting after row %"PRIu64, last_row_id); + + bool more_to_do = run_cleanup_loop( + res, + wc, + dimension_can_be_deleted, + delete_dimension_uuid, + &total_checked, + &total_deleted, + &last_row_id, + NULL, + NULL, + false, + false); + + now = now_realtime_sec(); + if (more_to_do) + next_execution_t = now + METADATA_MAINTENANCE_REPEAT; + else { + last_row_id = 0; + next_execution_t = now + METADATA_DIM_CHECK_INTERVAL; + } + + netdata_log_info( + "METADATA: Dimensions checked %u, deleted %u. Checks will %s in %lld seconds", + total_checked, + total_deleted, + last_row_id ? "resume" : "restart", + (long long)(next_execution_t - now)); + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize the prepared statement to check dimensions"); +} + +static void check_chart_metadata(struct metadata_wc *wc) +{ + static time_t next_execution_t = 0; + static uint64_t last_row_id = 0; + + time_t now = now_realtime_sec(); + + if (!next_execution_t) + next_execution_t = now + METADATA_MAINTENANCE_FIRST_CHECK; + + if (next_execution_t && next_execution_t > now) + return; + + sqlite3_stmt *res = NULL; + + int rc = sqlite3_prepare_v2(db_meta, SELECT_CHART_LIST, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to row parameter"); - goto skip_run; + error_report("Failed to prepare statement to fetch charts"); + return; } uint32_t total_checked = 0; - uint32_t total_deleted= 0; - uint64_t last_row_id = wc->row_id; + uint32_t total_deleted = 0; + + internal_error(true, "METADATA: Checking charts starting after row %"PRIu64, last_row_id); + + sqlite3_stmt *check_res = NULL; + sqlite3_stmt *action_res = NULL; + bool more_to_do = run_cleanup_loop( + res, + wc, + chart_can_be_deleted, + delete_chart_uuid, + &total_checked, + &total_deleted, + &last_row_id, + &check_res, + &action_res, + true, + false); + + if (check_res) + sqlite3_finalize(check_res); + + if (action_res) + sqlite3_finalize(action_res); + + now = now_realtime_sec(); + if (more_to_do) + next_execution_t = now + METADATA_MAINTENANCE_REPEAT; + else { + last_row_id = 0; + next_execution_t = now + METADATA_CHART_CHECK_INTERVAL; + } - netdata_log_info("METADATA: Checking dimensions starting after row %"PRIu64, wc->row_id); + netdata_log_info( + "METADATA: Charts checked %u, deleted %u. Checks will %s in %lld seconds", + total_checked, + total_deleted, + last_row_id ? "resume" : "restart", + (long long)(next_execution_t - now)); - while (sqlite3_step_monitored(res) == SQLITE_ROW && total_deleted < MAX_METADATA_CLEANUP) { - if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) - break; + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize the prepared statement when reading charts"); +} + +static void check_label_metadata(struct metadata_wc *wc) +{ + static time_t next_execution_t = 0; + static uint64_t last_row_id = 0; - last_row_id = sqlite3_column_int64(res, 1); - rc = dimension_can_be_deleted((uuid_t *)sqlite3_column_blob(res, 0)); - if (rc == true) { - delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0)); - total_deleted++; - } - total_checked++; - } - wc->row_id = last_row_id; time_t now = now_realtime_sec(); - if (total_deleted > 0) { - wc->check_metadata_after = now + METADATA_MAINTENANCE_RETRY; - } else - wc->row_id = 0; - netdata_log_info("METADATA: Checked %u, deleted %u -- will resume after row %"PRIu64" in %lld seconds", total_checked, total_deleted, wc->row_id, - (long long)(wc->check_metadata_after - now)); - -skip_run: + + if (!next_execution_t) + next_execution_t = now + METADATA_MAINTENANCE_FIRST_CHECK; + + if (next_execution_t && next_execution_t > now) + return; + + int rc; + sqlite3_stmt *res = NULL; + + rc = sqlite3_prepare_v2(db_meta, SELECT_CHART_LABEL_LIST, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to fetch charts"); + return; + } + + uint32_t total_checked = 0; + uint32_t total_deleted = 0; + + internal_error(true,"METADATA: Checking charts labels starting after row %"PRIu64, last_row_id); + + sqlite3_stmt *check_res = NULL; + sqlite3_stmt *action_res = NULL; + + bool more_to_do = run_cleanup_loop( + res, + wc, + chart_can_be_deleted, + delete_chart_uuid, + &total_checked, + &total_deleted, + &last_row_id, + &check_res, + &action_res, + false, + true); + + if (check_res) + sqlite3_finalize(check_res); + + if (action_res) + sqlite3_finalize(action_res); + + now = now_realtime_sec(); + if (more_to_do) + next_execution_t = now + METADATA_MAINTENANCE_REPEAT; + else { + last_row_id = 0; + next_execution_t = now + METADATA_LABEL_CHECK_INTERVAL; + } + + netdata_log_info( + "METADATA: Chart labels checked %u, deleted %u. Checks will %s in %lld seconds", + total_checked, + total_deleted, + last_row_id ? "resume" : "restart", + (long long)(next_execution_t - now)); + rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when reading dimensions"); + error_report("Failed to finalize the prepared statement when checking charts"); } -static void cleanup_health_log(void) + +static void cleanup_health_log(struct metadata_wc *wc) { + static time_t next_execution_t = 0; + + time_t now = now_realtime_sec(); + + if (!next_execution_t) + next_execution_t = now + METADATA_MAINTENANCE_FIRST_CHECK; + + if (next_execution_t && next_execution_t > now) + return; + + next_execution_t = now + METADATA_HEALTH_LOG_INTERVAL; + RRDHOST *host; - dfe_start_reentrant(rrdhost_root_index, host) { + + bool is_claimed = claimed(); + dfe_start_reentrant(rrdhost_root_index, host){ if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) continue; - sql_health_alarm_log_cleanup(host); + sql_health_alarm_log_cleanup(host, is_claimed); + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + break; } dfe_done(host); + + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + return; + + (void) db_execute(db_meta,"DELETE FROM health_log WHERE host_id NOT IN (SELECT host_id FROM host)"); + (void) db_execute(db_meta,"DELETE FROM health_log_detail WHERE health_log_id NOT IN (SELECT health_log_id FROM health_log)"); } // @@ -742,103 +1061,57 @@ static void cleanup_health_log(void) static void metadata_init_cmd_queue(struct metadata_wc *wc) { - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - wc->queue_size = 0; - fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); + wc->cmd_queue.cmd_base = NULL; fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); } -int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd) +static void metadata_free_cmd_queue(struct metadata_wc *wc) { - unsigned queue_size; - - /* wait for free space in queue */ uv_mutex_lock(&wc->cmd_mutex); - - if (cmd->opcode == METADATA_SYNC_SHUTDOWN) { - metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN); - uv_mutex_unlock(&wc->cmd_mutex); - return 0; - } - - if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE || - metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { - uv_mutex_unlock(&wc->cmd_mutex); - return 1; + while(wc->cmd_queue.cmd_base) { + struct metadata_cmd *t = wc->cmd_queue.cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); + freez(t); } - - fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; uv_mutex_unlock(&wc->cmd_mutex); - return 0; } static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) { - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { - uv_mutex_unlock(&wc->cmd_mutex); - (void) uv_async_send(&wc->async); - return; - } - if (cmd->opcode == METADATA_SYNC_SHUTDOWN) { metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN); - uv_mutex_unlock(&wc->cmd_mutex); - (void) uv_async_send(&wc->async); - return; + goto wakeup_event_loop; } - while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) { - if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { - uv_mutex_unlock(&wc->cmd_mutex); - return; - } - uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); - } - fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + goto wakeup_event_loop; + + struct metadata_cmd *t = mallocz(sizeof(*t)); + *t = *cmd; + t->prev = t->next = NULL; + + uv_mutex_lock(&wc->cmd_mutex); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); uv_mutex_unlock(&wc->cmd_mutex); - /* wake up event loop */ +wakeup_event_loop: (void) uv_async_send(&wc->async); } static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) { struct metadata_cmd ret; - unsigned queue_size; uv_mutex_lock(&wc->cmd_mutex); - queue_size = wc->queue_size; - if (queue_size == 0) { - memset(&ret, 0, sizeof(ret)); + if(wc->cmd_queue.cmd_base) { + struct metadata_cmd *t = wc->cmd_queue.cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); + ret = *t; + freez(t); + } + else { ret.opcode = METADATA_DATABASE_NOOP; ret.completion = NULL; - } else { - /* dequeue command */ - ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; - - if (queue_size == 1) { - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - } else { - wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.head + 1 : 0; - } - wc->queue_size = queue_size - 1; - /* wake up producers */ - uv_cond_signal(&wc->cmd_cond); } uv_mutex_unlock(&wc->cmd_mutex); @@ -865,43 +1138,62 @@ static void timer_cb(uv_timer_t* handle) time_t now = now_realtime_sec(); - if (wc->check_metadata_after && wc->check_metadata_after < now) { - cmd.opcode = METADATA_MAINTENANCE; - if (!metadata_enq_cmd_noblock(wc, &cmd)) - wc->check_metadata_after = now + METADATA_MAINTENANCE_INTERVAL; - } - - if (wc->check_hosts_after && wc->check_hosts_after < now) { + if (wc->metadata_check_after && wc->metadata_check_after < now) { cmd.opcode = METADATA_SCAN_HOSTS; - if (!metadata_enq_cmd_noblock(wc, &cmd)) - wc->check_hosts_after = now + METADATA_HOST_CHECK_INTERVAL; + metadata_enq_cmd(wc, &cmd); } } -static void after_metadata_cleanup(uv_work_t *req, int status) +void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int vacuum_pc) { - UNUSED(status); + int free_pages = get_free_page_count(database); + int total_pages = get_database_page_count(database); + + if (!threshold) + threshold = DATABASE_FREE_PAGES_THRESHOLD_PC; + + if (!vacuum_pc) + vacuum_pc = DATABASE_FREE_PAGES_VACUUM_PC; + + if (free_pages > (total_pages * threshold / 100)) { - struct metadata_wc *wc = req->data; - metadata_flag_clear(wc, METADATA_FLAG_CLEANUP); + int do_free_pages = (int) (free_pages * vacuum_pc / 100); + netdata_log_info("%s: Freeing %d database pages", db_alias, do_free_pages); + + char sql[128]; + snprintfz(sql, 127, "PRAGMA incremental_vacuum(%d)", do_free_pages); + (void) db_execute(database, sql); + } } -static void start_metadata_cleanup(uv_work_t *req) +void run_metadata_cleanup(struct metadata_wc *wc) { - register_libuv_worker_jobs(); + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + return; - worker_is_busy(UV_EVENT_METADATA_CLEANUP); - struct metadata_wc *wc = req->data; check_dimension_metadata(wc); - cleanup_health_log(); + check_chart_metadata(wc); + check_label_metadata(wc); + cleanup_health_log(wc); + + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + return; + + vacuum_database(db_meta, "METADATA", DATABASE_FREE_PAGES_THRESHOLD_PC, DATABASE_FREE_PAGES_VACUUM_PC); + (void) sqlite3_wal_checkpoint(db_meta, NULL); - worker_is_idle(); } +struct ml_model_payload { + uv_work_t request; + struct metadata_wc *wc; + Pvoid_t JudyL; + size_t count; +}; + struct scan_metadata_payload { uv_work_t request; struct metadata_wc *wc; - struct completion *completion; BUFFER *work_buffer; uint32_t max_count; }; @@ -1027,10 +1319,10 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused) struct scan_metadata_payload *data = req->data; struct metadata_wc *wc = data->wc; - metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); + metadata_flag_clear(wc, METADATA_FLAG_PROCESSING); internal_error(true, "METADATA: scanning hosts complete"); - if (unlikely(data->completion)) { - completion_mark_complete(data->completion); + if (unlikely(wc->scan_complete)) { + completion_mark_complete(wc->scan_complete); internal_error(true, "METADATA: Sending completion done"); } freez(data); @@ -1044,7 +1336,7 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans uint32_t scan_count = 1; if (use_transaction) - (void)db_execute(db_meta, "BEGIN TRANSACTION;"); + (void)db_execute(db_meta, "BEGIN TRANSACTION"); rrdset_foreach_reentrant(st, host) { if (scan_count == max_count) { @@ -1093,7 +1385,7 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans rrdset_foreach_done(st); if (use_transaction) - (void)db_execute(db_meta, "COMMIT TRANSACTION;"); + (void)db_execute(db_meta, "COMMIT TRANSACTION"); return more_to_do; } @@ -1160,6 +1452,7 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) struct query_build tmp = {.sql = work_buffer, .count = 0}; uuid_unparse_lower(host->host_uuid, tmp.uuid_str); rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp); + buffer_strcat(work_buffer, " ON CONFLICT (host_id, label_key) DO UPDATE SET source_type = excluded.source_type, label_value=excluded.label_value, date_created=UNIXEPOCH()"); rc = db_execute(db_meta, buffer_tostring(work_buffer)); if (unlikely(rc)) { @@ -1215,12 +1508,50 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) (double)(all_ended_ut - all_started_ut) / USEC_PER_MS); if (unlikely(run_again)) - wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE; - else - wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL; + wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE; + else { + wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL; + run_metadata_cleanup(wc); + } + worker_is_idle(); +} + +// Callback after scan of hosts is done +static void after_start_ml_model_load(uv_work_t *req, int status __maybe_unused) +{ + struct ml_model_payload *ml_data = req->data; + struct metadata_wc *wc = ml_data->wc; + metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING); + JudyLFreeArray(&ml_data->JudyL, PJE0); + freez(ml_data); +} + +static void start_ml_model_load(uv_work_t *req __maybe_unused) +{ + register_libuv_worker_jobs(); + + struct ml_model_payload *ml_data = req->data; + + worker_is_busy(UV_EVENT_METADATA_ML_LOAD); + + Pvoid_t *PValue; + Word_t Index = 0; + bool first = true; + RRDDIM *rd; + RRDDIM_ACQUIRED *rda; + internal_error(true, "Batch ML load loader, %zu items", ml_data->count); + while((PValue = JudyLFirstThenNext(ml_data->JudyL, &Index, &first))) { + UNUSED(PValue); + rda = (RRDDIM_ACQUIRED *) Index; + rd = rrddim_acquired_to_rrddim(rda); + ml_dimension_load_models(rd); + rrddim_acquired_release(rda); + } worker_is_idle(); } + + static void metadata_event_loop(void *arg) { worker_register("METASYNC"); @@ -1237,10 +1568,8 @@ static void metadata_event_loop(void *arg) unsigned cmd_batch_size; struct metadata_wc *wc = arg; enum metadata_opcode opcode; - uv_work_t metadata_cleanup_worker; uv_thread_set_name_np(wc->thread, "METASYNC"); -// service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); loop = wc->loop = mallocz(sizeof(uv_loop_t)); ret = uv_loop_init(loop); if (ret) { @@ -1268,19 +1597,17 @@ static void metadata_event_loop(void *arg) struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - metadata_flag_clear(wc, METADATA_FLAG_CLEANUP); - metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); + metadata_flag_clear(wc, METADATA_FLAG_PROCESSING); - wc->check_metadata_after = now_realtime_sec() + METADATA_MAINTENANCE_FIRST_CHECK; - wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK; + wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK; int shutdown = 0; - wc->row_id = 0; completion_mark_complete(&wc->init_complete); BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); struct scan_metadata_payload *data; - while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) { + struct ml_model_payload *ml_data = NULL; + while (shutdown == 0 || (wc->flags & METADATA_FLAG_PROCESSING)) { uuid_t *uuid; RRDHOST *host = NULL; @@ -1306,6 +1633,24 @@ static void metadata_event_loop(void *arg) if (likely(opcode != METADATA_DATABASE_NOOP)) worker_is_busy(opcode); + // Have pending ML models to load? + if (opcode != METADATA_ML_LOAD_MODELS && ml_data && ml_data->count) { + static usec_t ml_submit_last = 0; + usec_t now = now_monotonic_usec(); + if (!ml_submit_last) + ml_submit_last = now; + + if (!metadata_flag_check(wc, METADATA_FLAG_ML_LOADING) && (now - ml_submit_last > 150 * USEC_PER_MS)) { + metadata_flag_set(wc, METADATA_FLAG_ML_LOADING); + if (unlikely(uv_queue_work(loop, &ml_data->request, start_ml_model_load, after_start_ml_model_load))) + metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING); + else { + ml_submit_last = now; + ml_data = NULL; + } + } + } + switch (opcode) { case METADATA_DATABASE_NOOP: case METADATA_DATABASE_TIMER: @@ -1313,13 +1658,22 @@ static void metadata_event_loop(void *arg) case METADATA_ML_LOAD_MODELS: { RRDDIM *rd = (RRDDIM *) cmd.param[0]; - ml_dimension_load_models(rd); + RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(rd->rrdset, rrddim_id(rd)); + if (likely(rda)) { + if (!ml_data) { + ml_data = callocz(1,sizeof(*ml_data)); + ml_data->request.data = ml_data; + ml_data->wc = wc; + } + JudyLIns(&ml_data->JudyL, (Word_t)rda, PJE0); + ml_data->count++; + } break; } case METADATA_DEL_DIMENSION: uuid = (uuid_t *) cmd.param[0]; - if (likely(dimension_can_be_deleted(uuid))) - delete_dimension_uuid(uuid); + if (likely(dimension_can_be_deleted(uuid, NULL, false))) + delete_dimension_uuid(uuid, NULL, false); freez(uuid); break; case METADATA_STORE_CLAIM_ID: @@ -1332,7 +1686,7 @@ static void metadata_event_loop(void *arg) store_host_and_system_info(host, NULL); break; case METADATA_SCAN_HOSTS: - if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS))) + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_PROCESSING))) break; if (unittest_running) @@ -1341,7 +1695,6 @@ static void metadata_event_loop(void *arg) data = mallocz(sizeof(*data)); data->request.data = data; data->wc = wc; - data->completion = cmd.completion; // Completion by the worker data->work_buffer = work_buffer; if (unlikely(cmd.completion)) { @@ -1351,15 +1704,15 @@ static void metadata_event_loop(void *arg) else data->max_count = 5000; - metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS); + metadata_flag_set(wc, METADATA_FLAG_PROCESSING); if (unlikely( uv_queue_work(loop,&data->request, start_metadata_hosts, after_metadata_hosts))) { // Failed to launch worker -- let the event loop handle completion - cmd.completion = data->completion; + cmd.completion = wc->scan_complete; freez(data); - metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); + metadata_flag_clear(wc, METADATA_FLAG_PROCESSING); } break; case METADATA_LOAD_HOST_CONTEXT:; @@ -1375,17 +1728,6 @@ static void metadata_event_loop(void *arg) freez(data); } break; - case METADATA_MAINTENANCE: - if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP))) - break; - - metadata_cleanup_worker.data = wc; - metadata_flag_set(wc, METADATA_FLAG_CLEANUP); - if (unlikely( - uv_queue_work(loop, &metadata_cleanup_worker, start_metadata_cleanup, after_metadata_cleanup))) { - metadata_flag_clear(wc, METADATA_FLAG_CLEANUP); - } - break; case METADATA_UNITTEST:; struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0]; sleep_usec(1000); // processing takes 1ms @@ -1404,7 +1746,6 @@ static void metadata_event_loop(void *arg) uv_close((uv_handle_t *)&wc->timer_req, NULL); uv_close((uv_handle_t *)&wc->async, NULL); - uv_cond_destroy(&wc->cmd_cond); int rc; do { rc = uv_loop_close(loop); @@ -1416,6 +1757,9 @@ static void metadata_event_loop(void *arg) netdata_log_info("METADATA: Shutting down event loop"); completion_mark_complete(&wc->init_complete); + completion_destroy(wc->scan_complete); + freez(wc->scan_complete); + metadata_free_cmd_queue(wc); return; error_after_timer_init: @@ -1454,24 +1798,25 @@ void metadata_sync_shutdown_prepare(void) struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); - struct completion compl; - completion_init(&compl); + struct metadata_wc *wc = &metasync_worker; + + struct completion *compl = mallocz(sizeof(*compl)); + completion_init(compl); + __atomic_store_n(&wc->scan_complete, compl, __ATOMIC_RELAXED); netdata_log_info("METADATA: Sending a scan host command"); uint32_t max_wait_iterations = 2000; - while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_SCANNING_HOSTS)) && max_wait_iterations--) { + while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_PROCESSING)) && max_wait_iterations--) { if (max_wait_iterations == 1999) netdata_log_info("METADATA: Current worker is running; waiting to finish"); sleep_usec(1000); } cmd.opcode = METADATA_SCAN_HOSTS; - cmd.completion = &compl; metadata_enq_cmd(&metasync_worker, &cmd); netdata_log_info("METADATA: Waiting for host scan completion"); - completion_wait_for(&compl); - completion_destroy(&compl); + completion_wait_for(wc->scan_complete); netdata_log_info("METADATA: Host scan complete; can continue with shutdown"); } @@ -1631,7 +1976,6 @@ int metadata_unittest(void) // Queue items for a specific period of time metadata_unittest_threads(); - fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size); metadata_sync_shutdown(); return 0; |