summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_metadata.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_metadata.c')
-rw-r--r--database/sqlite/sqlite_metadata.c790
1 files changed, 567 insertions, 223 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 697772bf5..143783163 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -11,52 +11,68 @@
#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;"
#define STORE_HOST_LABEL \
- "INSERT OR REPLACE INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES "
+ "INSERT INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES "
#define STORE_CHART_LABEL \
- "INSERT OR REPLACE INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES "
+ "INSERT INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES "
#define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())"
#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;"
-#define SQL_STORE_HOST_INFO "INSERT OR REPLACE INTO host " \
- "(host_id, hostname, registry_hostname, update_every, os, timezone," \
- "tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, program_version," \
- "entries, health_enabled) " \
- "values (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, @memory_mode, " \
- "@abbrev_timezone, @utc_offset, @program_name, @program_version, " \
- "@entries, @health_enabled);"
-
-#define SQL_STORE_CHART "insert or replace into chart (chart_id, host_id, type, id, " \
- "name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , " \
- "history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);"
-
-#define SQL_STORE_DIMENSION "INSERT OR REPLACE INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) " \
- "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options);"
+#define SQL_STORE_HOST_INFO \
+ "INSERT OR REPLACE INTO host (host_id, hostname, registry_hostname, update_every, os, timezone, tags, hops, " \
+ "memory_mode, abbrev_timezone, utc_offset, program_name, program_version, entries, health_enabled, last_connected) " \
+ "VALUES (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, " \
+ "@memory_mode, @abbrev_tz, @utc_offset, @prog_name, @prog_version, @entries, @health_enabled, @last_connected);"
+
+#define SQL_STORE_CHART \
+ "INSERT INTO chart (chart_id, host_id, type, id, name, family, context, title, unit, plugin, module, priority, " \
+ "update_every, chart_type, memory_mode, history_entries) " \
+ "values (@chart_id, @host_id, @type, @id, @name, @family, @context, @title, @unit, @plugin, @module, @priority, " \
+ "@update_every, @chart_type, @memory_mode, @history_entries) " \
+ "ON CONFLICT(chart_id) DO UPDATE SET type=excluded.type, id=excluded.id, name=excluded.name, " \
+ "family=excluded.family, context=excluded.context, title=excluded.title, unit=excluded.unit, " \
+ "plugin=excluded.plugin, module=excluded.module, priority=excluded.priority, update_every=excluded.update_every, " \
+ "chart_type=excluded.chart_type, memory_mode = excluded.memory_mode, history_entries = excluded.history_entries"
+
+#define SQL_STORE_DIMENSION \
+ "INSERT INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) " \
+ "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options) " \
+ "ON CONFLICT(dim_id) DO UPDATE SET id=excluded.id, name=excluded.name, multiplier=excluded.multiplier, " \
+ "divisor=excluded.divisor, algorithm=excluded.algorithm, options=excluded.options"
#define SELECT_DIMENSION_LIST "SELECT dim_id, rowid FROM dimension WHERE rowid > @row_id"
+#define SELECT_CHART_LIST "SELECT chart_id, rowid FROM chart WHERE rowid > @row_id"
+#define SELECT_CHART_LABEL_LIST "SELECT chart_id, rowid FROM chart_label WHERE rowid > @row_id"
-#define SQL_STORE_HOST_SYSTEM_INFO_VALUES "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES " \
- "(@uuid, @name, @value, unixepoch())"
+#define SQL_STORE_HOST_SYSTEM_INFO_VALUES \
+ "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES " \
+ "(@uuid, @name, @value, UNIXEPOCH())"
#define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \
"UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);"
#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;"
#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);"
-#define METADATA_CMD_Q_MAX_SIZE (1024) // Max queue size; callers will block until there is room
+#define METADATA_CMD_Q_MAX_SIZE (2048) // Max queue size; callers will block until there is room
#define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds
-#define METADATA_MAINTENANCE_RETRY (60) // Retry run if already running or last run did actual work
-#define METADATA_MAINTENANCE_INTERVAL (3600) // Repeat maintenance after latest successful
+#define METADATA_MAINTENANCE_REPEAT (60) // Repeat if last run for dimensions, charts, labels needs more work
+#define METADATA_HEALTH_LOG_INTERVAL (3600) // Repeat maintenance for health
+#define METADATA_DIM_CHECK_INTERVAL (3600) // Repeat maintenance for dimensions
+#define METADATA_CHART_CHECK_INTERVAL (3600) // Repeat maintenance for charts
+#define METADATA_LABEL_CHECK_INTERVAL (3600) // Repeat maintenance for labels
+#define METADATA_RUNTIME_THRESHOLD (5) // Run time threshold for cleanup task
#define METADATA_HOST_CHECK_FIRST_CHECK (5) // First check for pending metadata
#define METADATA_HOST_CHECK_INTERVAL (30) // Repeat check for pending metadata
#define METADATA_HOST_CHECK_IMMEDIATE (5) // Repeat immediate run because we have more metadata to write
-
#define MAX_METADATA_CLEANUP (500) // Maximum metadata write operations (e.g deletes before retrying)
#define METADATA_MAX_BATCH_SIZE (512) // Maximum commands to execute before running the event loop
+#define DATABASE_FREE_PAGES_THRESHOLD_PC (5) // Percentage of free pages to trigger vacuum
+#define DATABASE_FREE_PAGES_VACUUM_PC (10) // Percentage of free pages to vacuum
+
enum metadata_opcode {
METADATA_DATABASE_NOOP = 0,
METADATA_DATABASE_TIMER,
@@ -79,35 +95,31 @@ struct metadata_cmd {
enum metadata_opcode opcode;
struct completion *completion;
const void *param[MAX_PARAM_LIST];
+ struct metadata_cmd *prev, *next;
};
struct metadata_database_cmdqueue {
- unsigned head, tail;
- struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE];
+ struct metadata_cmd *cmd_base;
};
typedef enum {
- METADATA_FLAG_CLEANUP = (1 << 0), // Cleanup is running
- METADATA_FLAG_SCANNING_HOSTS = (1 << 1), // Scanning of hosts in worker thread
- METADATA_FLAG_SHUTDOWN = (1 << 2), // Shutting down
+ METADATA_FLAG_PROCESSING = (1 << 0), // store or cleanup
+ METADATA_FLAG_SHUTDOWN = (1 << 1), // Shutting down
+ METADATA_FLAG_ML_LOADING = (1 << 2), // ML model load in progress
} METADATA_FLAG;
-#define METADATA_WORKER_BUSY (METADATA_FLAG_CLEANUP | METADATA_FLAG_SCANNING_HOSTS)
-
struct metadata_wc {
uv_thread_t thread;
uv_loop_t *loop;
uv_async_t async;
uv_timer_t timer_req;
- time_t check_metadata_after;
- time_t check_hosts_after;
+ time_t metadata_check_after;
volatile unsigned queue_size;
METADATA_FLAG flags;
- uint64_t row_id;
struct completion init_complete;
+ struct completion *scan_complete;
/* FIFO command queue */
uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
struct metadata_database_cmdqueue cmd_queue;
};
@@ -140,7 +152,7 @@ static int host_label_store_to_sql_callback(const char *name, const char *value,
buffer_sprintf(lb->sql, STORE_HOST_LABEL);
else
buffer_strcat(lb->sql, ", ");
- buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int)ls & ~(RRDLABEL_FLAG_INTERNAL), name, value);
+ buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int) (ls & ~(RRDLABEL_FLAG_INTERNAL)), name, value);
lb->count++;
return 1;
}
@@ -151,7 +163,7 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value
buffer_sprintf(lb->sql, STORE_CHART_LABEL);
else
buffer_strcat(lb->sql, ", ");
- buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, ls, name, value);
+ buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int) (ls & ~(RRDLABEL_FLAG_INTERNAL)), name, value);
lb->count++;
return 1;
}
@@ -177,7 +189,7 @@ static void clean_old_chart_labels(RRDSET *st)
static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter)
{
size_t old_version = st->rrdlabels_last_saved_version;
- size_t new_version = dictionary_version(st->rrdlabels);
+ size_t new_version = rrdlabels_version(st->rrdlabels);
if (new_version == old_version)
return 0;
@@ -185,6 +197,7 @@ static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t
struct query_build tmp = {.sql = work_buffer, .count = 0};
uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
+ buffer_strcat(work_buffer, " ON CONFLICT (chart_id, label_key) DO UPDATE SET source_type = excluded.source_type, label_value=excluded.label_value, date_created=UNIXEPOCH()");
int rc = db_execute(db_meta, buffer_tostring(work_buffer));
if (likely(!rc)) {
st->rrdlabels_last_saved_version = new_version;
@@ -252,7 +265,7 @@ failed:
return rc != SQLITE_DONE;
}
-static void delete_dimension_uuid(uuid_t *dimension_uuid)
+static void delete_dimension_uuid(uuid_t *dimension_uuid, sqlite3_stmt **action_res __maybe_unused, bool flag __maybe_unused)
{
static __thread sqlite3_stmt *res = NULL;
int rc;
@@ -265,7 +278,7 @@ static void delete_dimension_uuid(uuid_t *dimension_uuid)
}
}
- rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto skip_execution;
@@ -286,13 +299,6 @@ static int store_host_metadata(RRDHOST *host)
static __thread sqlite3_stmt *res = NULL;
int rc, param = 0;
- if (unlikely(!db_meta)) {
- if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
- return 0;
- error_report("Database has not been initialized");
- return 1;
- }
-
if (unlikely((!res))) {
rc = prepare_statement(db_meta, SQL_STORE_HOST_INFO, &res);
if (unlikely(rc != SQLITE_OK)) {
@@ -361,6 +367,10 @@ static int store_host_metadata(RRDHOST *host)
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
+ rc = sqlite3_bind_int64(res, ++param, (sqlite3_int64) host->last_connected);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
int store_rc = sqlite3_step_monitored(res);
if (unlikely(store_rc != SQLITE_DONE))
error_report("Failed to store host %s, rc = %d", rrdhost_hostname(host), rc);
@@ -474,13 +484,6 @@ static int store_chart_metadata(RRDSET *st)
static __thread sqlite3_stmt *res = NULL;
int rc, param = 0, store_rc = 0;
- if (unlikely(!db_meta)) {
- if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
- return 0;
- error_report("Database has not been initialized");
- return 1;
- }
-
if (unlikely(!res)) {
rc = prepare_statement(db_meta, SQL_STORE_CHART, &res);
if (unlikely(rc != SQLITE_OK)) {
@@ -583,13 +586,6 @@ static int store_dimension_metadata(RRDDIM *rd)
static __thread sqlite3_stmt *res = NULL;
int rc, param = 0;
- if (unlikely(!db_meta)) {
- if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
- return 0;
- error_report("Database has not been initialized");
- return 1;
- }
-
if (unlikely(!res)) {
rc = prepare_statement(db_meta, SQL_STORE_DIMENSION, &res);
if (unlikely(rc != SQLITE_OK)) {
@@ -650,7 +646,7 @@ bind_fail:
return 1;
}
-static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused)
+static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused, sqlite3_stmt **res __maybe_unused, bool flag __maybe_unused)
{
#ifdef ENABLE_DBENGINE
if(dbengine_enabled) {
@@ -675,8 +671,173 @@ static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused)
#endif
}
+int get_pragma_value(sqlite3 *database, const char *sql)
+{
+ sqlite3_stmt *res = NULL;
+ int rc = sqlite3_prepare_v2(database, sql, -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK))
+ return -1;
+
+ int result = -1;
+ rc = sqlite3_step_monitored(res);
+ if (likely(rc == SQLITE_ROW))
+ result = sqlite3_column_int(res, 0);
+
+ rc = sqlite3_finalize(res);
+ (void) rc;
+
+ return result;
+}
+
+
+int get_free_page_count(sqlite3 *database)
+{
+ return get_pragma_value(database, "PRAGMA freelist_count");
+}
+
+int get_database_page_count(sqlite3 *database)
+{
+ return get_pragma_value(database, "PRAGMA page_count");
+}
+
+static bool run_cleanup_loop(
+ sqlite3_stmt *res,
+ struct metadata_wc *wc,
+ bool (*check_cb)(uuid_t *, sqlite3_stmt **, bool),
+ void (*action_cb)(uuid_t *, sqlite3_stmt **, bool),
+ uint32_t *total_checked,
+ uint32_t *total_deleted,
+ uint64_t *row_id,
+ sqlite3_stmt **check_stmt,
+ sqlite3_stmt **action_stmt,
+ bool check_flag,
+ bool action_flag)
+{
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
+ return true;
+
+ int rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) *row_id);
+ if (unlikely(rc != SQLITE_OK))
+ return true;
+
+ time_t start_running = now_monotonic_sec();
+ bool time_expired = false;
+ while (!time_expired && sqlite3_step_monitored(res) == SQLITE_ROW &&
+ (*total_deleted < MAX_METADATA_CLEANUP && *total_checked < MAX_METADATA_CLEANUP)) {
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
+ break;
+
+ *row_id = sqlite3_column_int64(res, 1);
+ rc = check_cb((uuid_t *)sqlite3_column_blob(res, 0), check_stmt, check_flag);
+
+ if (rc == true) {
+ action_cb((uuid_t *)sqlite3_column_blob(res, 0), action_stmt, action_flag);
+ (*total_deleted)++;
+ }
+
+ (*total_checked)++;
+ time_expired = ((now_monotonic_sec() - start_running) > METADATA_RUNTIME_THRESHOLD);
+ }
+ return time_expired || (*total_checked == MAX_METADATA_CLEANUP) || (*total_deleted == MAX_METADATA_CLEANUP);
+}
+
+
+#define SQL_CHECK_CHART_EXISTENCE_IN_DIMENSION "SELECT count(1) FROM dimension WHERE chart_id = @chart_id"
+#define SQL_CHECK_CHART_EXISTENCE_IN_CHART "SELECT count(1) FROM chart WHERE chart_id = @chart_id"
+
+static bool chart_can_be_deleted(uuid_t *chart_uuid, sqlite3_stmt **check_res, bool check_in_dimension)
+{
+ int rc, result = 1;
+ sqlite3_stmt *res = check_res ? *check_res : NULL;
+
+ if (!res) {
+ if (check_in_dimension)
+ rc = sqlite3_prepare_v2(db_meta, SQL_CHECK_CHART_EXISTENCE_IN_DIMENSION, -1, &res, 0);
+ else
+ rc = sqlite3_prepare_v2(db_meta, SQL_CHECK_CHART_EXISTENCE_IN_CHART, -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to check for chart existence, rc = %d", rc);
+ return 0;
+ }
+ if (check_res)
+ *check_res = res;
+ }
+
+ rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind chart uuid parameter, rc = %d", rc);
+ goto skip;
+ }
+
+ rc = sqlite3_step_monitored(res);
+ if (likely(rc == SQLITE_ROW))
+ result = sqlite3_column_int(res, 0);
+
+skip:
+ if (check_res)
+ rc = sqlite3_reset(res);
+ else
+ rc = sqlite3_finalize(res);
+
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to %s statement that checks chart uuid existence rc = %d", check_res ? "reset" : "finalize", rc);
+ return result == 0;
+}
+
+#define SQL_DELETE_CHART_BY_UUID "DELETE FROM chart WHERE chart_id = @chart_id"
+#define SQL_DELETE_CHART_LABEL_BY_UUID "DELETE FROM chart_label WHERE chart_id = @chart_id"
+
+static void delete_chart_uuid(uuid_t *chart_uuid, sqlite3_stmt **action_res, bool label_only)
+{
+ int rc;
+ sqlite3_stmt *res = action_res ? *action_res : NULL;
+
+ if (!res) {
+ if (label_only)
+ rc = sqlite3_prepare_v2(db_meta, SQL_DELETE_CHART_LABEL_BY_UUID, -1, &res, 0);
+ else
+ rc = sqlite3_prepare_v2(db_meta, SQL_DELETE_CHART_BY_UUID, -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to check for chart existence, rc = %d", rc);
+ return;
+ }
+ if (action_res)
+ *action_res = res;
+ }
+
+ rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind chart uuid parameter, rc = %d", rc);
+ goto skip;
+ }
+
+ rc = sqlite3_step_monitored(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to delete a chart uuid from the %s table, rc = %d", label_only ? "labels" : "chart", rc);
+
+skip:
+ if (action_res)
+ rc = sqlite3_reset(res);
+ else
+ rc = sqlite3_finalize(res);
+
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to %s statement that deletes a chart uuid rc = %d", action_res ? "reset" : "finalize", rc);
+}
+
static void check_dimension_metadata(struct metadata_wc *wc)
{
+ static time_t next_execution_t = 0;
+ static uint64_t last_row_id = 0;
+
+ time_t now = now_realtime_sec();
+
+ if (!next_execution_t)
+ next_execution_t = now + METADATA_MAINTENANCE_FIRST_CHECK;
+
+ if (next_execution_t && next_execution_t > now)
+ return;
+
int rc;
sqlite3_stmt *res = NULL;
@@ -686,54 +847,212 @@ static void check_dimension_metadata(struct metadata_wc *wc)
return;
}
- rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) wc->row_id);
+ uint32_t total_checked = 0;
+ uint32_t total_deleted = 0;
+
+ internal_error(true, "METADATA: Checking dimensions starting after row %"PRIu64, last_row_id);
+
+ bool more_to_do = run_cleanup_loop(
+ res,
+ wc,
+ dimension_can_be_deleted,
+ delete_dimension_uuid,
+ &total_checked,
+ &total_deleted,
+ &last_row_id,
+ NULL,
+ NULL,
+ false,
+ false);
+
+ now = now_realtime_sec();
+ if (more_to_do)
+ next_execution_t = now + METADATA_MAINTENANCE_REPEAT;
+ else {
+ last_row_id = 0;
+ next_execution_t = now + METADATA_DIM_CHECK_INTERVAL;
+ }
+
+ netdata_log_info(
+ "METADATA: Dimensions checked %u, deleted %u. Checks will %s in %lld seconds",
+ total_checked,
+ total_deleted,
+ last_row_id ? "resume" : "restart",
+ (long long)(next_execution_t - now));
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize the prepared statement to check dimensions");
+}
+
+static void check_chart_metadata(struct metadata_wc *wc)
+{
+ static time_t next_execution_t = 0;
+ static uint64_t last_row_id = 0;
+
+ time_t now = now_realtime_sec();
+
+ if (!next_execution_t)
+ next_execution_t = now + METADATA_MAINTENANCE_FIRST_CHECK;
+
+ if (next_execution_t && next_execution_t > now)
+ return;
+
+ sqlite3_stmt *res = NULL;
+
+ int rc = sqlite3_prepare_v2(db_meta, SELECT_CHART_LIST, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to row parameter");
- goto skip_run;
+ error_report("Failed to prepare statement to fetch charts");
+ return;
}
uint32_t total_checked = 0;
- uint32_t total_deleted= 0;
- uint64_t last_row_id = wc->row_id;
+ uint32_t total_deleted = 0;
+
+ internal_error(true, "METADATA: Checking charts starting after row %"PRIu64, last_row_id);
+
+ sqlite3_stmt *check_res = NULL;
+ sqlite3_stmt *action_res = NULL;
+ bool more_to_do = run_cleanup_loop(
+ res,
+ wc,
+ chart_can_be_deleted,
+ delete_chart_uuid,
+ &total_checked,
+ &total_deleted,
+ &last_row_id,
+ &check_res,
+ &action_res,
+ true,
+ false);
+
+ if (check_res)
+ sqlite3_finalize(check_res);
+
+ if (action_res)
+ sqlite3_finalize(action_res);
+
+ now = now_realtime_sec();
+ if (more_to_do)
+ next_execution_t = now + METADATA_MAINTENANCE_REPEAT;
+ else {
+ last_row_id = 0;
+ next_execution_t = now + METADATA_CHART_CHECK_INTERVAL;
+ }
- netdata_log_info("METADATA: Checking dimensions starting after row %"PRIu64, wc->row_id);
+ netdata_log_info(
+ "METADATA: Charts checked %u, deleted %u. Checks will %s in %lld seconds",
+ total_checked,
+ total_deleted,
+ last_row_id ? "resume" : "restart",
+ (long long)(next_execution_t - now));
- while (sqlite3_step_monitored(res) == SQLITE_ROW && total_deleted < MAX_METADATA_CLEANUP) {
- if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
- break;
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize the prepared statement when reading charts");
+}
+
+static void check_label_metadata(struct metadata_wc *wc)
+{
+ static time_t next_execution_t = 0;
+ static uint64_t last_row_id = 0;
- last_row_id = sqlite3_column_int64(res, 1);
- rc = dimension_can_be_deleted((uuid_t *)sqlite3_column_blob(res, 0));
- if (rc == true) {
- delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0));
- total_deleted++;
- }
- total_checked++;
- }
- wc->row_id = last_row_id;
time_t now = now_realtime_sec();
- if (total_deleted > 0) {
- wc->check_metadata_after = now + METADATA_MAINTENANCE_RETRY;
- } else
- wc->row_id = 0;
- netdata_log_info("METADATA: Checked %u, deleted %u -- will resume after row %"PRIu64" in %lld seconds", total_checked, total_deleted, wc->row_id,
- (long long)(wc->check_metadata_after - now));
-
-skip_run:
+
+ if (!next_execution_t)
+ next_execution_t = now + METADATA_MAINTENANCE_FIRST_CHECK;
+
+ if (next_execution_t && next_execution_t > now)
+ return;
+
+ int rc;
+ sqlite3_stmt *res = NULL;
+
+ rc = sqlite3_prepare_v2(db_meta, SELECT_CHART_LABEL_LIST, -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to fetch charts");
+ return;
+ }
+
+ uint32_t total_checked = 0;
+ uint32_t total_deleted = 0;
+
+ internal_error(true,"METADATA: Checking charts labels starting after row %"PRIu64, last_row_id);
+
+ sqlite3_stmt *check_res = NULL;
+ sqlite3_stmt *action_res = NULL;
+
+ bool more_to_do = run_cleanup_loop(
+ res,
+ wc,
+ chart_can_be_deleted,
+ delete_chart_uuid,
+ &total_checked,
+ &total_deleted,
+ &last_row_id,
+ &check_res,
+ &action_res,
+ false,
+ true);
+
+ if (check_res)
+ sqlite3_finalize(check_res);
+
+ if (action_res)
+ sqlite3_finalize(action_res);
+
+ now = now_realtime_sec();
+ if (more_to_do)
+ next_execution_t = now + METADATA_MAINTENANCE_REPEAT;
+ else {
+ last_row_id = 0;
+ next_execution_t = now + METADATA_LABEL_CHECK_INTERVAL;
+ }
+
+ netdata_log_info(
+ "METADATA: Chart labels checked %u, deleted %u. Checks will %s in %lld seconds",
+ total_checked,
+ total_deleted,
+ last_row_id ? "resume" : "restart",
+ (long long)(next_execution_t - now));
+
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
- error_report("Failed to finalize the prepared statement when reading dimensions");
+ error_report("Failed to finalize the prepared statement when checking charts");
}
-static void cleanup_health_log(void)
+
+static void cleanup_health_log(struct metadata_wc *wc)
{
+ static time_t next_execution_t = 0;
+
+ time_t now = now_realtime_sec();
+
+ if (!next_execution_t)
+ next_execution_t = now + METADATA_MAINTENANCE_FIRST_CHECK;
+
+ if (next_execution_t && next_execution_t > now)
+ return;
+
+ next_execution_t = now + METADATA_HEALTH_LOG_INTERVAL;
+
RRDHOST *host;
- dfe_start_reentrant(rrdhost_root_index, host) {
+
+ bool is_claimed = claimed();
+ dfe_start_reentrant(rrdhost_root_index, host){
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))
continue;
- sql_health_alarm_log_cleanup(host);
+ sql_health_alarm_log_cleanup(host, is_claimed);
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
+ break;
}
dfe_done(host);
+
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
+ return;
+
+ (void) db_execute(db_meta,"DELETE FROM health_log WHERE host_id NOT IN (SELECT host_id FROM host)");
+ (void) db_execute(db_meta,"DELETE FROM health_log_detail WHERE health_log_id NOT IN (SELECT health_log_id FROM health_log)");
}
//
@@ -742,103 +1061,57 @@ static void cleanup_health_log(void)
static void metadata_init_cmd_queue(struct metadata_wc *wc)
{
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- wc->queue_size = 0;
- fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
+ wc->cmd_queue.cmd_base = NULL;
fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
}
-int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd)
+static void metadata_free_cmd_queue(struct metadata_wc *wc)
{
- unsigned queue_size;
-
- /* wait for free space in queue */
uv_mutex_lock(&wc->cmd_mutex);
-
- if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
- metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
- uv_mutex_unlock(&wc->cmd_mutex);
- return 0;
- }
-
- if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE ||
- metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
- uv_mutex_unlock(&wc->cmd_mutex);
- return 1;
+ while(wc->cmd_queue.cmd_base) {
+ struct metadata_cmd *t = wc->cmd_queue.cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
+ freez(t);
}
-
- fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
uv_mutex_unlock(&wc->cmd_mutex);
- return 0;
}
static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
{
- unsigned queue_size;
-
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
- uv_mutex_unlock(&wc->cmd_mutex);
- (void) uv_async_send(&wc->async);
- return;
- }
-
if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
- uv_mutex_unlock(&wc->cmd_mutex);
- (void) uv_async_send(&wc->async);
- return;
+ goto wakeup_event_loop;
}
- while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) {
- if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
- uv_mutex_unlock(&wc->cmd_mutex);
- return;
- }
- uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
- }
- fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
+ goto wakeup_event_loop;
+
+ struct metadata_cmd *t = mallocz(sizeof(*t));
+ *t = *cmd;
+ t->prev = t->next = NULL;
+
+ uv_mutex_lock(&wc->cmd_mutex);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
uv_mutex_unlock(&wc->cmd_mutex);
- /* wake up event loop */
+wakeup_event_loop:
(void) uv_async_send(&wc->async);
}
static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
{
struct metadata_cmd ret;
- unsigned queue_size;
uv_mutex_lock(&wc->cmd_mutex);
- queue_size = wc->queue_size;
- if (queue_size == 0) {
- memset(&ret, 0, sizeof(ret));
+ if(wc->cmd_queue.cmd_base) {
+ struct metadata_cmd *t = wc->cmd_queue.cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
+ ret = *t;
+ freez(t);
+ }
+ else {
ret.opcode = METADATA_DATABASE_NOOP;
ret.completion = NULL;
- } else {
- /* dequeue command */
- ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
-
- if (queue_size == 1) {
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- } else {
- wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.head + 1 : 0;
- }
- wc->queue_size = queue_size - 1;
- /* wake up producers */
- uv_cond_signal(&wc->cmd_cond);
}
uv_mutex_unlock(&wc->cmd_mutex);
@@ -865,43 +1138,62 @@ static void timer_cb(uv_timer_t* handle)
time_t now = now_realtime_sec();
- if (wc->check_metadata_after && wc->check_metadata_after < now) {
- cmd.opcode = METADATA_MAINTENANCE;
- if (!metadata_enq_cmd_noblock(wc, &cmd))
- wc->check_metadata_after = now + METADATA_MAINTENANCE_INTERVAL;
- }
-
- if (wc->check_hosts_after && wc->check_hosts_after < now) {
+ if (wc->metadata_check_after && wc->metadata_check_after < now) {
cmd.opcode = METADATA_SCAN_HOSTS;
- if (!metadata_enq_cmd_noblock(wc, &cmd))
- wc->check_hosts_after = now + METADATA_HOST_CHECK_INTERVAL;
+ metadata_enq_cmd(wc, &cmd);
}
}
-static void after_metadata_cleanup(uv_work_t *req, int status)
+void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int vacuum_pc)
{
- UNUSED(status);
+ int free_pages = get_free_page_count(database);
+ int total_pages = get_database_page_count(database);
+
+ if (!threshold)
+ threshold = DATABASE_FREE_PAGES_THRESHOLD_PC;
+
+ if (!vacuum_pc)
+ vacuum_pc = DATABASE_FREE_PAGES_VACUUM_PC;
+
+ if (free_pages > (total_pages * threshold / 100)) {
- struct metadata_wc *wc = req->data;
- metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
+ int do_free_pages = (int) (free_pages * vacuum_pc / 100);
+ netdata_log_info("%s: Freeing %d database pages", db_alias, do_free_pages);
+
+ char sql[128];
+ snprintfz(sql, 127, "PRAGMA incremental_vacuum(%d)", do_free_pages);
+ (void) db_execute(database, sql);
+ }
}
-static void start_metadata_cleanup(uv_work_t *req)
+void run_metadata_cleanup(struct metadata_wc *wc)
{
- register_libuv_worker_jobs();
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
+ return;
- worker_is_busy(UV_EVENT_METADATA_CLEANUP);
- struct metadata_wc *wc = req->data;
check_dimension_metadata(wc);
- cleanup_health_log();
+ check_chart_metadata(wc);
+ check_label_metadata(wc);
+ cleanup_health_log(wc);
+
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
+ return;
+
+ vacuum_database(db_meta, "METADATA", DATABASE_FREE_PAGES_THRESHOLD_PC, DATABASE_FREE_PAGES_VACUUM_PC);
+
(void) sqlite3_wal_checkpoint(db_meta, NULL);
- worker_is_idle();
}
+struct ml_model_payload {
+ uv_work_t request;
+ struct metadata_wc *wc;
+ Pvoid_t JudyL;
+ size_t count;
+};
+
struct scan_metadata_payload {
uv_work_t request;
struct metadata_wc *wc;
- struct completion *completion;
BUFFER *work_buffer;
uint32_t max_count;
};
@@ -1027,10 +1319,10 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
struct scan_metadata_payload *data = req->data;
struct metadata_wc *wc = data->wc;
- metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
+ metadata_flag_clear(wc, METADATA_FLAG_PROCESSING);
internal_error(true, "METADATA: scanning hosts complete");
- if (unlikely(data->completion)) {
- completion_mark_complete(data->completion);
+ if (unlikely(wc->scan_complete)) {
+ completion_mark_complete(wc->scan_complete);
internal_error(true, "METADATA: Sending completion done");
}
freez(data);
@@ -1044,7 +1336,7 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans
uint32_t scan_count = 1;
if (use_transaction)
- (void)db_execute(db_meta, "BEGIN TRANSACTION;");
+ (void)db_execute(db_meta, "BEGIN TRANSACTION");
rrdset_foreach_reentrant(st, host) {
if (scan_count == max_count) {
@@ -1093,7 +1385,7 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_trans
rrdset_foreach_done(st);
if (use_transaction)
- (void)db_execute(db_meta, "COMMIT TRANSACTION;");
+ (void)db_execute(db_meta, "COMMIT TRANSACTION");
return more_to_do;
}
@@ -1160,6 +1452,7 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
struct query_build tmp = {.sql = work_buffer, .count = 0};
uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
+ buffer_strcat(work_buffer, " ON CONFLICT (host_id, label_key) DO UPDATE SET source_type = excluded.source_type, label_value=excluded.label_value, date_created=UNIXEPOCH()");
rc = db_execute(db_meta, buffer_tostring(work_buffer));
if (unlikely(rc)) {
@@ -1215,12 +1508,50 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
(double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
if (unlikely(run_again))
- wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;
- else
- wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL;
+ wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;
+ else {
+ wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL;
+ run_metadata_cleanup(wc);
+ }
+ worker_is_idle();
+}
+
+// Callback after scan of hosts is done
+static void after_start_ml_model_load(uv_work_t *req, int status __maybe_unused)
+{
+ struct ml_model_payload *ml_data = req->data;
+ struct metadata_wc *wc = ml_data->wc;
+ metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING);
+ JudyLFreeArray(&ml_data->JudyL, PJE0);
+ freez(ml_data);
+}
+
+static void start_ml_model_load(uv_work_t *req __maybe_unused)
+{
+ register_libuv_worker_jobs();
+
+ struct ml_model_payload *ml_data = req->data;
+
+ worker_is_busy(UV_EVENT_METADATA_ML_LOAD);
+
+ Pvoid_t *PValue;
+ Word_t Index = 0;
+ bool first = true;
+ RRDDIM *rd;
+ RRDDIM_ACQUIRED *rda;
+ internal_error(true, "Batch ML load loader, %zu items", ml_data->count);
+ while((PValue = JudyLFirstThenNext(ml_data->JudyL, &Index, &first))) {
+ UNUSED(PValue);
+ rda = (RRDDIM_ACQUIRED *) Index;
+ rd = rrddim_acquired_to_rrddim(rda);
+ ml_dimension_load_models(rd);
+ rrddim_acquired_release(rda);
+ }
worker_is_idle();
}
+
+
static void metadata_event_loop(void *arg)
{
worker_register("METASYNC");
@@ -1237,10 +1568,8 @@ static void metadata_event_loop(void *arg)
unsigned cmd_batch_size;
struct metadata_wc *wc = arg;
enum metadata_opcode opcode;
- uv_work_t metadata_cleanup_worker;
uv_thread_set_name_np(wc->thread, "METASYNC");
-// service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
loop = wc->loop = mallocz(sizeof(uv_loop_t));
ret = uv_loop_init(loop);
if (ret) {
@@ -1268,19 +1597,17 @@ static void metadata_event_loop(void *arg)
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
- metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
- metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
+ metadata_flag_clear(wc, METADATA_FLAG_PROCESSING);
- wc->check_metadata_after = now_realtime_sec() + METADATA_MAINTENANCE_FIRST_CHECK;
- wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
+ wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
int shutdown = 0;
- wc->row_id = 0;
completion_mark_complete(&wc->init_complete);
BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
struct scan_metadata_payload *data;
- while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) {
+ struct ml_model_payload *ml_data = NULL;
+ while (shutdown == 0 || (wc->flags & METADATA_FLAG_PROCESSING)) {
uuid_t *uuid;
RRDHOST *host = NULL;
@@ -1306,6 +1633,24 @@ static void metadata_event_loop(void *arg)
if (likely(opcode != METADATA_DATABASE_NOOP))
worker_is_busy(opcode);
+ // Have pending ML models to load?
+ if (opcode != METADATA_ML_LOAD_MODELS && ml_data && ml_data->count) {
+ static usec_t ml_submit_last = 0;
+ usec_t now = now_monotonic_usec();
+ if (!ml_submit_last)
+ ml_submit_last = now;
+
+ if (!metadata_flag_check(wc, METADATA_FLAG_ML_LOADING) && (now - ml_submit_last > 150 * USEC_PER_MS)) {
+ metadata_flag_set(wc, METADATA_FLAG_ML_LOADING);
+ if (unlikely(uv_queue_work(loop, &ml_data->request, start_ml_model_load, after_start_ml_model_load)))
+ metadata_flag_clear(wc, METADATA_FLAG_ML_LOADING);
+ else {
+ ml_submit_last = now;
+ ml_data = NULL;
+ }
+ }
+ }
+
switch (opcode) {
case METADATA_DATABASE_NOOP:
case METADATA_DATABASE_TIMER:
@@ -1313,13 +1658,22 @@ static void metadata_event_loop(void *arg)
case METADATA_ML_LOAD_MODELS: {
RRDDIM *rd = (RRDDIM *) cmd.param[0];
- ml_dimension_load_models(rd);
+ RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(rd->rrdset, rrddim_id(rd));
+ if (likely(rda)) {
+ if (!ml_data) {
+ ml_data = callocz(1,sizeof(*ml_data));
+ ml_data->request.data = ml_data;
+ ml_data->wc = wc;
+ }
+ JudyLIns(&ml_data->JudyL, (Word_t)rda, PJE0);
+ ml_data->count++;
+ }
break;
}
case METADATA_DEL_DIMENSION:
uuid = (uuid_t *) cmd.param[0];
- if (likely(dimension_can_be_deleted(uuid)))
- delete_dimension_uuid(uuid);
+ if (likely(dimension_can_be_deleted(uuid, NULL, false)))
+ delete_dimension_uuid(uuid, NULL, false);
freez(uuid);
break;
case METADATA_STORE_CLAIM_ID:
@@ -1332,7 +1686,7 @@ static void metadata_event_loop(void *arg)
store_host_and_system_info(host, NULL);
break;
case METADATA_SCAN_HOSTS:
- if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS)))
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_PROCESSING)))
break;
if (unittest_running)
@@ -1341,7 +1695,6 @@ static void metadata_event_loop(void *arg)
data = mallocz(sizeof(*data));
data->request.data = data;
data->wc = wc;
- data->completion = cmd.completion; // Completion by the worker
data->work_buffer = work_buffer;
if (unlikely(cmd.completion)) {
@@ -1351,15 +1704,15 @@ static void metadata_event_loop(void *arg)
else
data->max_count = 5000;
- metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS);
+ metadata_flag_set(wc, METADATA_FLAG_PROCESSING);
if (unlikely(
uv_queue_work(loop,&data->request,
start_metadata_hosts,
after_metadata_hosts))) {
// Failed to launch worker -- let the event loop handle completion
- cmd.completion = data->completion;
+ cmd.completion = wc->scan_complete;
freez(data);
- metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
+ metadata_flag_clear(wc, METADATA_FLAG_PROCESSING);
}
break;
case METADATA_LOAD_HOST_CONTEXT:;
@@ -1375,17 +1728,6 @@ static void metadata_event_loop(void *arg)
freez(data);
}
break;
- case METADATA_MAINTENANCE:
- if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP)))
- break;
-
- metadata_cleanup_worker.data = wc;
- metadata_flag_set(wc, METADATA_FLAG_CLEANUP);
- if (unlikely(
- uv_queue_work(loop, &metadata_cleanup_worker, start_metadata_cleanup, after_metadata_cleanup))) {
- metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
- }
- break;
case METADATA_UNITTEST:;
struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0];
sleep_usec(1000); // processing takes 1ms
@@ -1404,7 +1746,6 @@ static void metadata_event_loop(void *arg)
uv_close((uv_handle_t *)&wc->timer_req, NULL);
uv_close((uv_handle_t *)&wc->async, NULL);
- uv_cond_destroy(&wc->cmd_cond);
int rc;
do {
rc = uv_loop_close(loop);
@@ -1416,6 +1757,9 @@ static void metadata_event_loop(void *arg)
netdata_log_info("METADATA: Shutting down event loop");
completion_mark_complete(&wc->init_complete);
+ completion_destroy(wc->scan_complete);
+ freez(wc->scan_complete);
+ metadata_free_cmd_queue(wc);
return;
error_after_timer_init:
@@ -1454,24 +1798,25 @@ void metadata_sync_shutdown_prepare(void)
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
- struct completion compl;
- completion_init(&compl);
+ struct metadata_wc *wc = &metasync_worker;
+
+ struct completion *compl = mallocz(sizeof(*compl));
+ completion_init(compl);
+ __atomic_store_n(&wc->scan_complete, compl, __ATOMIC_RELAXED);
netdata_log_info("METADATA: Sending a scan host command");
uint32_t max_wait_iterations = 2000;
- while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_SCANNING_HOSTS)) && max_wait_iterations--) {
+ while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_PROCESSING)) && max_wait_iterations--) {
if (max_wait_iterations == 1999)
netdata_log_info("METADATA: Current worker is running; waiting to finish");
sleep_usec(1000);
}
cmd.opcode = METADATA_SCAN_HOSTS;
- cmd.completion = &compl;
metadata_enq_cmd(&metasync_worker, &cmd);
netdata_log_info("METADATA: Waiting for host scan completion");
- completion_wait_for(&compl);
- completion_destroy(&compl);
+ completion_wait_for(wc->scan_complete);
netdata_log_info("METADATA: Host scan complete; can continue with shutdown");
}
@@ -1631,7 +1976,6 @@ int metadata_unittest(void)
// Queue items for a specific period of time
metadata_unittest_threads();
- fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size);
metadata_sync_shutdown();
return 0;