summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_metadata.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_metadata.c')
-rw-r--r--database/sqlite/sqlite_metadata.c549
1 files changed, 360 insertions, 189 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 35f928ff..607d789a 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -64,9 +64,11 @@ enum metadata_opcode {
METADATA_STORE_CLAIM_ID,
METADATA_ADD_HOST_INFO,
METADATA_SCAN_HOSTS,
+ METADATA_LOAD_HOST_CONTEXT,
METADATA_MAINTENANCE,
METADATA_SYNC_SHUTDOWN,
METADATA_UNITTEST,
+ METADATA_ML_LOAD_MODELS,
// leave this last
// we need it to check for worker utilization
METADATA_MAX_ENUMERATIONS_DEFINED
@@ -154,19 +156,43 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value
return 1;
}
-static void check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer)
+#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;"
+#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;"
+
+static void clean_old_chart_labels(RRDSET *st)
+{
+ char sql[512];
+ time_t first_time_s = rrdset_first_entry_s(st);
+
+ if (unlikely(!first_time_s))
+ snprintfz(sql, 511,SQL_DELETE_CHART_LABEL);
+ else
+ snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s);
+
+ int rc = exec_statement_with_uuid(sql, &st->chart_uuid);
+ if (unlikely(rc))
+ error_report("METADATA: 'host:%s' Failed to clean old labels for chart %s", rrdhost_hostname(st->rrdhost), rrdset_name(st));
+}
+
+static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter)
{
size_t old_version = st->rrdlabels_last_saved_version;
size_t new_version = dictionary_version(st->rrdlabels);
- if(new_version != old_version) {
- buffer_flush(work_buffer);
- struct query_build tmp = {.sql = work_buffer, .count = 0};
- uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
- rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
+ if (new_version == old_version)
+ return 0;
+
+ struct query_build tmp = {.sql = work_buffer, .count = 0};
+ uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
+ rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
+ int rc = db_execute(db_meta, buffer_tostring(work_buffer));
+ if (likely(!rc)) {
st->rrdlabels_last_saved_version = new_version;
- db_execute(buffer_tostring(work_buffer));
+ (*query_counter)++;
}
+
+ clean_old_chart_labels(st);
+ return rc;
}
// Migrate all hosts with hops zero to this host_uuid
@@ -177,12 +203,13 @@ void migrate_localhost(uuid_t *host_uuid)
rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid);
if (!rc)
rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid);
- if (!rc)
- db_execute(DELETE_MISSING_NODE_INSTANCES);
-
+ if (!rc) {
+ if (unlikely(db_execute(db_meta, DELETE_MISSING_NODE_INSTANCES)))
+ error_report("Failed to remove deleted hosts from node instances");
+ }
}
-static void store_claim_id(uuid_t *host_id, uuid_t *claim_id)
+static int store_claim_id(uuid_t *host_id, uuid_t *claim_id)
{
sqlite3_stmt *res = NULL;
int rc;
@@ -190,18 +217,18 @@ static void store_claim_id(uuid_t *host_id, uuid_t *claim_id)
if (unlikely(!db_meta)) {
if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
error_report("Database has not been initialized");
- return;
+ return 1;
}
rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement store chart labels");
- return;
+ error_report("Failed to prepare statement to store host claim id");
+ return 1;
}
rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id parameter to store node instance information");
+ error_report("Failed to bind host_id parameter to store claim id");
goto failed;
}
@@ -210,17 +237,19 @@ static void store_claim_id(uuid_t *host_id, uuid_t *claim_id)
else
rc = sqlite3_bind_null(res, 2);
if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind claim_id parameter to store node instance information");
+ error_report("Failed to bind claim_id parameter to host claim id");
goto failed;
}
rc = execute_insert(res);
if (unlikely(rc != SQLITE_DONE))
- error_report("Failed to store node instance information, rc = %d", rc);
+ error_report("Failed to store host claim id rc = %d", rc);
failed:
if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
- error_report("Failed to finalize the prepared statement when storing node instance information");
+ error_report("Failed to finalize the prepared statement when storing a host claim id");
+
+ return rc != SQLITE_DONE;
}
static void delete_dimension_uuid(uuid_t *dimension_uuid)
@@ -252,7 +281,7 @@ skip_execution:
//
// Store host and host system info information in the database
-static int sql_store_host_info(RRDHOST *host)
+static int store_host_metadata(RRDHOST *host)
{
static __thread sqlite3_stmt *res = NULL;
int rc, param = 0;
@@ -340,7 +369,7 @@ static int sql_store_host_info(RRDHOST *host)
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
- return !(store_rc == SQLITE_DONE);
+ return store_rc != SQLITE_DONE;
bind_fail:
error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc);
rc = sqlite3_reset(res);
@@ -349,7 +378,7 @@ bind_fail:
return 1;
}
-static void sql_store_host_system_info_key_value(const char *name, const char *value, void *data)
+static void add_host_sysinfo_key_value(const char *name, const char *value, void *data)
{
struct query_build *lb = data;
@@ -365,44 +394,43 @@ static void sql_store_host_system_info_key_value(const char *name, const char *v
lb->count++;
}
-static BUFFER *sql_store_host_system_info(RRDHOST *host)
+static bool build_host_system_info_statements(RRDHOST *host, BUFFER *work_buffer)
{
struct rrdhost_system_info *system_info = host->system_info;
if (unlikely(!system_info))
- return NULL;
-
- BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ return false;
+ buffer_flush(work_buffer);
struct query_build key_data = {.sql = work_buffer, .count = 0};
uuid_unparse_lower(host->host_uuid, key_data.uuid_str);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data);
- sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data);
- sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data);
- sql_store_host_system_info_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data);
-
- return work_buffer;
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data);
+ add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data);
+ add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data);
+ add_host_sysinfo_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data);
+
+ return true;
}
@@ -410,13 +438,10 @@ static BUFFER *sql_store_host_system_info(RRDHOST *host)
* Store a chart in the database
*/
-static int sql_store_chart(
- uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family,
- const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority,
- int update_every, int chart_type, int memory_mode, long history_entries)
+static int store_chart_metadata(RRDSET *st)
{
static __thread sqlite3_stmt *res = NULL;
- int rc, param = 0;
+ int rc, param = 0, store_rc = 0;
if (unlikely(!db_meta)) {
if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
@@ -433,98 +458,83 @@ static int sql_store_chart(
}
}
- param++;
- rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, &st->chart_uuid, sizeof(st->chart_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_blob(res, 2, host_uuid, sizeof(*host_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, &st->rrdhost->host_uuid, sizeof(st->rrdhost->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 3, type, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, string2str(st->parts.type), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 4, id, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, string2str(st->parts.id), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
+ const char *name = string2str(st->parts.name);
if (name && *name)
- rc = sqlite3_bind_text(res, 5, name, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC);
else
- rc = sqlite3_bind_null(res, 5);
+ rc = sqlite3_bind_null(res, ++param);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 6, family, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_family(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 7, context, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_context(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 8, title, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_title(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 9, units, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_units(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 10, plugin, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_plugin_name(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_text(res, 11, module, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, rrdset_module_name(st), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 12, (int) priority);
+ rc = sqlite3_bind_int(res, ++param, (int) st->priority);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 13, update_every);
+ rc = sqlite3_bind_int(res, ++param, st->update_every);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 14, chart_type);
+ rc = sqlite3_bind_int(res, ++param, st->chart_type);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 15, memory_mode);
+ rc = sqlite3_bind_int(res, ++param, st->rrd_memory_mode);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- param++;
- rc = sqlite3_bind_int(res, 16, (int) history_entries);
+ rc = sqlite3_bind_int(res, ++param, (int) st->entries);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = execute_insert(res);
- if (unlikely(rc != SQLITE_DONE))
- error_report("Failed to store chart, rc = %d", rc);
+ store_rc = execute_insert(res);
+ if (unlikely(store_rc != SQLITE_DONE))
+ error_report("Failed to store chart, rc = %d", store_rc);
rc = sqlite3_reset(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement in chart store function, rc = %d", rc);
- return 0;
+ return store_rc != SQLITE_DONE;
bind_fail:
error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc);
@@ -537,9 +547,7 @@ bind_fail:
/*
* Store a dimension
*/
-static int sql_store_dimension(
- uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier,
- collected_number divisor, int algorithm, bool hidden)
+static int store_dimension_metadata(RRDDIM *rd)
{
static __thread sqlite3_stmt *res = NULL;
int rc, param = 0;
@@ -559,35 +567,35 @@ static int sql_store_dimension(
}
}
- rc = sqlite3_bind_blob(res, ++param, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, &rd->metric_uuid, sizeof(rd->metric_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_blob(res, ++param, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, &rd->rrdset->chart_uuid, sizeof(rd->rrdset->chart_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, string2str(rd->id), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_text(res, ++param, string2str(rd->name), -1, SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, (int) multiplier);
+ rc = sqlite3_bind_int(res, ++param, (int) rd->multiplier);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, (int ) divisor);
+ rc = sqlite3_bind_int(res, ++param, (int ) rd->divisor);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, algorithm);
+ rc = sqlite3_bind_int(res, ++param, rd->algorithm);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- if (hidden)
+ if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN))
rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC);
else
rc = sqlite3_bind_null(res, ++param);
@@ -700,7 +708,6 @@ static void cleanup_health_log(void)
//
// EVENT LOOP STARTS HERE
//
-static uv_mutex_t metadata_async_lock;
static void metadata_init_cmd_queue(struct metadata_wc *wc)
{
@@ -856,6 +863,7 @@ static void start_metadata_cleanup(uv_work_t *req)
struct metadata_wc *wc = req->data;
check_dimension_metadata(wc);
cleanup_health_log();
+ (void) sqlite3_wal_checkpoint(db_meta, NULL);
worker_is_idle();
}
@@ -863,9 +871,125 @@ struct scan_metadata_payload {
uv_work_t request;
struct metadata_wc *wc;
struct completion *completion;
+ BUFFER *work_buffer;
uint32_t max_count;
};
+struct host_context_load_thread {
+ uv_thread_t thread;
+ RRDHOST *host;
+ bool busy;
+ bool finished;
+};
+
+static void restore_host_context(void *arg)
+{
+ struct host_context_load_thread *hclt = arg;
+ RRDHOST *host = hclt->host;
+
+ usec_t started_ut = now_monotonic_usec(); (void)started_ut;
+ rrdhost_load_rrdcontext_data(host);
+ usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
+
+#ifdef ENABLE_ACLK
+ aclk_queue_node_info(host, false);
+#endif
+
+ internal_error(true, "METADATA: 'host:%s' context load in %0.2f ms", rrdhost_hostname(host),
+ (double)(ended_ut - started_ut) / USEC_PER_MS);
+
+ __atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE);
+}
+
+// Callback after scan of hosts is done
+static void after_start_host_load_context(uv_work_t *req, int status __maybe_unused)
+{
+ struct scan_metadata_payload *data = req->data;
+ freez(data);
+}
+
+#define MAX_FIND_THREAD_RETRIES (10)
+
+static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait)
+{
+ for (size_t index = 0; index < max_thread_slots; index++) {
+ if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED)
+ || (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) {
+ int rc = uv_thread_join(&(hclt[index].thread));
+ if (rc)
+ error("Failed to join thread, rc = %d",rc);
+ __atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE);
+ __atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE);
+ }
+ }
+}
+
+static size_t find_available_thread_slot(struct host_context_load_thread *hclt, size_t max_thread_slots, size_t *found_index)
+{
+ size_t retries = MAX_FIND_THREAD_RETRIES;
+ while (retries--) {
+ size_t index = 0;
+ while (index < max_thread_slots) {
+ if (false == __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE)) {
+ *found_index = index;
+ return true;
+ }
+ index++;
+ }
+ sleep_usec(10 * USEC_PER_MS);
+ }
+ return false;
+}
+
+static void start_all_host_load_context(uv_work_t *req __maybe_unused)
+{
+ register_libuv_worker_jobs();
+
+ struct scan_metadata_payload *data = req->data;
+ UNUSED(data);
+
+ worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD);
+ usec_t started_ut = now_monotonic_usec(); (void)started_ut;
+
+ RRDHOST *host;
+
+ size_t max_threads = MIN(get_netdata_cpus() / 2, 6);
+ struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt));
+
+ size_t thread_index;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ if (rrdhost_flag_check(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS) ||
+ !rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
+ continue;
+
+ rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
+ internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host));
+
+ cleanup_finished_threads(hclt, max_threads, false);
+ bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
+
+ if (unlikely(!found_slot)) {
+ struct host_context_load_thread hclt_sync = {.host = host};
+ restore_host_context(&hclt_sync);
+ }
+ else {
+ __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
+ hclt[thread_index].host = host;
+ assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
+ }
+ }
+ dfe_done(host);
+
+ cleanup_finished_threads(hclt, max_threads, true);
+ freez(hclt);
+ usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
+ internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
+
+ worker_is_idle();
+}
+
// Callback after scan of hosts is done
static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
{
@@ -881,13 +1005,15 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
freez(data);
}
-static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_counter) {
+static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_transaction, BUFFER *work_buffer, size_t *query_counter) {
RRDSET *st;
int rc;
bool more_to_do = false;
uint32_t scan_count = 1;
- BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+
+ if (use_transaction)
+ (void)db_execute(db_meta, "BEGIN TRANSACTION;");
rrdset_foreach_reentrant(st, host) {
if (scan_count == max_count) {
@@ -900,27 +1026,16 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_
rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE);
scan_count++;
- check_and_update_chart_labels(st, work_buffer);
-
- rc = sql_store_chart(
- &st->chart_uuid,
- &st->rrdhost->host_uuid,
- string2str(st->parts.type),
- string2str(st->parts.id),
- string2str(st->parts.name),
- rrdset_family(st),
- rrdset_context(st),
- rrdset_title(st),
- rrdset_units(st),
- rrdset_plugin_name(st),
- rrdset_module_name(st),
- st->priority,
- st->update_every,
- st->chart_type,
- st->rrd_memory_mode,
- st->entries);
+ buffer_flush(work_buffer);
+ rc = check_and_update_chart_labels(st, work_buffer, query_counter);
+ if (unlikely(rc))
+ error_report("METADATA: 'host:%s': Failed to update labels for chart %s", rrdhost_hostname(host), rrdset_name(st));
+ else
+ (*query_counter)++;
+
+ rc = store_chart_metadata(st);
if (unlikely(rc))
- internal_error(true, "METADATA: Failed to store chart metadata %s", string2str(st->id));
+ error_report("METADATA: 'host:%s': Failed to store metadata for chart %s", rrdhost_hostname(host), rrdset_name(st));
}
RRDDIM *rd;
@@ -935,116 +1050,145 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_
else
rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
- rc = sql_store_dimension(
- &rd->metric_uuid,
- &rd->rrdset->chart_uuid,
- string2str(rd->id),
- string2str(rd->name),
- rd->multiplier,
- rd->divisor,
- rd->algorithm,
- rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN));
-
+ rc = store_dimension_metadata(rd);
if (unlikely(rc))
- error_report("METADATA: Failed to store dimension %s", string2str(rd->id));
+ error_report("METADATA: 'host:%s': Failed to dimension metadata for chart %s. dimension %s",
+ rrdhost_hostname(host), rrdset_name(st),
+ rrddim_name(rd));
}
}
rrddim_foreach_done(rd);
}
rrdset_foreach_done(st);
- buffer_free(work_buffer);
+ if (use_transaction)
+ (void)db_execute(db_meta, "COMMIT TRANSACTION;");
+
return more_to_do;
}
+static void store_host_and_system_info(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter)
+{
+ bool free_work_buffer = (NULL == work_buffer);
+
+ if (unlikely(free_work_buffer))
+ work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+
+ if (build_host_system_info_statements(host, work_buffer)) {
+ int rc = db_execute(db_meta, buffer_tostring(work_buffer));
+ if (unlikely(rc)) {
+ error_report("METADATA: 'host:%s': Failed to store host updated information in the database", rrdhost_hostname(host));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
+ }
+ else {
+ if (likely(query_counter))
+ (*query_counter)++;
+ }
+ }
+
+ if (unlikely(store_host_metadata(host))) {
+ error_report("METADATA: 'host:%s': Failed to store host info in the database", rrdhost_hostname(host));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
+ }
+ else {
+ if (likely(query_counter))
+ (*query_counter)++;
+ }
+
+ if (unlikely(free_work_buffer))
+ buffer_free(work_buffer);
+}
+
// Worker thread to scan hosts for pending metadata to store
static void start_metadata_hosts(uv_work_t *req __maybe_unused)
{
register_libuv_worker_jobs();
RRDHOST *host;
+ int transaction_started = 0;
struct scan_metadata_payload *data = req->data;
struct metadata_wc *wc = data->wc;
+ BUFFER *work_buffer = data->work_buffer;
usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut;
internal_error(true, "METADATA: checking all hosts...");
+ usec_t started_ut = now_monotonic_usec(); (void)started_ut;
bool run_again = false;
worker_is_busy(UV_EVENT_METADATA_STORE);
if (!data->max_count)
- db_execute("BEGIN TRANSACTION;");
+ transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;");
+
dfe_start_reentrant(rrdhost_root_index, host) {
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
continue;
size_t query_counter = 0; (void)query_counter;
- usec_t started_ut = now_monotonic_usec(); (void)started_ut;
rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE);
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) {
rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS);
+
int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid);
- if (likely(rc == SQLITE_OK)) {
- BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ if (likely(!rc)) {
+ query_counter++;
+
+ buffer_flush(work_buffer);
struct query_build tmp = {.sql = work_buffer, .count = 0};
uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
- db_execute(buffer_tostring(work_buffer));
- buffer_free(work_buffer);
- query_counter++;
+ rc = db_execute(db_meta, buffer_tostring(work_buffer));
+
+ if (unlikely(rc)) {
+ error_report("METADATA: 'host:%s': failed to update metadata host labels", rrdhost_hostname(host));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
+ }
+ else
+ query_counter++;
+ } else {
+ error_report("METADATA: 'host:%s': failed to delete old host labels", rrdhost_hostname(host));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
}
}
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) {
rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID);
uuid_t uuid;
-
+ int rc;
if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid)))
- store_claim_id(&host->host_uuid, &uuid);
+ rc = store_claim_id(&host->host_uuid, &uuid);
else
- store_claim_id(&host->host_uuid, NULL);
-
- query_counter++;
- }
-
- if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) {
- rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
-
- BUFFER *work_buffer = sql_store_host_system_info(host);
- if(work_buffer) {
- db_execute(buffer_tostring(work_buffer));
- buffer_free(work_buffer);
- query_counter++;
- }
+ rc = store_claim_id(&host->host_uuid, NULL);
- int rc = sql_store_host_info(host);
if (unlikely(rc))
- error_report("METADATA: 'host:%s': failed to store host info", string2str(host->hostname));
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID | RRDHOST_FLAG_METADATA_UPDATE);
else
query_counter++;
}
+ if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
+ store_host_and_system_info(host, work_buffer, &query_counter);
+ }
- if (data->max_count)
- db_execute("BEGIN TRANSACTION;");
- if (unlikely(metadata_scan_host(host, data->max_count, &query_counter))) {
+ // For clarity
+ bool use_transaction = data->max_count;
+ if (unlikely(metadata_scan_host(host, data->max_count, use_transaction, work_buffer, &query_counter))) {
run_again = true;
rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host));
}
- if (data->max_count)
- db_execute("COMMIT TRANSACTION;");
-
usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms",
rrdhost_hostname(host), query_counter,
(double)(ended_ut - started_ut) / USEC_PER_MS);
}
dfe_done(host);
- if (!data->max_count)
- db_execute("COMMIT TRANSACTION;");
+
+ if (!data->max_count && transaction_started)
+ transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;");
usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
@@ -1059,7 +1203,6 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused)
static void metadata_event_loop(void *arg)
{
- service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
worker_register("METASYNC");
worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
worker_register_job_name(METADATA_DATABASE_TIMER, "timer");
@@ -1067,6 +1210,7 @@ static void metadata_event_loop(void *arg)
worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
+ worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models");
int ret;
uv_loop_t *loop;
@@ -1076,6 +1220,7 @@ static void metadata_event_loop(void *arg)
uv_work_t metadata_cleanup_worker;
uv_thread_set_name_np(wc->thread, "METASYNC");
+// service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
loop = wc->loop = mallocz(sizeof(uv_loop_t));
ret = uv_loop_init(loop);
if (ret) {
@@ -1112,11 +1257,12 @@ static void metadata_event_loop(void *arg)
int shutdown = 0;
wc->row_id = 0;
completion_mark_complete(&wc->init_complete);
+ BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ struct scan_metadata_payload *data;
while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) {
uuid_t *uuid;
RRDHOST *host = NULL;
- int rc;
worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
@@ -1145,6 +1291,11 @@ static void metadata_event_loop(void *arg)
case METADATA_DATABASE_TIMER:
break;
+ case METADATA_ML_LOAD_MODELS: {
+ RRDDIM *rd = (RRDDIM *) cmd.param[0];
+ ml_dimension_load_models(rd);
+ break;
+ }
case METADATA_DEL_DIMENSION:
uuid = (uuid_t *) cmd.param[0];
if (likely(dimension_can_be_deleted(uuid)))
@@ -1158,9 +1309,7 @@ static void metadata_event_loop(void *arg)
break;
case METADATA_ADD_HOST_INFO:
host = (RRDHOST *) cmd.param[0];
- rc = sql_store_host_info(host);
- if (unlikely(rc))
- error_report("Failed to store host info in the database for %s", string2str(host->hostname));
+ store_host_and_system_info(host, NULL, NULL);
break;
case METADATA_SCAN_HOSTS:
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS)))
@@ -1169,10 +1318,11 @@ static void metadata_event_loop(void *arg)
if (unittest_running)
break;
- struct scan_metadata_payload *data = mallocz(sizeof(*data));
+ data = mallocz(sizeof(*data));
data->request.data = data;
data->wc = wc;
data->completion = cmd.completion; // Completion by the worker
+ data->work_buffer = work_buffer;
if (unlikely(cmd.completion)) {
data->max_count = 0; // 0 will process all pending updates
@@ -1192,6 +1342,19 @@ static void metadata_event_loop(void *arg)
metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
}
break;
+ case METADATA_LOAD_HOST_CONTEXT:;
+ if (unittest_running)
+ break;
+
+ data = callocz(1,sizeof(*data));
+ data->request.data = data;
+ data->wc = wc;
+ if (unlikely(
+ uv_queue_work(loop,&data->request, start_all_host_load_context,
+ after_start_host_load_context))) {
+ freez(data);
+ }
+ break;
case METADATA_MAINTENANCE:
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP)))
break;
@@ -1220,21 +1383,14 @@ static void metadata_event_loop(void *arg)
if (!uv_timer_stop(&wc->timer_req))
uv_close((uv_handle_t *)&wc->timer_req, NULL);
- /*
- * uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour we need to be aware if this becomes
- * an issue in the future.
- */
uv_close((uv_handle_t *)&wc->async, NULL);
- uv_run(loop, UV_RUN_DEFAULT);
-
uv_cond_destroy(&wc->cmd_cond);
int rc;
-
do {
rc = uv_loop_close(loop);
} while (rc != UV_EBUSY);
+ buffer_free(work_buffer);
freez(loop);
worker_unregister();
@@ -1272,6 +1428,9 @@ void metadata_sync_shutdown(void)
void metadata_sync_shutdown_prepare(void)
{
+ if (unlikely(!metasync_worker.loop))
+ return;
+
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -1303,8 +1462,6 @@ void metadata_sync_init(void)
{
struct metadata_wc *wc = &metasync_worker;
- fatal_assert(0 == uv_mutex_init(&metadata_async_lock));
-
memset(wc, 0, sizeof(*wc));
metadata_init_cmd_queue(wc);
completion_init(&wc->init_complete);
@@ -1364,6 +1521,20 @@ void metaqueue_host_update_info(RRDHOST *host)
queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL);
}
+void metaqueue_ml_load_models(RRDDIM *rd)
+{
+ if (unlikely(!metasync_worker.loop))
+ return;
+ queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL);
+}
+
+void metadata_queue_load_host_context(RRDHOST *host)
+{
+ if (unlikely(!metasync_worker.loop))
+ return;
+ queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL);
+}
+
//
// unitests
//
@@ -1419,7 +1590,7 @@ static void *metadata_unittest_threads(void)
unittest_queue_metadata,
&tu);
}
- uv_async_send(&metasync_worker.async);
+ (void) uv_async_send(&metasync_worker.async);
sleep_usec(seconds_to_run * USEC_PER_SEC);
__atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED);