diff options
Diffstat (limited to 'database/sqlite/sqlite_metadata.c')
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 549 |
1 files changed, 360 insertions, 189 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index 35f928ff..607d789a 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -64,9 +64,11 @@ enum metadata_opcode { METADATA_STORE_CLAIM_ID, METADATA_ADD_HOST_INFO, METADATA_SCAN_HOSTS, + METADATA_LOAD_HOST_CONTEXT, METADATA_MAINTENANCE, METADATA_SYNC_SHUTDOWN, METADATA_UNITTEST, + METADATA_ML_LOAD_MODELS, // leave this last // we need it to check for worker utilization METADATA_MAX_ENUMERATIONS_DEFINED @@ -154,19 +156,43 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value return 1; } -static void check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer) +#define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;" +#define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;" + +static void clean_old_chart_labels(RRDSET *st) +{ + char sql[512]; + time_t first_time_s = rrdset_first_entry_s(st); + + if (unlikely(!first_time_s)) + snprintfz(sql, 511,SQL_DELETE_CHART_LABEL); + else + snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s); + + int rc = exec_statement_with_uuid(sql, &st->chart_uuid); + if (unlikely(rc)) + error_report("METADATA: 'host:%s' Failed to clean old labels for chart %s", rrdhost_hostname(st->rrdhost), rrdset_name(st)); +} + +static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter) { size_t old_version = st->rrdlabels_last_saved_version; size_t new_version = dictionary_version(st->rrdlabels); - if(new_version != old_version) { - buffer_flush(work_buffer); - struct query_build tmp = {.sql = work_buffer, .count = 0}; - uuid_unparse_lower(st->chart_uuid, tmp.uuid_str); - rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp); + if (new_version == old_version) + return 0; + + struct query_build tmp = {.sql = work_buffer, .count = 0}; + uuid_unparse_lower(st->chart_uuid, tmp.uuid_str); + rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp); + int rc = db_execute(db_meta, buffer_tostring(work_buffer)); + if (likely(!rc)) { st->rrdlabels_last_saved_version = new_version; - db_execute(buffer_tostring(work_buffer)); + (*query_counter)++; } + + clean_old_chart_labels(st); + return rc; } // Migrate all hosts with hops zero to this host_uuid @@ -177,12 +203,13 @@ void migrate_localhost(uuid_t *host_uuid) rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid); if (!rc) rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid); - if (!rc) - db_execute(DELETE_MISSING_NODE_INSTANCES); - + if (!rc) { + if (unlikely(db_execute(db_meta, DELETE_MISSING_NODE_INSTANCES))) + error_report("Failed to remove deleted hosts from node instances"); + } } -static void store_claim_id(uuid_t *host_id, uuid_t *claim_id) +static int store_claim_id(uuid_t *host_id, uuid_t *claim_id) { sqlite3_stmt *res = NULL; int rc; @@ -190,18 +217,18 @@ static void store_claim_id(uuid_t *host_id, uuid_t *claim_id) if (unlikely(!db_meta)) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) error_report("Database has not been initialized"); - return; + return 1; } rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement store chart labels"); - return; + error_report("Failed to prepare statement to store host claim id"); + return 1; } rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to store node instance information"); + error_report("Failed to bind host_id parameter to store claim id"); goto failed; } @@ -210,17 +237,19 @@ static void store_claim_id(uuid_t *host_id, uuid_t *claim_id) else rc = sqlite3_bind_null(res, 2); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind claim_id parameter to store node instance information"); + error_report("Failed to bind claim_id parameter to host claim id"); goto failed; } rc = execute_insert(res); if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store node instance information, rc = %d", rc); + error_report("Failed to store host claim id rc = %d", rc); failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when storing node instance information"); + error_report("Failed to finalize the prepared statement when storing a host claim id"); + + return rc != SQLITE_DONE; } static void delete_dimension_uuid(uuid_t *dimension_uuid) @@ -252,7 +281,7 @@ skip_execution: // // Store host and host system info information in the database -static int sql_store_host_info(RRDHOST *host) +static int store_host_metadata(RRDHOST *host) { static __thread sqlite3_stmt *res = NULL; int rc, param = 0; @@ -340,7 +369,7 @@ static int sql_store_host_info(RRDHOST *host) if (unlikely(rc != SQLITE_OK)) error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc); - return !(store_rc == SQLITE_DONE); + return store_rc != SQLITE_DONE; bind_fail: error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc); rc = sqlite3_reset(res); @@ -349,7 +378,7 @@ bind_fail: return 1; } -static void sql_store_host_system_info_key_value(const char *name, const char *value, void *data) +static void add_host_sysinfo_key_value(const char *name, const char *value, void *data) { struct query_build *lb = data; @@ -365,44 +394,43 @@ static void sql_store_host_system_info_key_value(const char *name, const char *v lb->count++; } -static BUFFER *sql_store_host_system_info(RRDHOST *host) +static bool build_host_system_info_statements(RRDHOST *host, BUFFER *work_buffer) { struct rrdhost_system_info *system_info = host->system_info; if (unlikely(!system_info)) - return NULL; - - BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + return false; + buffer_flush(work_buffer); struct query_build key_data = {.sql = work_buffer, .count = 0}; uuid_unparse_lower(host->host_uuid, key_data.uuid_str); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data); - sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data); - sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data); - sql_store_host_system_info_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data); - - return work_buffer; + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data); + add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data); + add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data); + add_host_sysinfo_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data); + + return true; } @@ -410,13 +438,10 @@ static BUFFER *sql_store_host_system_info(RRDHOST *host) * Store a chart in the database */ -static int sql_store_chart( - uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family, - const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority, - int update_every, int chart_type, int memory_mode, long history_entries) +static int store_chart_metadata(RRDSET *st) { static __thread sqlite3_stmt *res = NULL; - int rc, param = 0; + int rc, param = 0, store_rc = 0; if (unlikely(!db_meta)) { if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) @@ -433,98 +458,83 @@ static int sql_store_chart( } } - param++; - rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, &st->chart_uuid, sizeof(st->chart_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_blob(res, 2, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, &st->rrdhost->host_uuid, sizeof(st->rrdhost->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 3, type, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, string2str(st->parts.type), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 4, id, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, string2str(st->parts.id), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; + const char *name = string2str(st->parts.name); if (name && *name) - rc = sqlite3_bind_text(res, 5, name, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC); else - rc = sqlite3_bind_null(res, 5); + rc = sqlite3_bind_null(res, ++param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 6, family, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_family(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 7, context, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_context(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 8, title, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_title(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 9, units, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_units(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 10, plugin, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_plugin_name(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_text(res, 11, module, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, rrdset_module_name(st), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 12, (int) priority); + rc = sqlite3_bind_int(res, ++param, (int) st->priority); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 13, update_every); + rc = sqlite3_bind_int(res, ++param, st->update_every); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 14, chart_type); + rc = sqlite3_bind_int(res, ++param, st->chart_type); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 15, memory_mode); + rc = sqlite3_bind_int(res, ++param, st->rrd_memory_mode); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - param++; - rc = sqlite3_bind_int(res, 16, (int) history_entries); + rc = sqlite3_bind_int(res, ++param, (int) st->entries); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store chart, rc = %d", rc); + store_rc = execute_insert(res); + if (unlikely(store_rc != SQLITE_DONE)) + error_report("Failed to store chart, rc = %d", store_rc); rc = sqlite3_reset(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to reset statement in chart store function, rc = %d", rc); - return 0; + return store_rc != SQLITE_DONE; bind_fail: error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc); @@ -537,9 +547,7 @@ bind_fail: /* * Store a dimension */ -static int sql_store_dimension( - uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier, - collected_number divisor, int algorithm, bool hidden) +static int store_dimension_metadata(RRDDIM *rd) { static __thread sqlite3_stmt *res = NULL; int rc, param = 0; @@ -559,35 +567,35 @@ static int sql_store_dimension( } } - rc = sqlite3_bind_blob(res, ++param, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, &rd->metric_uuid, sizeof(rd->metric_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_blob(res, ++param, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, ++param, &rd->rrdset->chart_uuid, sizeof(rd->rrdset->chart_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, string2str(rd->id), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, ++param, string2str(rd->name), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_int(res, ++param, (int) multiplier); + rc = sqlite3_bind_int(res, ++param, (int) rd->multiplier); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_int(res, ++param, (int ) divisor); + rc = sqlite3_bind_int(res, ++param, (int ) rd->divisor); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_int(res, ++param, algorithm); + rc = sqlite3_bind_int(res, ++param, rd->algorithm); if (unlikely(rc != SQLITE_OK)) goto bind_fail; - if (hidden) + if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)) rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC); else rc = sqlite3_bind_null(res, ++param); @@ -700,7 +708,6 @@ static void cleanup_health_log(void) // // EVENT LOOP STARTS HERE // -static uv_mutex_t metadata_async_lock; static void metadata_init_cmd_queue(struct metadata_wc *wc) { @@ -856,6 +863,7 @@ static void start_metadata_cleanup(uv_work_t *req) struct metadata_wc *wc = req->data; check_dimension_metadata(wc); cleanup_health_log(); + (void) sqlite3_wal_checkpoint(db_meta, NULL); worker_is_idle(); } @@ -863,9 +871,125 @@ struct scan_metadata_payload { uv_work_t request; struct metadata_wc *wc; struct completion *completion; + BUFFER *work_buffer; uint32_t max_count; }; +struct host_context_load_thread { + uv_thread_t thread; + RRDHOST *host; + bool busy; + bool finished; +}; + +static void restore_host_context(void *arg) +{ + struct host_context_load_thread *hclt = arg; + RRDHOST *host = hclt->host; + + usec_t started_ut = now_monotonic_usec(); (void)started_ut; + rrdhost_load_rrdcontext_data(host); + usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; + + rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS); + +#ifdef ENABLE_ACLK + aclk_queue_node_info(host, false); +#endif + + internal_error(true, "METADATA: 'host:%s' context load in %0.2f ms", rrdhost_hostname(host), + (double)(ended_ut - started_ut) / USEC_PER_MS); + + __atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE); +} + +// Callback after scan of hosts is done +static void after_start_host_load_context(uv_work_t *req, int status __maybe_unused) +{ + struct scan_metadata_payload *data = req->data; + freez(data); +} + +#define MAX_FIND_THREAD_RETRIES (10) + +static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait) +{ + for (size_t index = 0; index < max_thread_slots; index++) { + if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED) + || (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) { + int rc = uv_thread_join(&(hclt[index].thread)); + if (rc) + error("Failed to join thread, rc = %d",rc); + __atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE); + __atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE); + } + } +} + +static size_t find_available_thread_slot(struct host_context_load_thread *hclt, size_t max_thread_slots, size_t *found_index) +{ + size_t retries = MAX_FIND_THREAD_RETRIES; + while (retries--) { + size_t index = 0; + while (index < max_thread_slots) { + if (false == __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE)) { + *found_index = index; + return true; + } + index++; + } + sleep_usec(10 * USEC_PER_MS); + } + return false; +} + +static void start_all_host_load_context(uv_work_t *req __maybe_unused) +{ + register_libuv_worker_jobs(); + + struct scan_metadata_payload *data = req->data; + UNUSED(data); + + worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD); + usec_t started_ut = now_monotonic_usec(); (void)started_ut; + + RRDHOST *host; + + size_t max_threads = MIN(get_netdata_cpus() / 2, 6); + struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt)); + + size_t thread_index; + dfe_start_reentrant(rrdhost_root_index, host) { + if (rrdhost_flag_check(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS) || + !rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD)) + continue; + + rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS); + internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host)); + + cleanup_finished_threads(hclt, max_threads, false); + bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index); + + if (unlikely(!found_slot)) { + struct host_context_load_thread hclt_sync = {.host = host}; + restore_host_context(&hclt_sync); + } + else { + __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED); + hclt[thread_index].host = host; + assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index])); + } + } + dfe_done(host); + + cleanup_finished_threads(hclt, max_threads, true); + freez(hclt); + usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; + internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS); + + worker_is_idle(); +} + // Callback after scan of hosts is done static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused) { @@ -881,13 +1005,15 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused) freez(data); } -static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_counter) { +static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_transaction, BUFFER *work_buffer, size_t *query_counter) { RRDSET *st; int rc; bool more_to_do = false; uint32_t scan_count = 1; - BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + + if (use_transaction) + (void)db_execute(db_meta, "BEGIN TRANSACTION;"); rrdset_foreach_reentrant(st, host) { if (scan_count == max_count) { @@ -900,27 +1026,16 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_ rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE); scan_count++; - check_and_update_chart_labels(st, work_buffer); - - rc = sql_store_chart( - &st->chart_uuid, - &st->rrdhost->host_uuid, - string2str(st->parts.type), - string2str(st->parts.id), - string2str(st->parts.name), - rrdset_family(st), - rrdset_context(st), - rrdset_title(st), - rrdset_units(st), - rrdset_plugin_name(st), - rrdset_module_name(st), - st->priority, - st->update_every, - st->chart_type, - st->rrd_memory_mode, - st->entries); + buffer_flush(work_buffer); + rc = check_and_update_chart_labels(st, work_buffer, query_counter); + if (unlikely(rc)) + error_report("METADATA: 'host:%s': Failed to update labels for chart %s", rrdhost_hostname(host), rrdset_name(st)); + else + (*query_counter)++; + + rc = store_chart_metadata(st); if (unlikely(rc)) - internal_error(true, "METADATA: Failed to store chart metadata %s", string2str(st->id)); + error_report("METADATA: 'host:%s': Failed to store metadata for chart %s", rrdhost_hostname(host), rrdset_name(st)); } RRDDIM *rd; @@ -935,116 +1050,145 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_ else rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); - rc = sql_store_dimension( - &rd->metric_uuid, - &rd->rrdset->chart_uuid, - string2str(rd->id), - string2str(rd->name), - rd->multiplier, - rd->divisor, - rd->algorithm, - rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)); - + rc = store_dimension_metadata(rd); if (unlikely(rc)) - error_report("METADATA: Failed to store dimension %s", string2str(rd->id)); + error_report("METADATA: 'host:%s': Failed to dimension metadata for chart %s. dimension %s", + rrdhost_hostname(host), rrdset_name(st), + rrddim_name(rd)); } } rrddim_foreach_done(rd); } rrdset_foreach_done(st); - buffer_free(work_buffer); + if (use_transaction) + (void)db_execute(db_meta, "COMMIT TRANSACTION;"); + return more_to_do; } +static void store_host_and_system_info(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter) +{ + bool free_work_buffer = (NULL == work_buffer); + + if (unlikely(free_work_buffer)) + work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + + if (build_host_system_info_statements(host, work_buffer)) { + int rc = db_execute(db_meta, buffer_tostring(work_buffer)); + if (unlikely(rc)) { + error_report("METADATA: 'host:%s': Failed to store host updated information in the database", rrdhost_hostname(host)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE); + } + else { + if (likely(query_counter)) + (*query_counter)++; + } + } + + if (unlikely(store_host_metadata(host))) { + error_report("METADATA: 'host:%s': Failed to store host info in the database", rrdhost_hostname(host)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE); + } + else { + if (likely(query_counter)) + (*query_counter)++; + } + + if (unlikely(free_work_buffer)) + buffer_free(work_buffer); +} + // Worker thread to scan hosts for pending metadata to store static void start_metadata_hosts(uv_work_t *req __maybe_unused) { register_libuv_worker_jobs(); RRDHOST *host; + int transaction_started = 0; struct scan_metadata_payload *data = req->data; struct metadata_wc *wc = data->wc; + BUFFER *work_buffer = data->work_buffer; usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut; internal_error(true, "METADATA: checking all hosts..."); + usec_t started_ut = now_monotonic_usec(); (void)started_ut; bool run_again = false; worker_is_busy(UV_EVENT_METADATA_STORE); if (!data->max_count) - db_execute("BEGIN TRANSACTION;"); + transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;"); + dfe_start_reentrant(rrdhost_root_index, host) { if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE)) continue; size_t query_counter = 0; (void)query_counter; - usec_t started_ut = now_monotonic_usec(); (void)started_ut; rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE); if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) { rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS); + int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid); - if (likely(rc == SQLITE_OK)) { - BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + if (likely(!rc)) { + query_counter++; + + buffer_flush(work_buffer); struct query_build tmp = {.sql = work_buffer, .count = 0}; uuid_unparse_lower(host->host_uuid, tmp.uuid_str); rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp); - db_execute(buffer_tostring(work_buffer)); - buffer_free(work_buffer); - query_counter++; + rc = db_execute(db_meta, buffer_tostring(work_buffer)); + + if (unlikely(rc)) { + error_report("METADATA: 'host:%s': failed to update metadata host labels", rrdhost_hostname(host)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); + } + else + query_counter++; + } else { + error_report("METADATA: 'host:%s': failed to delete old host labels", rrdhost_hostname(host)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); } } if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) { rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID); uuid_t uuid; - + int rc; if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid))) - store_claim_id(&host->host_uuid, &uuid); + rc = store_claim_id(&host->host_uuid, &uuid); else - store_claim_id(&host->host_uuid, NULL); - - query_counter++; - } - - if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) { - rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO); - - BUFFER *work_buffer = sql_store_host_system_info(host); - if(work_buffer) { - db_execute(buffer_tostring(work_buffer)); - buffer_free(work_buffer); - query_counter++; - } + rc = store_claim_id(&host->host_uuid, NULL); - int rc = sql_store_host_info(host); if (unlikely(rc)) - error_report("METADATA: 'host:%s': failed to store host info", string2str(host->hostname)); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID | RRDHOST_FLAG_METADATA_UPDATE); else query_counter++; } + if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) { + rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO); + store_host_and_system_info(host, work_buffer, &query_counter); + } - if (data->max_count) - db_execute("BEGIN TRANSACTION;"); - if (unlikely(metadata_scan_host(host, data->max_count, &query_counter))) { + // For clarity + bool use_transaction = data->max_count; + if (unlikely(metadata_scan_host(host, data->max_count, use_transaction, work_buffer, &query_counter))) { run_again = true; rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE); internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host)); } - if (data->max_count) - db_execute("COMMIT TRANSACTION;"); - usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms", rrdhost_hostname(host), query_counter, (double)(ended_ut - started_ut) / USEC_PER_MS); } dfe_done(host); - if (!data->max_count) - db_execute("COMMIT TRANSACTION;"); + + if (!data->max_count && transaction_started) + transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;"); usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut; internal_error(true, "METADATA: checking all hosts completed in %0.2f ms", @@ -1059,7 +1203,6 @@ static void start_metadata_hosts(uv_work_t *req __maybe_unused) static void metadata_event_loop(void *arg) { - service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); worker_register("METASYNC"); worker_register_job_name(METADATA_DATABASE_NOOP, "noop"); worker_register_job_name(METADATA_DATABASE_TIMER, "timer"); @@ -1067,6 +1210,7 @@ static void metadata_event_loop(void *arg) worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id"); worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info"); worker_register_job_name(METADATA_MAINTENANCE, "maintenance"); + worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models"); int ret; uv_loop_t *loop; @@ -1076,6 +1220,7 @@ static void metadata_event_loop(void *arg) uv_work_t metadata_cleanup_worker; uv_thread_set_name_np(wc->thread, "METASYNC"); +// service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); loop = wc->loop = mallocz(sizeof(uv_loop_t)); ret = uv_loop_init(loop); if (ret) { @@ -1112,11 +1257,12 @@ static void metadata_event_loop(void *arg) int shutdown = 0; wc->row_id = 0; completion_mark_complete(&wc->init_complete); + BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + struct scan_metadata_payload *data; while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) { uuid_t *uuid; RRDHOST *host = NULL; - int rc; worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); @@ -1145,6 +1291,11 @@ static void metadata_event_loop(void *arg) case METADATA_DATABASE_TIMER: break; + case METADATA_ML_LOAD_MODELS: { + RRDDIM *rd = (RRDDIM *) cmd.param[0]; + ml_dimension_load_models(rd); + break; + } case METADATA_DEL_DIMENSION: uuid = (uuid_t *) cmd.param[0]; if (likely(dimension_can_be_deleted(uuid))) @@ -1158,9 +1309,7 @@ static void metadata_event_loop(void *arg) break; case METADATA_ADD_HOST_INFO: host = (RRDHOST *) cmd.param[0]; - rc = sql_store_host_info(host); - if (unlikely(rc)) - error_report("Failed to store host info in the database for %s", string2str(host->hostname)); + store_host_and_system_info(host, NULL, NULL); break; case METADATA_SCAN_HOSTS: if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS))) @@ -1169,10 +1318,11 @@ static void metadata_event_loop(void *arg) if (unittest_running) break; - struct scan_metadata_payload *data = mallocz(sizeof(*data)); + data = mallocz(sizeof(*data)); data->request.data = data; data->wc = wc; data->completion = cmd.completion; // Completion by the worker + data->work_buffer = work_buffer; if (unlikely(cmd.completion)) { data->max_count = 0; // 0 will process all pending updates @@ -1192,6 +1342,19 @@ static void metadata_event_loop(void *arg) metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); } break; + case METADATA_LOAD_HOST_CONTEXT:; + if (unittest_running) + break; + + data = callocz(1,sizeof(*data)); + data->request.data = data; + data->wc = wc; + if (unlikely( + uv_queue_work(loop,&data->request, start_all_host_load_context, + after_start_host_load_context))) { + freez(data); + } + break; case METADATA_MAINTENANCE: if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP))) break; @@ -1220,21 +1383,14 @@ static void metadata_event_loop(void *arg) if (!uv_timer_stop(&wc->timer_req)) uv_close((uv_handle_t *)&wc->timer_req, NULL); - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour we need to be aware if this becomes - * an issue in the future. - */ uv_close((uv_handle_t *)&wc->async, NULL); - uv_run(loop, UV_RUN_DEFAULT); - uv_cond_destroy(&wc->cmd_cond); int rc; - do { rc = uv_loop_close(loop); } while (rc != UV_EBUSY); + buffer_free(work_buffer); freez(loop); worker_unregister(); @@ -1272,6 +1428,9 @@ void metadata_sync_shutdown(void) void metadata_sync_shutdown_prepare(void) { + if (unlikely(!metasync_worker.loop)) + return; + struct metadata_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -1303,8 +1462,6 @@ void metadata_sync_init(void) { struct metadata_wc *wc = &metasync_worker; - fatal_assert(0 == uv_mutex_init(&metadata_async_lock)); - memset(wc, 0, sizeof(*wc)); metadata_init_cmd_queue(wc); completion_init(&wc->init_complete); @@ -1364,6 +1521,20 @@ void metaqueue_host_update_info(RRDHOST *host) queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL); } +void metaqueue_ml_load_models(RRDDIM *rd) +{ + if (unlikely(!metasync_worker.loop)) + return; + queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL); +} + +void metadata_queue_load_host_context(RRDHOST *host) +{ + if (unlikely(!metasync_worker.loop)) + return; + queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL); +} + // // unitests // @@ -1419,7 +1590,7 @@ static void *metadata_unittest_threads(void) unittest_queue_metadata, &tu); } - uv_async_send(&metasync_worker.async); + (void) uv_async_send(&metasync_worker.async); sleep_usec(seconds_to_run * USEC_PER_SEC); __atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED); |