diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-11-30 18:47:00 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-11-30 18:47:00 +0000 |
commit | 03bf87dcb06f7021bfb2df2fa8691593c6148aff (patch) | |
tree | e16b06711a2ed77cafb4b7754be0220c3d14a9d7 /database/sqlite | |
parent | Adding upstream version 1.36.1. (diff) | |
download | netdata-03bf87dcb06f7021bfb2df2fa8691593c6148aff.tar.xz netdata-03bf87dcb06f7021bfb2df2fa8691593c6148aff.zip |
Adding upstream version 1.37.0.upstream/1.37.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 323 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 98 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 117 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.h | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 1311 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.h | 71 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.c | 41 | ||||
-rw-r--r-- | database/sqlite/sqlite_context.c | 92 | ||||
-rw-r--r-- | database/sqlite/sqlite_context.h | 20 | ||||
-rw-r--r-- | database/sqlite/sqlite_db_migration.c | 80 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.c | 1769 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.h | 122 | ||||
-rw-r--r-- | database/sqlite/sqlite_health.c | 217 | ||||
-rw-r--r-- | database/sqlite/sqlite_health.h | 16 | ||||
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 1580 | ||||
-rw-r--r-- | database/sqlite/sqlite_metadata.h | 28 |
16 files changed, 2251 insertions, 3636 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 43b34109..7e3a9b2e 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -3,59 +3,20 @@ #include "sqlite_functions.h" #include "sqlite_aclk.h" -#include "sqlite_aclk_chart.h" #include "sqlite_aclk_node.h" -#ifdef ENABLE_ACLK -#include "../../aclk/aclk.h" -#endif - void sanity_check(void) { // make sure the compiler will stop on misconfigurations BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } const char *aclk_sync_config[] = { - "CREATE TABLE IF NOT EXISTS dimension_delete (dimension_id blob, dimension_name text, chart_type_id text, " - "dim_id blob, chart_id blob, host_id blob, date_created);", - - "CREATE INDEX IF NOT EXISTS ind_h1 ON dimension_delete (host_id);", - - "CREATE TRIGGER IF NOT EXISTS tr_dim_del AFTER DELETE ON dimension BEGIN INSERT INTO dimension_delete " - "(dimension_id, dimension_name, chart_type_id, dim_id, chart_id, host_id, date_created)" - " select old.id, old.name, c.type||\".\"||c.id, old.dim_id, old.chart_id, c.host_id, unixepoch() FROM" - " chart c WHERE c.chart_id = old.chart_id; END;", - - "DELETE FROM dimension_delete WHERE host_id NOT IN" - " (SELECT host_id FROM host) OR unixepoch() - date_created > 604800;", NULL, }; uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; -int retention_running = 0; - -#ifdef ENABLE_ACLK -static void stop_retention_run() -{ - uv_mutex_lock(&aclk_async_lock); - retention_running = 0; - uv_mutex_unlock(&aclk_async_lock); -} - -static int request_retention_run() -{ - int rc = 0; - uv_mutex_lock(&aclk_async_lock); - if (unlikely(retention_running)) - rc = 1; - else - retention_running = 1; - uv_mutex_unlock(&aclk_async_lock); - return rc; -} -#endif int claimed() { @@ -197,25 +158,6 @@ struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_confi return ret; } -int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd) -{ - if (unlikely(!node_id || !cmd)) - return 0; - - uv_mutex_lock(&aclk_async_lock); - struct aclk_database_worker_config *wc = aclk_thread_head; - - while (wc) { - if (!strcmp(wc->node_id, node_id)) - break; - wc = wc->next; - } - uv_mutex_unlock(&aclk_async_lock); - if (wc) - aclk_database_enq_cmd(wc, cmd); - return (wc == NULL); -} - struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id) { if (unlikely(!node_id)) @@ -237,15 +179,14 @@ struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id) void aclk_sync_exit_all() { rrd_rdlock(); - RRDHOST *host = localhost; - while(host) { + RRDHOST *host; + rrdhost_foreach_read(host) { struct aclk_database_worker_config *wc = host->dbsync_worker; if (wc) { wc->is_shutting_down = 1; (void) aclk_database_deq_cmd(wc); uv_cond_signal(&wc->cmd_cond); } - host = host->next; } rrd_unlock(); @@ -304,23 +245,26 @@ static int create_host_callback(void *data, int argc, char **argv, char **column , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown") , argv[3] ? str2i(argv[IDX_UPDATE_EVERY]) : 1 , argv[13] ? str2i(argv[IDX_ENTRIES]) : 0 - , RRD_MEMORY_MODE_DBENGINE + , default_rrd_memory_mode , 0 // health , 0 // rrdpush enabled , NULL //destination , NULL // api key , NULL // send charts matching + , false // rrdpush_enable_replication + , 0 // rrdpush_seconds_to_replicate + , 0 // rrdpush_replication_step , system_info , 1 ); if (likely(host)) - host->host_labels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); + host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); #ifdef NETDATA_INTERNAL_CHECKS char node_str[UUID_STR_LEN] = "<none>"; if (likely(host->node_id)) uuid_unparse_lower(*host->node_id, node_str); - internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", host->hostname, host->machine_guid, node_str); + internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str); #endif return 0; } @@ -335,7 +279,7 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str); - RRDHOST *host = rrdhost_find_by_guid(uuid_str, 0); + RRDHOST *host = rrdhost_find_by_guid(uuid_str); if (host == localhost) return 0; @@ -361,7 +305,7 @@ void sql_aclk_sync_init(void) for (int i = 0; aclk_sync_config[i]; i++) { debug(D_ACLK_SYNC, "Executing %s", aclk_sync_config[i]); - rc = sqlite3_exec(db_meta, aclk_sync_config[i], 0, 0, &err_msg); + rc = sqlite3_exec_monitored(db_meta, aclk_sync_config[i], 0, 0, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error aclk sync initialization setup, rc = %d (%s)", rc, err_msg); error_report("SQLite failed statement %s", aclk_sync_config[i]); @@ -372,18 +316,16 @@ void sql_aclk_sync_init(void) info("SQLite aclk sync initialization completed"); fatal_assert(0 == uv_mutex_init(&aclk_async_lock)); - if (likely(rrdcontext_enabled == CONFIG_BOOLEAN_YES)) { - rc = sqlite3_exec(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, " - "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " - "program_version, entries, health_enabled FROM host WHERE hops >0;", - create_host_callback, NULL, &err_msg); - if (rc != SQLITE_OK) { - error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg); - sqlite3_free(err_msg); - } + rc = sqlite3_exec_monitored(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, " + "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " + "program_version, entries, health_enabled FROM host WHERE hops >0;", + create_host_callback, NULL, &err_msg); + if (rc != SQLITE_OK) { + error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); } - rc = sqlite3_exec(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE " + rc = sqlite3_exec_monitored(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE " "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg); @@ -423,30 +365,6 @@ static void timer_cb(uv_timer_t* handle) } if (aclk_connected) { - if (wc->rotation_after && wc->rotation_after < now) { - cmd.opcode = ACLK_DATABASE_UPD_RETENTION; - if (!aclk_database_enq_cmd_noblock(wc, &cmd)) - wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL; - } - - if (wc->chart_updates && !wc->chart_pending && wc->chart_payload_count) { - cmd.opcode = ACLK_DATABASE_PUSH_CHART; - cmd.count = ACLK_MAX_CHART_BATCH; - cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT; - if (!aclk_database_enq_cmd_noblock(wc, &cmd)) { - if (wc->retry_count) - info("Queued chart/dimension payload command %s, retry count = %u", wc->host_guid, wc->retry_count); - wc->chart_pending = 1; - wc->retry_count = 0; - } else { - wc->retry_count++; - if (wc->retry_count % 100 == 0) - error_report("Failed to queue chart/dimension payload command %s, retry count = %u", - wc->host_guid, - wc->retry_count); - } - } - if (wc->alert_updates && !wc->pause_alert_updates) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; cmd.count = ACLK_MAX_ALERT_UPDATES; @@ -456,52 +374,12 @@ static void timer_cb(uv_timer_t* handle) #endif } - -#ifdef ENABLE_ACLK -void after_send_retention(uv_work_t *req, int status) -{ - struct aclk_database_worker_config *wc = req->data; - (void)status; - stop_retention_run(); - wc->retention_running = 0; - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DIM_DELETION; - if (aclk_database_enq_cmd_noblock(wc, &cmd)) - info("Failed to queue a dimension deletion message"); - - cmd.opcode = ACLK_DATABASE_NODE_INFO; - if (aclk_database_enq_cmd_noblock(wc, &cmd)) - info("Failed to queue a node update info message"); -} - - -static void send_retention(uv_work_t *req) -{ - struct aclk_database_worker_config *wc = req->data; - - if (unlikely(wc->is_shutting_down)) - return; - - aclk_update_retention(wc); -} -#endif - #define MAX_CMD_BATCH_SIZE (256) void aclk_database_worker(void *arg) { worker_register("ACLKSYNC"); worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); - worker_register_job_name(ACLK_DATABASE_ADD_CHART, "chart add"); - worker_register_job_name(ACLK_DATABASE_ADD_DIMENSION, "dimension add"); - worker_register_job_name(ACLK_DATABASE_PUSH_CHART, "chart push"); - worker_register_job_name(ACLK_DATABASE_PUSH_CHART_CONFIG, "chart conf push"); - worker_register_job_name(ACLK_DATABASE_RESET_CHART, "chart reset"); - worker_register_job_name(ACLK_DATABASE_CHART_ACK, "chart ack"); - worker_register_job_name(ACLK_DATABASE_UPD_RETENTION, "retention check"); - worker_register_job_name(ACLK_DATABASE_DIM_DELETION, "dimension delete"); worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan"); worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log"); worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); @@ -526,7 +404,7 @@ void aclk_database_worker(void *arg) char threadname[NETDATA_THREAD_NAME_MAX+1]; if (wc->host) - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->host->hostname); + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", rrdhost_hostname(wc->host)); else { snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->uuid_str); threadname[11] = '\0'; @@ -556,23 +434,13 @@ void aclk_database_worker(void *arg) timer_req.data = wc; fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); -// wc->retry_count = 0; wc->node_info_send = 1; -// aclk_add_worker_thread(wc); - info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc)); + info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, (unsigned long int) sizeof(*wc)); memset(&cmd, 0, sizeof(cmd)); -#ifdef ENABLE_ACLK - uv_work_t retention_work; - sql_get_last_chart_sequence(wc); - wc->chart_payload_count = sql_get_pending_count(wc); - if (!wc->chart_payload_count) - info("%s: No pending charts and dimensions detected during startup", wc->host_guid); -#endif wc->startup_time = now_realtime_sec(); wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST; - wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY; debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count); @@ -604,6 +472,13 @@ void aclk_database_worker(void *arg) // MAINTENANCE case ACLK_DATABASE_CLEANUP: debug(D_ACLK_SYNC, "Database cleanup for %s", wc->host_guid); + + if (wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST + 2 < now_realtime_sec() && claimed() && aclk_connected) { + cmd.opcode = ACLK_DATABASE_NODE_INFO; + cmd.completion = NULL; + (void) aclk_database_enq_cmd_noblock(wc, &cmd); + } + sql_maint_aclk_sync_database(wc, cmd); if (wc->host == localhost) sql_check_aclk_table_list(wc); @@ -614,33 +489,6 @@ void aclk_database_worker(void *arg) sql_delete_aclk_table_list(wc, cmd); break; -// CHART / DIMENSION OPERATIONS -#ifdef ENABLE_ACLK - case ACLK_DATABASE_ADD_CHART: - debug(D_ACLK_SYNC, "Adding chart event for %s", wc->host_guid); - aclk_add_chart_event(wc, cmd); - break; - case ACLK_DATABASE_ADD_DIMENSION: - debug(D_ACLK_SYNC, "Adding dimension event for %s", wc->host_guid); - aclk_add_dimension_event(wc, cmd); - break; - case ACLK_DATABASE_PUSH_CHART: - debug(D_ACLK_SYNC, "Pushing chart info to the cloud for node %s", wc->host_guid); - aclk_send_chart_event(wc, cmd); - break; - case ACLK_DATABASE_PUSH_CHART_CONFIG: - debug(D_ACLK_SYNC, "Pushing chart config info to the cloud for node %s", wc->host_guid); - aclk_send_chart_config(wc, cmd); - break; - case ACLK_DATABASE_CHART_ACK: - debug(D_ACLK_SYNC, "ACK chart SEQ for %s to %"PRIu64, wc->uuid_str, (uint64_t) cmd.param1); - aclk_receive_chart_ack(wc, cmd); - break; - case ACLK_DATABASE_RESET_CHART: - debug(D_ACLK_SYNC, "RESET chart SEQ for %s to %"PRIu64, wc->uuid_str, (uint64_t) cmd.param1); - aclk_receive_chart_reset(wc, cmd); - break; -#endif // ALERTS case ACLK_DATABASE_PUSH_ALERT_CONFIG: debug(D_ACLK_SYNC,"Pushing chart config info to the cloud for %s", wc->host_guid); @@ -673,27 +521,6 @@ void aclk_database_worker(void *arg) sql_build_node_collectors(wc); break; #ifdef ENABLE_ACLK - case ACLK_DATABASE_DIM_DELETION: - debug(D_ACLK_SYNC,"Sending dimension deletion information %s", wc->uuid_str); - aclk_process_dimension_deletion(wc, cmd); - break; - case ACLK_DATABASE_UPD_RETENTION: - if (unlikely(wc->retention_running)) - break; - - if (unlikely(request_retention_run())) { - wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY; - break; - } - - debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str); - retention_work.data = wc; - wc->retention_running = 1; - if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) { - wc->retention_running = 0; - stop_retention_run(); - } - break; // NODE_INSTANCE DETECTION case ACLK_DATABASE_ORPHAN_HOST: @@ -705,14 +532,14 @@ void aclk_database_worker(void *arg) case ACLK_DATABASE_TIMER: if (unlikely(localhost && !wc->host && !wc->is_orphan)) { if (claimed()) { - wc->host = rrdhost_find_by_guid(wc->host_guid, 0); + wc->host = rrdhost_find_by_guid(wc->host_guid); if (wc->host) { - info("HOST %s (%s) detected as active", wc->host->hostname, wc->host_guid); - snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->host->hostname); + info("HOST %s (%s) detected as active", rrdhost_hostname(wc->host), wc->host_guid); + snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", rrdhost_hostname(wc->host)); uv_thread_set_name_np(wc->thread, threadname); wc->host->dbsync_worker = wc; if (unlikely(!wc->hostname)) - wc->hostname = strdupz(wc->host->hostname); + wc->hostname = strdupz(rrdhost_hostname(wc->host)); aclk_del_worker_thread(wc); wc->node_info_send = 1; } @@ -803,30 +630,6 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); - buffer_sprintf(sql, TABLE_ACLK_CHART, uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql, TABLE_ACLK_CHART_PAYLOAD, uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql, TABLE_ACLK_CHART_LATEST, uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql, INDEX_ACLK_CHART, uuid_str, uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql, INDEX_ACLK_CHART_LATEST, uuid_str, uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql, TRIGGER_ACLK_CHART_PAYLOAD, uuid_str, uuid_str, uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str); db_execute(buffer_tostring(sql)); buffer_flush(sql); @@ -844,16 +647,14 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) uuid_unparse_lower(*node_id, wc->node_id); if (likely(host)) { host->dbsync_worker = (void *)wc; - wc->hostname = strdupz(host->hostname); + wc->hostname = strdupz(rrdhost_hostname(host)); } else wc->hostname = get_hostname_by_node_id(wc->node_id); wc->host = host; strcpy(wc->uuid_str, uuid_str); strcpy(wc->host_guid, host_guid); - wc->chart_updates = 0; wc->alert_updates = 0; - wc->retry_count = 0; aclk_database_init_cmd_queue(wc); aclk_add_worker_thread(wc); fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc)); @@ -873,31 +674,9 @@ void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); - buffer_sprintf(sql,"DELETE FROM aclk_chart_%s WHERE date_submitted IS NOT NULL AND " - "CAST(date_updated AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_INTERNAL); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql,"DELETE FROM aclk_chart_payload_%s WHERE unique_id NOT IN " - "(SELECT unique_id FROM aclk_chart_%s) AND unique_id NOT IN (SELECT unique_id FROM aclk_chart_latest_%s);", - wc->uuid_str, wc->uuid_str, wc->uuid_str); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql,"UPDATE aclk_chart_%s SET status = NULL, date_submitted=unixepoch() WHERE " - "date_submitted IS NULL AND CAST(date_created AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_AUTO_MARK_SUBMIT_INTERVAL); - db_execute(buffer_tostring(sql)); - buffer_flush(sql); - - buffer_sprintf(sql,"UPDATE aclk_chart_%s SET date_updated = unixepoch() WHERE date_updated IS NULL" - " AND date_submitted IS NOT NULL AND CAST(date_submitted AS INT) < unixepoch()-%d;", - wc->uuid_str, ACLK_AUTO_MARK_UPDATED_INTERVAL); - db_execute(buffer_tostring(sql)); buffer_free(sql); return; @@ -927,7 +706,7 @@ static int is_host_available(uuid_t *host_id) error_report("Failed to bind host_id parameter to select node instance information"); goto failed; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) @@ -980,7 +759,7 @@ void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct a } buffer_flush(sql); - while (sqlite3_step(res) == SQLITE_ROW) + while (sqlite3_step_monitored(res) == SQLITE_ROW) buffer_strcat(sql, (char *) sqlite3_column_text(res, 0)); rc = sqlite3_finalize(res); @@ -1016,42 +795,10 @@ void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) { char *err_msg = NULL; debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist"); - int rc = sqlite3_exec(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); + int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg); if (rc != SQLITE_OK) { error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg); sqlite3_free(err_msg); } - db_execute("DELETE FROM dimension_delete WHERE host_id NOT IN (SELECT host_id FROM host) " - " OR unixepoch() - date_created > 604800;"); - return; -} - -void aclk_data_rotated(void) -{ -#ifdef ENABLE_ACLK - - if (!aclk_connected) - return; - - time_t next_rotation_time = now_realtime_sec()+ACLK_DATABASE_ROTATION_DELAY; - rrd_rdlock(); - RRDHOST *this_host = localhost; - while (this_host) { - struct aclk_database_worker_config *wc = this_host->dbsync_worker; - if (wc) - wc->rotation_after = next_rotation_time; - this_host = this_host->next; - } - rrd_unlock(); - - struct aclk_database_worker_config *tmp = aclk_thread_head; - - uv_mutex_lock(&aclk_async_lock); - while (tmp) { - tmp->rotation_after = next_rotation_time; - tmp = tmp->next; - } - uv_mutex_unlock(&aclk_async_lock); -#endif return; } diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index b73f422e..06d5d027 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -5,8 +5,6 @@ #include "sqlite3.h" -// TODO: To be added -#include "../../aclk/schema-wrappers/chart_stream.h" #ifndef ACLK_MAX_CHART_BATCH #define ACLK_MAX_CHART_BATCH (200) @@ -15,15 +13,9 @@ #define ACLK_MAX_CHART_BATCH_COUNT (10) #endif #define ACLK_MAX_ALERT_UPDATES (5) -#define ACLK_DATABASE_CLEANUP_FIRST (60) -#define ACLK_DATABASE_ROTATION_DELAY (180) -#define ACLK_DATABASE_RETENTION_RETRY (60) +#define ACLK_DATABASE_CLEANUP_FIRST (1200) #define ACLK_DATABASE_CLEANUP_INTERVAL (3600) -#define ACLK_DATABASE_ROTATION_INTERVAL (3600) -#define ACLK_DELETE_ACK_INTERNAL (600) #define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400) -#define ACLK_AUTO_MARK_SUBMIT_INTERVAL (3600) -#define ACLK_AUTO_MARK_UPDATED_INTERVAL (1800) #define ACLK_SYNC_QUERY_SIZE 512 struct aclk_completion { @@ -74,57 +66,14 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out) out[23] = '_'; } -static inline char *get_str_from_uuid(uuid_t *uuid) -{ - char uuid_str[GUID_LEN + 1]; - if (unlikely(!uuid)) { - uuid_t zero_uuid; - uuid_clear(zero_uuid); - uuid_unparse_lower(zero_uuid, uuid_str); - } - else - uuid_unparse_lower(*uuid, uuid_str); - return strdupz(uuid_str); -} - -#define TABLE_ACLK_CHART "CREATE TABLE IF NOT EXISTS aclk_chart_%s (sequence_id INTEGER PRIMARY KEY, " \ - "date_created, date_updated, date_submitted, status, uuid, type, unique_id, " \ - "update_count default 1, unique(uuid, status));" - -#define TABLE_ACLK_CHART_PAYLOAD "CREATE TABLE IF NOT EXISTS aclk_chart_payload_%s (unique_id BLOB PRIMARY KEY, " \ - "uuid, claim_id, type, date_created, payload);" - -#define TABLE_ACLK_CHART_LATEST "CREATE TABLE IF NOT EXISTS aclk_chart_latest_%s (uuid BLOB PRIMARY KEY, " \ - "unique_id, date_submitted);" - -#define TRIGGER_ACLK_CHART_PAYLOAD "CREATE TRIGGER IF NOT EXISTS aclk_tr_chart_payload_%s " \ - "after insert on aclk_chart_payload_%s " \ - "begin insert into aclk_chart_%s (uuid, unique_id, type, status, date_created) values " \ - " (new.uuid, new.unique_id, new.type, 'pending', unixepoch()) on conflict(uuid, status) " \ - " do update set unique_id = new.unique_id, update_count = update_count + 1; " \ - "end;" - #define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ - "alert_unique_id, date_created, date_submitted, date_cloud_ack, " \ + "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \ "unique(alert_unique_id));" -#define INDEX_ACLK_CHART "CREATE INDEX IF NOT EXISTS aclk_chart_index_%s ON aclk_chart_%s (unique_id);" - -#define INDEX_ACLK_CHART_LATEST "CREATE INDEX IF NOT EXISTS aclk_chart_latest_index_%s ON aclk_chart_latest_%s (unique_id);" - #define INDEX_ACLK_ALERT "CREATE INDEX IF NOT EXISTS aclk_alert_index_%s ON aclk_alert_%s (alert_unique_id);" - enum aclk_database_opcode { ACLK_DATABASE_NOOP = 0, - ACLK_DATABASE_ADD_CHART, - ACLK_DATABASE_ADD_DIMENSION, - ACLK_DATABASE_PUSH_CHART, - ACLK_DATABASE_PUSH_CHART_CONFIG, - ACLK_DATABASE_RESET_CHART, - ACLK_DATABASE_CHART_ACK, - ACLK_DATABASE_UPD_RETENTION, - ACLK_DATABASE_DIM_DELETION, ACLK_DATABASE_ORPHAN_HOST, ACLK_DATABASE_ALARM_HEALTH_LOG, ACLK_DATABASE_CLEANUP, @@ -142,20 +91,11 @@ enum aclk_database_opcode { ACLK_MAX_ENUMERATIONS_DEFINED }; -struct aclk_chart_payload_t { - long sequence_id; - long last_sequence_id; - char *payload; - struct aclk_chart_payload_t *next; -}; - - struct aclk_database_cmd { enum aclk_database_opcode opcode; void *data; void *data_param; int count; - uint64_t param1; struct aclk_completion *completion; }; @@ -172,12 +112,8 @@ struct aclk_database_worker_config { char node_id[GUID_LEN + 1]; char host_guid[GUID_LEN + 1]; char *hostname; // hostname to avoid constant lookups - uint64_t chart_sequence_id; // last chart_sequence_id - time_t chart_timestamp; // last chart timestamp time_t cleanup_after; // Start a cleanup after this timestamp time_t startup_time; // When the sync thread started - time_t rotation_after; - uint64_t batch_id; // batch id to use uint64_t alerts_batch_id; // batch id for alerts to use uint64_t alerts_start_seq_id; // cloud has asked to start streaming from uint64_t alert_sequence_id; // last alert sequence_id @@ -193,15 +129,9 @@ struct aclk_database_worker_config { uv_cond_t cmd_cond; volatile unsigned queue_size; struct aclk_database_cmdqueue cmd_queue; - uint32_t retry_count; - int chart_updates; int alert_updates; - time_t batch_created; int node_info_send; time_t node_collectors_send; - int chart_pending; - int chart_reset_count; - int retention_running; volatile unsigned is_shutting_down; volatile unsigned is_orphan; struct aclk_database_worker_config *next; @@ -216,23 +146,25 @@ static inline RRDHOST *find_host_by_node_id(char *node_id) if (uuid_parse(node_id, node_uuid)) return NULL; - RRDHOST *host = localhost; - while(host) { - if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) - return host; - host = host->next; + rrd_rdlock(); + RRDHOST *host, *ret = NULL; + rrdhost_foreach_read(host) { + if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) { + ret = host; + break; + } } - return NULL; + rrd_unlock(); + + return ret; } extern sqlite3 *db_meta; -extern int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); -extern void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); -extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); -int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd); -void aclk_data_rotated(void); +int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); +void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); +void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); void sql_aclk_sync_init(void); void sql_check_aclk_table_list(struct aclk_database_worker_config *wc); void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index ea1cc9fe..47663a8d 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -24,7 +24,7 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after return 0; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { when = (time_t) sqlite3_column_int64(res, 0); } @@ -36,7 +36,14 @@ time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after return when; } -#define MAX_REMOVED_PERIOD 900 +void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) { + char sql[ACLK_SYNC_QUERY_SIZE]; + snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u", uuid_str, ae->unique_id, unique_id); + sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL); + ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; +} + +#define MAX_REMOVED_PERIOD 86400 //decide if some events should be sent or not int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) { @@ -56,12 +63,13 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) uuid_t config_hash_id; RRDCALC_STATUS status; uint32_t unique_id; - + //get the previous sent event of this alarm_id + //base the search on the last filtered event snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \ - where hl.unique_id = aa.alert_unique_id \ - and hl.alarm_id = %u and hl.unique_id <> %u \ - order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id); + where hl.unique_id = aa.filtered_alert_unique_id \ + and hl.alarm_id = %u \ + order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id); rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -70,7 +78,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) return send; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { status = (RRDCALC_STATUS) sqlite3_column_int(res, 0); if (sqlite3_column_type(res, 1) != SQLITE_NULL) @@ -93,8 +101,9 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) } //same status, same config - if (ae->new_status == RRDCALC_STATUS_CLEAR) { + if (ae->new_status == RRDCALC_STATUS_CLEAR || ae->new_status == RRDCALC_STATUS_UNDEFINED) { send = 0; + update_filtered(ae, unique_id, uuid_str); goto done; } @@ -107,6 +116,7 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) goto done; } else { send = 0; + update_filtered(ae, unique_id, uuid_str); goto done; } } @@ -130,6 +140,8 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) return 0; } + CHECK_SQLITE_CONNECTION(db_meta); + if (!skip_filter) { if (!should_send_to_cloud(host, ae)) { return 0; @@ -137,9 +149,6 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) } int rc = 0; - - CHECK_SQLITE_CONNECTION(db_meta); - sqlite3_stmt *res_alert = NULL; char uuid_str[GUID_LEN + 1]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); @@ -148,8 +157,8 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) buffer_sprintf( sql, - "INSERT INTO aclk_alert_%s (alert_unique_id, date_created) " - "VALUES (@alert_unique_id, unixepoch()) on conflict (alert_unique_id) do nothing; ", + "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " + "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) on conflict (alert_unique_id) do nothing; ", uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0); @@ -220,7 +229,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d int rc; if (unlikely(!wc->alert_updates)) { - log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); return; } @@ -280,7 +289,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d static __thread uint64_t log_first_sequence_id = 0; static __thread uint64_t log_last_sequence_id = 0; - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { struct alarm_log_entry alarm_log; char old_value_string[100 + 1]; char new_value_string[100 + 1]; @@ -300,9 +309,9 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d alarm_log.config_hash = strdupz((char *)uuid_str); alarm_log.utc_offset = wc->host->utc_offset; - alarm_log.timezone = strdupz((char *)wc->host->abbrev_timezone); + alarm_log.timezone = strdupz(rrdhost_abbrev_timezone(wc->host)); alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : - strdupz((char *)wc->host->health_default_exec); + strdupz((char *)string2str(wc->host->health_default_exec)); alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16)); char *edit_command = sqlite3_column_bytes(res, 16) > 0 ? @@ -374,7 +383,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + wc->host ? rrdhost_hostname(wc->host) : "N/A", log_first_sequence_id, log_last_sequence_id, wc->alerts_batch_id); @@ -401,8 +410,8 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host) BUFFER *sql = buffer_create(1024); buffer_sprintf(sql,"delete from aclk_alert_%s; " \ - "insert into aclk_alert_%s (alert_unique_id, date_created) " \ - "select unique_id alert_unique_id, unixepoch() from health_log_%s " \ + "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ + "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \ "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str); @@ -424,9 +433,7 @@ void aclk_send_alarm_health_log(char *node_id) struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id); if (likely(!wc)) { - rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); - rrd_unlock(); if (likely(host)) wc = (struct aclk_database_worker_config *)host->dbsync_worker; } @@ -460,9 +467,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a RRDHOST *host = wc->host; if (unlikely(!host)) { - rrd_rdlock(); host = find_host_by_node_id(wc->node_id); - rrd_unlock(); if (unlikely(!host)) { log_access( @@ -500,7 +505,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a last_timestamp.tv_sec = 0; last_timestamp.tv_usec = 0; - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; if (sqlite3_column_bytes(res, 1) > 0) { first_timestamp.tv_sec = sqlite3_column_int64(res, 1); @@ -536,8 +541,6 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a freez(claim_id); buffer_free(sql); - - aclk_alert_reloaded = 1; #endif return; @@ -554,7 +557,7 @@ void aclk_send_alarm_configuration(char *config_hash) return; } - log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -603,7 +606,7 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct struct provide_alarm_configuration p_alarm_config; p_alarm_config.cfg_hash = NULL; - if (sqlite3_step(res) == SQLITE_ROW) { + if (sqlite3_step_monitored(res) == SQLITE_ROW) { alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL; alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL; @@ -664,14 +667,14 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct } if (likely(p_alarm_config.cfg_hash)) { - log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); freez((char *) cmd.data_param); freez(p_alarm_config.cfg_hash); destroy_aclk_alarm_configuration(&alarm_config); } else - log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash); bind_fail: rc = sqlite3_finalize(res); @@ -697,9 +700,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start return; struct aclk_database_worker_config *wc = NULL; - rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); - rrd_unlock(); if (likely(host)) { wc = (struct aclk_database_worker_config *)host->dbsync_worker ? (struct aclk_database_worker_config *)host->dbsync_worker : @@ -716,7 +717,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); if (likely(wc)) { - log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? wc->host->hostname : "N/A", start_seq_id, batch_id); + log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id); __sync_synchronize(); wc->alerts_batch_id = batch_id; wc->alerts_start_seq_id = start_seq_id; @@ -736,15 +737,15 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config BUFFER *sql = buffer_create(1024); - buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \ - "select unique_id alert_unique_id, unixepoch() from health_log_%s " \ + buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ + "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \ "where new_status = -2 and updated_by_id = 0 and unique_id not in " \ "(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \ "on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str); db_execute(buffer_tostring(sql)); - log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); buffer_free(sql); @@ -780,17 +781,15 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn return; struct aclk_database_worker_config *wc = NULL; - rrd_rdlock(); RRDHOST *host = find_host_by_node_id(node_id); if (likely(host)) wc = (struct aclk_database_worker_config *)host->dbsync_worker; - rrd_unlock(); if (likely(wc)) { log_access( "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + wc->host ? rrdhost_hostname(wc->host) : "N/A", snapshot_id, sequence_id); if (wc->alerts_snapshot_id == snapshot_id) @@ -831,13 +830,13 @@ void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id) #ifdef ENABLE_ACLK void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host) { - char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0=UNKNOWN"); + char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN"); char config_hash_id[GUID_LEN + 1]; uuid_unparse_lower(ae->config_hash_id, config_hash_id); - alarm_log->chart = strdupz((char *)ae->chart); - alarm_log->name = strdupz((char *)ae->name); - alarm_log->family = strdupz((char *)ae->family); + alarm_log->chart = strdupz(ae_chart_name(ae)); + alarm_log->name = strdupz(ae_name(ae)); + alarm_log->family = strdupz(ae_family(ae)); alarm_log->batch_id = 0; alarm_log->sequence_id = 0; @@ -846,9 +845,9 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->config_hash = strdupz((char *)config_hash_id); alarm_log->utc_offset = host->utc_offset; - alarm_log->timezone = strdupz((char *)host->abbrev_timezone); - alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec); - alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : strdupz((char *)""); + alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host)); + alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health_default_exec)); + alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)""); alarm_log->command = strdupz((char *)edit_command); @@ -861,31 +860,31 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN alarm_log->last_repeat = (time_t)ae->last_repeat; alarm_log->silenced = - ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp((char *)ae->recipient, "silent", 6))) ? + ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ? 1 : 0; - alarm_log->value_string = strdupz(ae->new_value_string); - alarm_log->old_value_string = strdupz(ae->old_value_string); + alarm_log->value_string = strdupz(ae_new_value_string(ae)); + alarm_log->old_value_string = strdupz(ae_old_value_string(ae)); alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0; alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0; alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; - alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)""); - alarm_log->chart_context = ae->chart_context ? strdupz(ae->chart_context) : strdupz((char *)""); + alarm_log->rendered_info = strdupz(ae_info(ae)); + alarm_log->chart_context = strdupz(ae_chart_context(ae)); freez(edit_command); } #endif #ifdef ENABLE_ACLK -static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark) +static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark) { ALARM_ENTRY *ae = host->health_log.alarms; while (ae) { - if (ae->alarm_id == alarm_id && ae->unique_id > mark && + if (ae->alarm_id == alarm_id && ae->unique_id >mark && (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL)) return 1; ae = ae->next; @@ -905,7 +904,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru UNUSED(cmd); // we perhaps we don't need this for snapshots if (unlikely(!wc->alert_updates)) { - log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); return; } @@ -921,7 +920,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru if (unlikely(!claim_id)) return; - log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id); + log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->alerts_snapshot_id); aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id); @@ -1025,11 +1024,11 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host) BUFFER *sql = buffer_create(1024); - buffer_sprintf(sql,"delete from aclk_alert_%s where alert_unique_id not in " + buffer_sprintf(sql,"delete from aclk_alert_%s where filtered_alert_unique_id not in " " (select unique_id from health_log_%s); ", uuid_str, uuid_str); char *err_msg = NULL; - int rc = sqlite3_exec(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg); + int rc = sqlite3_exec_monitored(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg); if (rc != SQLITE_OK) { error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""", uuid_str, err_msg); @@ -1064,7 +1063,7 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert return 1; } - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0; proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h index 0181b484..88a939e8 100644 --- a/database/sqlite/sqlite_aclk_alert.h +++ b/database/sqlite/sqlite_aclk_alert.h @@ -26,6 +26,6 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id); int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status); -extern int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter); +int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter); #endif //NETDATA_SQLITE_ACLK_ALERT_H diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c deleted file mode 100644 index c1db60c4..00000000 --- a/database/sqlite/sqlite_aclk_chart.c +++ /dev/null @@ -1,1311 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "sqlite_functions.h" -#include "sqlite_aclk_chart.h" - -#ifdef ENABLE_ACLK -#include "../../aclk/aclk_charts_api.h" -#include "../../aclk/aclk.h" - -static inline int -sql_queue_chart_payload(struct aclk_database_worker_config *wc, void *data, enum aclk_database_opcode opcode) -{ - int rc; - if (unlikely(!wc)) - return 1; - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = opcode; - cmd.data = data; - rc = aclk_database_enq_cmd_noblock(wc, &cmd); - return rc; -} - -static time_t payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - time_t send_status = 0; - - if (unlikely(!res)) { - char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT acl.date_submitted FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp " - "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;", - uuid_str, uuid_str); - rc = prepare_statement(db_meta, sql, &res); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to check payload data on %s", sql); - return 0; - } - } - - rc = sqlite3_bind_blob(res, 1, uuid, sizeof(*uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_blob(res, 2, payload, payload_size, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - while (sqlite3_step(res) == SQLITE_ROW) { - send_status = (time_t) sqlite3_column_int64(res, 0); - } - -bind_fail: - if (unlikely(sqlite3_reset(res) != SQLITE_OK)) - error_report("Failed to reset statement in check payload, rc = %d", rc); - return send_status; -} - -static int aclk_add_chart_payload( - struct aclk_database_worker_config *wc, - uuid_t *uuid, - char *claim_id, - ACLK_PAYLOAD_TYPE payload_type, - void *payload, - size_t payload_size, - time_t *send_status, - int check_sent) -{ - static __thread sqlite3_stmt *res_chart = NULL; - int rc; - time_t date_submitted; - - if (unlikely(!payload)) - return 0; - - if (check_sent) { - date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size); - if (send_status) - *send_status = date_submitted; - if (date_submitted) - return 0; - } - - if (unlikely(!res_chart)) { - char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, - "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ - "VALUES (@unique_id, @uuid, @claim_id, unixepoch(), @type, @payload);", wc->uuid_str); - rc = prepare_statement(db_meta, sql, &res_chart); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to store chart payload data"); - return 1; - } - } - - uuid_t unique_uuid; - uuid_generate(unique_uuid); - - uuid_t claim_uuid; - if (uuid_parse(claim_id, claim_uuid)) - return 1; - - rc = sqlite3_bind_blob(res_chart, 1, &unique_uuid, sizeof(unique_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_blob(res_chart, 2, uuid, sizeof(*uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_blob(res_chart, 3, &claim_uuid, sizeof(claim_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res_chart, 4, payload_type); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_blob(res_chart, 5, payload, payload_size, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = execute_insert(res_chart); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed store chart payload event, rc = %d", rc); - else { - wc->chart_payload_count++; - time_t now = now_realtime_sec(); - if (wc->rotation_after > now && wc->rotation_after < now + ACLK_DATABASE_ROTATION_DELAY) - wc->rotation_after = now + ACLK_DATABASE_ROTATION_DELAY; - } - -bind_fail: - if (unlikely(sqlite3_reset(res_chart) != SQLITE_OK)) - error_report("Failed to reset statement in store chart payload, rc = %d", rc); - return (rc != SQLITE_DONE); -} - -int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - int rc = 0; - CHECK_SQLITE_CONNECTION(db_meta); - - char *claim_id = get_agent_claimid(); - - RRDSET *st = cmd.data; - - if (likely(claim_id)) { - struct chart_instance_updated chart_payload; - memset(&chart_payload, 0, sizeof(chart_payload)); - chart_payload.config_hash = get_str_from_uuid(&st->state->hash_id); - chart_payload.update_every = st->update_every; - chart_payload.memory_mode = st->rrd_memory_mode; - chart_payload.name = (char *)st->name; - chart_payload.node_id = wc->node_id; - chart_payload.claim_id = claim_id; - chart_payload.id = strdupz(st->id); - - chart_payload.chart_labels = rrdlabels_create(); - rrdlabels_copy(chart_payload.chart_labels, st->state->chart_labels); - - size_t size; - char *payload = generate_chart_instance_updated(&size, &chart_payload); - if (likely(payload)) - rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL, 1); - freez(payload); - chart_instance_updated_destroy(&chart_payload); - } - return rc; -} - -static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid, - const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time, - time_t *send_status) -{ - int rc = 0; - size_t size; - - if (unlikely(!dim_uuid || !dim_id || !dim_name || !chart_type_id)) - return 0; - - struct chart_dimension_updated dim_payload; - memset(&dim_payload, 0, sizeof(dim_payload)); - -#ifdef NETDATA_INTERNAL_CHECKS - if (!first_time) - info("Host %s (node %s) deleting dimension id=[%s] name=[%s] chart=[%s]", - wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id); - if (last_time) - info("Host %s (node %s) stopped collecting dimension id=[%s] name=[%s] chart=[%s] %ld seconds ago at %ld", - wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id, now_realtime_sec() - last_time, last_time); -#endif - - dim_payload.node_id = wc->node_id; - dim_payload.claim_id = claim_id; - dim_payload.name = dim_name; - dim_payload.id = dim_id; - dim_payload.chart_id = chart_type_id; - dim_payload.created_at.tv_sec = first_time; - dim_payload.last_timestamp.tv_sec = last_time; - char *payload = generate_chart_dimension_updated(&size, &dim_payload); - if (likely(payload)) - rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status, 1); - freez(payload); - return rc; -} - -void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - int rc = 0; - sqlite3_stmt *res = NULL; - - if (!aclk_connected) - return; - - if (unlikely(!db_meta)) - return; - - uuid_t host_id; - if (uuid_parse(wc->host_guid, host_id)) - return; - - char *claim_id = get_agent_claimid(); - if (!claim_id) - return; - - rc = sqlite3_prepare_v2( - db_meta, - "DELETE FROM dimension_delete where host_id = @host_id " - "RETURNING dimension_id, dimension_name, chart_type_id, dim_id LIMIT 10;", - -1, - &res, - 0); - - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement when trying to delete dimension deletes"); - freez(claim_id); - return; - } - - rc = sqlite3_bind_blob(res, 1, &host_id, sizeof(host_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - unsigned count = 0; - while (sqlite3_step(res) == SQLITE_ROW) { - (void) aclk_upd_dimension_event( - wc, - claim_id, - (uuid_t *)sqlite3_column_text(res, 3), - (const char *)sqlite3_column_text(res, 0), - (const char *)sqlite3_column_text(res, 1), - (const char *)sqlite3_column_text(res, 2), - 0, - 0, - NULL); - count++; - } - - if (count) { - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_DIM_DELETION; - if (aclk_database_enq_cmd_noblock(wc, &cmd)) - info("Failed to queue a dimension deletion message"); - } - -bind_fail: - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement when adding dimension deletion events, rc = %d", rc); - freez(claim_id); - return; -} - -int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - int rc = 1; - CHECK_SQLITE_CONNECTION(db_meta); - - struct aclk_chart_dimension_data *aclk_cd_data = cmd.data; - - char *claim_id = get_agent_claimid(); - if (!claim_id) - goto cleanup; - - rc = aclk_add_chart_payload(wc, &aclk_cd_data->uuid, claim_id, ACLK_PAYLOAD_DIMENSION, - (void *) aclk_cd_data->payload, aclk_cd_data->payload_size, NULL, aclk_cd_data->check_payload); - - freez(claim_id); -cleanup: - freez(aclk_cd_data->payload); - freez(aclk_cd_data); - return rc; -} - -void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - int rc; - - wc->chart_pending = 0; - if (unlikely(!wc->chart_updates)) { - log_access( - "ACLK STA [%s (%s)]: Ignoring chart push event, updates have been turned off for this node.", - wc->node_id, - wc->host ? wc->host->hostname : "N/A"); - return; - } - - char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) - return; - - uuid_t claim_uuid; - if (uuid_parse(claim_id, claim_uuid)) - return; - - int limit = cmd.count > 0 ? cmd.count : 1; - - uint64_t first_sequence; - uint64_t last_sequence; - time_t last_timestamp = 0; - - char sql[ACLK_SYNC_QUERY_SIZE]; - static __thread sqlite3_stmt *res = NULL; - - if (unlikely(!res)) { - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \ - "FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \ - "WHERE ac.date_submitted IS NULL AND ac.unique_id = acp.unique_id AND ac.update_count > 0 " \ - "AND acp.claim_id = @claim_id ORDER BY ac.sequence_id ASC LIMIT %d;", wc->uuid_str, wc->uuid_str, limit); - rc = prepare_statement(db_meta, sql, &res); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement when trying to send a chart update via ACLK"); - freez(claim_id); - return; - } - } - - rc = sqlite3_bind_blob(res, 1, claim_uuid, sizeof(claim_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - char **payload_list = callocz(limit + 1, sizeof(char *)); - size_t *payload_list_size = callocz(limit + 1, sizeof(size_t)); - size_t *payload_list_max_size = callocz(limit + 1, sizeof(size_t)); - struct aclk_message_position *position_list = callocz(limit + 1, sizeof(*position_list)); - int *is_dim = callocz(limit + 1, sizeof(*is_dim)); - - int loop = cmd.param1; - - uint64_t start_sequence_id = wc->chart_sequence_id; - - while (loop > 0) { - uint64_t previous_sequence_id = wc->chart_sequence_id; - int count = 0; - first_sequence = 0; - last_sequence = 0; - while (count < limit && sqlite3_step(res) == SQLITE_ROW) { - size_t payload_size = sqlite3_column_bytes(res, 1); - if (payload_list_max_size[count] < payload_size) { - freez(payload_list[count]); - payload_list_max_size[count] = payload_size; - payload_list[count] = mallocz(payload_size); - } - payload_list_size[count] = payload_size; - memcpy(payload_list[count], sqlite3_column_blob(res, 1), payload_size); - position_list[count].sequence_id = (uint64_t)sqlite3_column_int64(res, 0); - position_list[count].previous_sequence_id = previous_sequence_id; - position_list[count].seq_id_creation_time.tv_sec = sqlite3_column_int64(res, 2); - position_list[count].seq_id_creation_time.tv_usec = 0; - if (!first_sequence) - first_sequence = position_list[count].sequence_id; - last_sequence = position_list[count].sequence_id; - last_timestamp = position_list[count].seq_id_creation_time.tv_sec; - previous_sequence_id = last_sequence; - is_dim[count] = sqlite3_column_int(res, 3) > 0; - count++; - if (wc->chart_payload_count) - wc->chart_payload_count--; - } - freez(payload_list[count]); - payload_list_max_size[count] = 0; - payload_list[count] = NULL; - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement when pushing chart events, rc = %d", rc); - - if (likely(first_sequence)) { - - db_lock(); - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_chart_%s SET status = NULL, date_submitted=unixepoch() " - "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", - wc->uuid_str, first_sequence, last_sequence); - db_execute(sql); - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "INSERT OR REPLACE INTO aclk_chart_latest_%s (uuid, unique_id, date_submitted) " - " SELECT uuid, unique_id, date_submitted FROM aclk_chart_%s s " - " WHERE date_submitted IS NOT NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 - " ;", - wc->uuid_str, wc->uuid_str, first_sequence, last_sequence); - db_execute(sql); - db_unlock(); - - aclk_chart_inst_and_dim_update(payload_list, payload_list_size, is_dim, position_list, wc->batch_id); - log_access( - "ACLK RES [%s (%s)]: CHARTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, - wc->node_id, - wc->hostname ? wc->hostname : "N/A", - first_sequence, - last_sequence, - wc->batch_id); - wc->chart_sequence_id = last_sequence; - wc->chart_timestamp = last_timestamp; - } else - break; - --loop; - } - - if (start_sequence_id != wc->chart_sequence_id) { - time_t now = now_realtime_sec(); - if (wc->rotation_after > now && wc->rotation_after < now + ACLK_DATABASE_ROTATION_DELAY) - wc->rotation_after = now + ACLK_DATABASE_ROTATION_DELAY; - } else { - wc->chart_payload_count = sql_get_pending_count(wc); - if (!wc->chart_payload_count) - log_access( - "ACLK STA [%s (%s)]: Sync of charts and dimensions done in %ld seconds.", - wc->node_id, - wc->hostname ? wc->hostname : "N/A", - now_realtime_sec() - wc->startup_time); - } - - for (int i = 0; i <= limit; ++i) - freez(payload_list[i]); - - freez(payload_list); - freez(payload_list_size); - freez(payload_list_max_size); - freez(position_list); - freez(is_dim); - -bind_fail: - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement when pushing chart events, rc = %d", rc); - - freez(claim_id); - return; -} - -// Push one chart config to the cloud -int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - UNUSED(wc); - - CHECK_SQLITE_CONNECTION(db_meta); - - sqlite3_stmt *res = NULL; - int rc = 0; - - char *hash_id = (char *) cmd.data_param; - - uuid_t hash_uuid; - rc = uuid_parse(hash_id, hash_uuid); - - if (unlikely(rc)) { - freez((char *) cmd.data_param); - return 1; - } - - BUFFER *sql = buffer_create(1024); - buffer_sprintf(sql, "SELECT type, family, context, title, priority, plugin, module, unit, chart_type " \ - "FROM chart_hash WHERE hash_id = @hash_id;"); - - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement when trying to fetch a chart hash configuration"); - goto fail; - } - - rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - struct chart_config_updated chart_config; - chart_config.config_hash = NULL; - - while (sqlite3_step(res) == SQLITE_ROW) { - chart_config.type = strdupz((char *)sqlite3_column_text(res, 0)); - chart_config.family = strdupz((char *)sqlite3_column_text(res, 1)); - chart_config.context = strdupz((char *)sqlite3_column_text(res, 2)); - chart_config.title = strdupz((char *)sqlite3_column_text(res, 3)); - chart_config.priority = sqlite3_column_int64(res, 4); - chart_config.plugin = strdupz((char *)sqlite3_column_text(res, 5)); - chart_config.module = sqlite3_column_bytes(res, 6) > 0 ? strdupz((char *)sqlite3_column_text(res, 6)) : NULL; - chart_config.chart_type = (RRDSET_TYPE) sqlite3_column_int(res,8); - chart_config.units = strdupz((char *)sqlite3_column_text(res, 7)); - chart_config.config_hash = strdupz(hash_id); - } - - if (likely(chart_config.config_hash)) { - log_access( - "ACLK REQ [%s (%s)]: Sending chart config for %s.", - wc->node_id, - wc->host ? wc->host->hostname : "N/A", - hash_id); - aclk_chart_config_updated(&chart_config, 1); - destroy_chart_config_updated(&chart_config); - } else - log_access( - "ACLK STA [%s (%s)]: Chart config for %s not found.", - wc->node_id, - wc->host ? wc->host->hostname : "N/A", - hash_id); - -bind_fail: - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement when pushing chart config hash, rc = %d", rc); -fail: - freez((char *)cmd.data_param); - buffer_free(sql); - return rc; -} - -void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - int rc; - sqlite3_stmt *res = NULL; - - char sql[ACLK_SYNC_QUERY_SIZE]; - - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"UPDATE aclk_chart_%s SET date_updated=unixepoch() WHERE sequence_id <= @sequence_id " - "AND date_submitted IS NOT NULL AND date_updated IS NULL;", wc->uuid_str); - - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to ack chart sequence ids"); - return; - } - - rc = sqlite3_bind_int64(res, 1, (uint64_t) cmd.param1); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = execute_insert(res); - if (rc != SQLITE_DONE) - error_report("Failed to ACK sequence id, rc = %d", rc); - else - log_access( - "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED IN THE DATABASE UP TO %" PRIu64, - wc->node_id, - wc->host ? wc->host->hostname : "N/A", - cmd.param1); - -bind_fail: - if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize statement to ACK older sequence ids, rc = %d", rc); - return; -} - -void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) -{ - BUFFER *sql = buffer_create(1024); - buffer_sprintf( - sql, - "UPDATE aclk_chart_%s SET status = NULL, date_submitted = NULL WHERE sequence_id >= %" PRIu64 ";", - wc->uuid_str, - cmd.param1); - db_execute(buffer_tostring(sql)); - if (cmd.param1 == 1) { - buffer_flush(sql); - log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, wc->hostname ? wc->hostname: "N/A"); - buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \ - "DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str); - db_lock(); - - db_execute("BEGIN TRANSACTION;"); - db_execute(buffer_tostring(sql)); - db_execute("COMMIT TRANSACTION;"); - - db_unlock(); - wc->chart_sequence_id = 0; - wc->chart_timestamp = 0; - wc->chart_payload_count = 0; - - RRDHOST *host = wc->host; - if (likely(host)) { - rrdhost_rdlock(host); - RRDSET *st; - rrdset_foreach_read(st, host) - { - rrdset_rdlock(st); - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); - RRDDIM *rd; - rrddim_foreach_read(rd, st) - { - rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); - rd->aclk_live_status = (rd->aclk_live_status == 0); - } - rrdset_unlock(st); - } - rrdhost_unlock(host); - } else - error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid); - } else { - log_access( - "ACLK STA [%s (%s)]: RESTARTING CHART SYNC FROM SEQUENCE %" PRIu64, - wc->node_id, - wc->hostname ? wc->hostname : "N/A", - cmd.param1); - wc->chart_payload_count = sql_get_pending_count(wc); - sql_get_last_chart_sequence(wc); - } - buffer_free(sql); - wc->chart_updates = 1; - return; -} - -// -// Functions called directly from ACLK threads and will queue commands -// -void aclk_get_chart_config(char **hash_id) -{ - struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)localhost->dbsync_worker; - - if (unlikely(!wc || !hash_id)) - return; - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_PUSH_CHART_CONFIG; - for (int i = 0; hash_id[i]; ++i) { - // TODO: Verify that we have a valid hash_id - log_access( - "ACLK REQ [%s (%s)]: Request %d for chart config with hash %s received.", - wc->node_id, - wc->host ? wc->host->hostname : "N/A", - i, - hash_id[i]); - cmd.data_param = (void *)strdupz(hash_id[i]); - aclk_database_enq_cmd(wc, &cmd); - } - return; -} - -// Send a command to a node_id -// Need to discover the thread that will handle the request -// if thread not in active hosts, then try to find in the queue -static void aclk_submit_param_command(char *node_id, enum aclk_database_opcode aclk_command, uint64_t param) -{ - if (unlikely(!node_id)) - return; - - struct aclk_database_worker_config *wc = NULL; - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = aclk_command; - cmd.param1 = param; - - rrd_rdlock(); - RRDHOST *host = find_host_by_node_id(node_id); - if (likely(host)) - wc = (struct aclk_database_worker_config *)host->dbsync_worker; - rrd_unlock(); - if (wc) - aclk_database_enq_cmd(wc, &cmd); - else { - if (aclk_worker_enq_cmd(node_id, &cmd)) - log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); - } - return; -} - -void aclk_ack_chart_sequence_id(char *node_id, uint64_t last_sequence_id) -{ - if (unlikely(!node_id)) - return; - - char *hostname = get_hostname_by_node_id(node_id); - log_access("ACLK REQ [%s (%s)]: CHARTS ACKNOWLEDGED upto %" PRIu64, node_id, hostname ? hostname : "N/A", - last_sequence_id); - freez(hostname); - aclk_submit_param_command(node_id, ACLK_DATABASE_CHART_ACK, last_sequence_id); - return; -} - -// Start streaming charts / dimensions for node_id -void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at, uint64_t batch_id) -{ - UNUSED(created_at); - if (unlikely(!node_id)) - return; - - uuid_t node_uuid; - if (uuid_parse(node_id, node_uuid)) { - log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM ignored, invalid node id", node_id); - return; - } - - struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id); - rrd_rdlock(); - RRDHOST *host = localhost; - while(host) { - if (wc || (host->node_id && !(uuid_compare(*host->node_id, node_uuid)))) { - rrd_unlock(); - if (!wc) - wc = (struct aclk_database_worker_config *)host->dbsync_worker ? - (struct aclk_database_worker_config *)host->dbsync_worker : - (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); - if (likely(wc)) { - wc->chart_reset_count++; - __sync_synchronize(); - wc->chart_updates = 0; - wc->batch_id = batch_id; - __sync_synchronize(); - wc->batch_created = now_realtime_sec(); - log_access( - "ACLK REQ [%s (%s)]: CHARTS STREAM from %"PRIu64" (LOCAL %"PRIu64") t=%ld resets=%d" , - wc->node_id, - wc->hostname ? wc->hostname : "N/A", - sequence_id + 1, - wc->chart_sequence_id, - wc->chart_timestamp, - wc->chart_reset_count); - if (sequence_id > wc->chart_sequence_id || wc->chart_reset_count > 10) { - log_access( - "ACLK RES [%s (%s)]: CHARTS FULL RESYNC REQUEST " - "remote_seq=%" PRIu64 " local_seq=%" PRIu64 " resets=%d ", - wc->node_id, - wc->hostname ? wc->hostname : "N/A", - sequence_id, - wc->chart_sequence_id, - wc->chart_reset_count); - - chart_reset_t chart_reset; - chart_reset.claim_id = get_agent_claimid(); - if (chart_reset.claim_id) { - chart_reset.node_id = node_id; - chart_reset.reason = SEQ_ID_NOT_EXISTS; - aclk_chart_reset(chart_reset); - freez(chart_reset.claim_id); - wc->chart_reset_count = -1; - } - } else { - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - // TODO: handle timestamp - if (sequence_id < wc->chart_sequence_id || - !sequence_id) { // || created_at != wc->chart_timestamp) { - log_access( - "ACLK REQ [%s (%s)]: CHART RESET from %" PRIu64 " t=%ld batch=%" PRIu64, - wc->node_id, - wc->hostname ? wc->hostname : "N/A", - sequence_id + 1, - wc->chart_timestamp, - wc->batch_id); - cmd.opcode = ACLK_DATABASE_RESET_CHART; - cmd.param1 = sequence_id + 1; - cmd.completion = NULL; - aclk_database_enq_cmd(wc, &cmd); - } else { - wc->chart_reset_count = 0; - wc->chart_updates = 1; - } - } - } else { - log_access("ACLK STA [%s (%s)]: ACLK synchronization thread is not active.", node_id, wc->hostname ? wc->hostname : "N/A"); - } - return; - } - host = host->next; - } - rrd_unlock(); - return; -} - -#define SQL_SELECT_HOST_MEMORY_MODE "SELECT memory_mode FROM chart WHERE host_id = @host_id LIMIT 1;" - -static RRD_MEMORY_MODE sql_get_host_memory_mode(uuid_t *host_id) -{ - int rc; - - RRD_MEMORY_MODE memory_mode = RRD_MEMORY_MODE_RAM; - sqlite3_stmt *res = NULL; - - rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_MEMORY_MODE, -1, &res, 0); - - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to read host memory mode"); - return memory_mode; - } - - rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host parameter to fetch host memory mode"); - goto failed; - } - - while (sqlite3_step(res) == SQLITE_ROW) { - memory_mode = (RRD_MEMORY_MODE)sqlite3_column_int(res, 0); - } - -failed: - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when reading host memory mode"); - return memory_mode; -} - -#define SELECT_HOST_DIMENSION_LIST \ - "SELECT d.dim_id, c.update_every, c.type||'.'||c.id, d.id, d.name FROM chart c, dimension d " \ - "WHERE d.chart_id = c.chart_id AND c.host_id = @host_id ORDER BY c.update_every ASC;" - -#define SELECT_HOST_CHART_LIST \ - "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \ - "WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;" - -void aclk_update_retention(struct aclk_database_worker_config *wc) -{ - int rc; - - if (!aclk_connected) - return; - - if (wc->host && rrdhost_flag_check(wc->host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS)) { - internal_error(true, "Skipping aclk_update_retention for host %s because context streaming is enabled", wc->host->hostname); - return; - } - - char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) - return; - - sqlite3_stmt *res = NULL; - RRD_MEMORY_MODE memory_mode; - - uuid_t host_uuid; - rc = uuid_parse(wc->host_guid, host_uuid); - if (unlikely(rc)) { - freez(claim_id); - return; - } - - if (wc->host) - memory_mode = wc->host->rrd_memory_mode; - else - memory_mode = sql_get_host_memory_mode(&host_uuid); - - if (memory_mode == RRD_MEMORY_MODE_DBENGINE) - rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_DIMENSION_LIST, -1, &res, 0); - else - rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_CHART_LIST, -1, &res, 0); - - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch host dimensions"); - freez(claim_id); - return; - } - - rc = sqlite3_bind_blob(res, 1, &host_uuid, sizeof(host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host parameter to fetch host dimensions"); - goto failed; - } - - time_t start_time = LONG_MAX; - time_t first_entry_t; - time_t last_entry_t; - uint32_t update_every = 0; - uint32_t dimension_update_count = 0; - uint32_t total_checked = 0; - uint32_t total_deleted= 0; - uint32_t total_stopped= 0; - time_t send_status; - - struct retention_updated rotate_data; - - memset(&rotate_data, 0, sizeof(rotate_data)); - - int max_intervals = 32; - - rotate_data.interval_duration_count = 0; - rotate_data.interval_durations = callocz(max_intervals, sizeof(*rotate_data.interval_durations)); - - now_realtime_timeval(&rotate_data.rotation_timestamp); - rotate_data.memory_mode = memory_mode; - rotate_data.claim_id = claim_id; - rotate_data.node_id = strdupz(wc->node_id); - - time_t now = now_realtime_sec(); - while (sqlite3_step(res) == SQLITE_ROW && dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP) { - if (unlikely(netdata_exit)) - break; - if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) { - if (update_every) { - debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); - if (start_time == LONG_MAX) - rotate_data.interval_durations[rotate_data.interval_duration_count].retention = 0; - else - rotate_data.interval_durations[rotate_data.interval_duration_count].retention = - rotate_data.rotation_timestamp.tv_sec - start_time; - rotate_data.interval_duration_count++; - } - update_every = (uint32_t)sqlite3_column_int(res, 1); - rotate_data.interval_durations[rotate_data.interval_duration_count].update_every = update_every; - start_time = LONG_MAX; - } -#ifdef ENABLE_DBENGINE - if (memory_mode == RRD_MEMORY_MODE_DBENGINE) - rc = - rrdeng_metric_latest_time_by_uuid((uuid_t *)sqlite3_column_blob(res, 0), &first_entry_t, &last_entry_t, 0); - else -#endif - { - if (wc->host) { - RRDSET *st = NULL; - rc = (st = rrdset_find(wc->host, (const char *)sqlite3_column_text(res, 2))) ? 0 : 1; - if (!rc) { - first_entry_t = rrdset_first_entry_t(st); - last_entry_t = rrdset_last_entry_t(st); - } - } else { - rc = 0; - first_entry_t = rotate_data.rotation_timestamp.tv_sec; - } - } - - if (likely(!rc && first_entry_t)) - start_time = MIN(start_time, first_entry_t); - - if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) { - int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every)); - if (rc) { - first_entry_t = 0; - last_entry_t = 0; - live = 0; - } - if (!wc->host || !first_entry_t) { - if (!first_entry_t) { - delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0)); - total_deleted++; - dimension_update_count++; - } - else { - (void)aclk_upd_dimension_event( - wc, - claim_id, - (uuid_t *)sqlite3_column_blob(res, 0), - (const char *)(const char *)sqlite3_column_text(res, 3), - (const char *)(const char *)sqlite3_column_text(res, 4), - (const char *)(const char *)sqlite3_column_text(res, 2), - first_entry_t, - live ? 0 : last_entry_t, - &send_status); - - if (!send_status) { - if (last_entry_t) - total_stopped++; - dimension_update_count++; - } - } - } - } - total_checked++; - } - if (update_every) { - debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); - if (start_time == LONG_MAX) - rotate_data.interval_durations[rotate_data.interval_duration_count].retention = 0; - else - rotate_data.interval_durations[rotate_data.interval_duration_count].retention = - rotate_data.rotation_timestamp.tv_sec - start_time; - rotate_data.interval_duration_count++; - } - - if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) - log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", - wc->node_id, wc->hostname ? wc->hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); - else - log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE NOT SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", - wc->node_id, wc->hostname ? wc->hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); - -#ifdef NETDATA_INTERNAL_CHECKS - info("Retention update for %s (chart updates = %d)", wc->host_guid, wc->chart_updates); - for (int i = 0; i < rotate_data.interval_duration_count; ++i) - info( - "Update for host %s (node %s) for %u Retention = %u", - wc->host_guid, - wc->node_id, - rotate_data.interval_durations[i].update_every, - rotate_data.interval_durations[i].retention); -#endif - if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) - aclk_retention_updated(&rotate_data); - freez(rotate_data.node_id); - freez(rotate_data.interval_durations); - -failed: - freez(claim_id); - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when reading host dimensions"); - return; -} - -uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc) -{ - char sql[ACLK_SYNC_QUERY_SIZE]; - static __thread sqlite3_stmt *res = NULL; - - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT count(1) FROM aclk_chart_%s ac WHERE ac.date_submitted IS NULL;", wc->uuid_str); - - int rc; - uint32_t chart_payload_count = 0; - if (unlikely(!res)) { - rc = prepare_statement(db_meta, sql, &res); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to count pending messages"); - return 0; - } - } - while (sqlite3_step(res) == SQLITE_ROW) - chart_payload_count = (uint32_t) sqlite3_column_int(res, 0); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement when fetching pending messages, rc = %d", rc); - - return chart_payload_count; -} - -void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc) -{ - char sql[ACLK_SYNC_QUERY_SIZE]; - - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT ac.sequence_id, ac.date_created FROM aclk_chart_%s ac " \ - "WHERE ac.date_submitted IS NOT NULL ORDER BY ac.sequence_id DESC LIMIT 1;", wc->uuid_str); - - int rc; - sqlite3_stmt *res = NULL; - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to find last chart sequence id"); - return; - } - - wc->chart_sequence_id = 0; - wc->chart_timestamp = 0; - while (sqlite3_step(res) == SQLITE_ROW) { - wc->chart_sequence_id = (uint64_t)sqlite3_column_int64(res, 0); - wc->chart_timestamp = (time_t)sqlite3_column_int64(res, 1); - } - - debug(D_ACLK_SYNC, "Node %s reports last sequence_id=%" PRIu64, wc->node_id, wc->chart_sequence_id); - - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement when fetching chart sequence info, rc = %d", rc); - - return; -} - -void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated) -{ - RRDHOST *host = rd->rrdset->rrdhost; - if (likely(rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS))) - return; - - int live = !last_updated; - - if (likely(rd->aclk_live_status == live)) - return; - - time_t created_at = rd->tiers[0]->query_ops.oldest_time(rd->tiers[0]->db_metric_handle); - - if (unlikely(!created_at && rd->updated)) - created_at = rd->last_collected_time.tv_sec; - - rd->aclk_live_status = live; - - struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker; - if (unlikely(!wc)) - return; - - char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) - return; - - struct chart_dimension_updated dim_payload; - memset(&dim_payload, 0, sizeof(dim_payload)); - dim_payload.node_id = wc->node_id; - dim_payload.claim_id = claim_id; - dim_payload.name = rd->name; - dim_payload.id = rd->id; - dim_payload.chart_id = rd->rrdset->id; - dim_payload.created_at.tv_sec = created_at; - dim_payload.last_timestamp.tv_sec = last_updated; - - size_t size = 0; - char *payload = generate_chart_dimension_updated(&size, &dim_payload); - - freez(claim_id); - if (unlikely(!payload)) - return; - - struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data)); - uuid_copy(aclk_cd_data->uuid, rd->metric_uuid); - aclk_cd_data->payload = payload; - aclk_cd_data->payload_size = size; - aclk_cd_data->check_payload = 1; - - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - - cmd.opcode = ACLK_DATABASE_ADD_DIMENSION; - cmd.data = aclk_cd_data; - int rc = aclk_database_enq_cmd_noblock(wc, &cmd); - - if (unlikely(rc)) { - freez(aclk_cd_data->payload); - freez(aclk_cd_data); - rd->aclk_live_status = !live; - } - return; -} - -void aclk_send_dimension_update(RRDDIM *rd) -{ - char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) - return; - - time_t first_entry_t = rrddim_first_entry_t(rd); - time_t last_entry_t = rrddim_last_entry_t(rd); - - time_t now = now_realtime_sec(); - int live = ((now - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every)); - - if (!live || rd->aclk_live_status != live || !first_entry_t) { - (void)aclk_upd_dimension_event( - rd->rrdset->rrdhost->dbsync_worker, - claim_id, - &rd->metric_uuid, - rd->id, - rd->name, - rd->rrdset->id, - first_entry_t, - live ? 0 : last_entry_t, - NULL); - - if (!first_entry_t) - debug( - D_ACLK_SYNC, - "%s: Update dimension chart=%s dim=%s live=%d (%ld, %ld)", - rd->rrdset->rrdhost->hostname, - rd->rrdset->name, - rd->name, - live, - first_entry_t, - last_entry_t); - else - debug( - D_ACLK_SYNC, - "%s: Update dimension chart=%s dim=%s live=%d (%ld, %ld) collected %ld seconds ago", - rd->rrdset->rrdhost->hostname, - rd->rrdset->name, - rd->name, - live, - first_entry_t, - last_entry_t, - now - last_entry_t); - rd->aclk_live_status = live; - } - - freez(claim_id); - return; -} - -#define SQL_SEQ_NULL(result, n) sqlite3_column_type(result, n) == SQLITE_NULL ? 0 : sqlite3_column_int64(result, n) - -struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host) -{ - struct aclk_chart_sync_stats *aclk_statistics = NULL; - - struct aclk_database_worker_config *wc = NULL; - wc = (struct aclk_database_worker_config *)host->dbsync_worker; - if (!wc) - return NULL; - - aclk_statistics = callocz(1, sizeof(struct aclk_chart_sync_stats)); - - aclk_statistics->updates = wc->chart_updates; - aclk_statistics->batch_id = wc->batch_id; - - char host_uuid_fixed[GUID_LEN + 1]; - - strncpy(host_uuid_fixed, host->machine_guid, GUID_LEN); - host_uuid_fixed[GUID_LEN] = 0; - - host_uuid_fixed[8] = '_'; - host_uuid_fixed[13] = '_'; - host_uuid_fixed[18] = '_'; - host_uuid_fixed[23] = '_'; - - sqlite3_stmt *res = NULL; - BUFFER *sql = buffer_create(1024); - buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s;", host_uuid_fixed); - buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_submitted IS NULL;", host_uuid_fixed); - buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_submitted IS NOT NULL;", host_uuid_fixed); - buffer_sprintf(sql, "SELECT min(sequence_id), max(sequence_id), 0 FROM aclk_chart_%s WHERE date_updated IS NOT NULL;", host_uuid_fixed); - buffer_sprintf(sql, "SELECT max(date_created), max(date_submitted), max(date_updated), 0 FROM aclk_chart_%s;", host_uuid_fixed); - - int rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); - if (rc != SQLITE_OK) { - buffer_free(sql); - freez(aclk_statistics); - return NULL; - } - - rc = sqlite3_step(res); - if (rc == SQLITE_ROW) { - aclk_statistics->min_seqid = SQL_SEQ_NULL(res, 0); - aclk_statistics->max_seqid = SQL_SEQ_NULL(res, 1); - } - - rc = sqlite3_step(res); - if (rc == SQLITE_ROW) { - aclk_statistics->min_seqid_pend = SQL_SEQ_NULL(res, 0); - aclk_statistics->max_seqid_pend = SQL_SEQ_NULL(res, 1); - } - - rc = sqlite3_step(res); - if (rc == SQLITE_ROW) { - aclk_statistics->min_seqid_sent = SQL_SEQ_NULL(res, 0); - aclk_statistics->max_seqid_sent = SQL_SEQ_NULL(res, 1); - } - - rc = sqlite3_step(res); - if (rc == SQLITE_ROW) { - aclk_statistics->min_seqid_ack = SQL_SEQ_NULL(res, 0); - aclk_statistics->max_seqid_ack = SQL_SEQ_NULL(res, 1); - } - - rc = sqlite3_step(res); - if (rc == SQLITE_ROW) { - aclk_statistics->min_seqid_ack = SQL_SEQ_NULL(res, 0); - aclk_statistics->max_seqid_ack = SQL_SEQ_NULL(res, 1); - } - - rc = sqlite3_step(res); - if (rc == SQLITE_ROW) { - aclk_statistics->max_date_created = (time_t) SQL_SEQ_NULL(res, 0); - aclk_statistics->max_date_submitted = (time_t) SQL_SEQ_NULL(res, 1); - aclk_statistics->max_date_ack = (time_t) SQL_SEQ_NULL(res, 2); - } - - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement when fetching aclk sync statistics, rc = %d", rc); - - buffer_free(sql); - return aclk_statistics; -} - -void sql_check_chart_liveness(RRDSET *st) { - RRDDIM *rd; - - if (unlikely(st->state->is_ar_chart)) - return; - - rrdset_rdlock(st); - - if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - rrdset_unlock(st); - return; - } - - if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) { - debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name); - rrdset_flag_set(st, RRDSET_FLAG_ACLK); - } - } - else - debug(D_ACLK_SYNC,"Check chart liveness [%s] chart definition already submitted", st->name); - time_t mark = now_realtime_sec(); - - debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name); - rrddim_foreach_read(rd, st) { - if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) - queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); - } - rrdset_unlock(st); -} - -// ST is read locked -int queue_chart_to_aclk(RRDSET *st) -{ - RRDHOST *host = st->rrdhost; - - if (likely(rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS))) - return 0; - - return sql_queue_chart_payload((struct aclk_database_worker_config *) st->rrdhost->dbsync_worker, - st, ACLK_DATABASE_ADD_CHART); -} - -#endif //ENABLE_ACLK diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h deleted file mode 100644 index 84325bf6..00000000 --- a/database/sqlite/sqlite_aclk_chart.h +++ /dev/null @@ -1,71 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_SQLITE_ACLK_CHART_H -#define NETDATA_SQLITE_ACLK_CHART_H - - -typedef enum payload_type { - ACLK_PAYLOAD_CHART, - ACLK_PAYLOAD_DIMENSION, - ACLK_PAYLOAD_DIMENSION_ROTATED -} ACLK_PAYLOAD_TYPE; - -extern sqlite3 *db_meta; - -#ifndef RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER -#define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3) -#endif - -#ifndef RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER -#define RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER (30) -#endif - -#ifndef ACLK_MAX_DIMENSION_CLEANUP -#define ACLK_MAX_DIMENSION_CLEANUP (500) -#endif - -struct aclk_chart_dimension_data { - uuid_t uuid; - char *payload; - size_t payload_size; - uint8_t check_payload; -}; - -struct aclk_chart_sync_stats { - int updates; - uint64_t batch_id; - uint64_t min_seqid; - uint64_t max_seqid; - uint64_t min_seqid_pend; - uint64_t max_seqid_pend; - uint64_t min_seqid_sent; - uint64_t max_seqid_sent; - uint64_t min_seqid_ack; - uint64_t max_seqid_ack; - time_t max_date_created; - time_t max_date_submitted; - time_t max_date_ack; -}; - -extern int queue_chart_to_aclk(RRDSET *st); -extern void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated); -extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); -int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_ack_chart_sequence_id(char *node_id, uint64_t last_sequence_id); -void aclk_get_chart_config(char **hash_id_list); -void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_start_streaming(char *node_id, uint64_t seq_id, time_t created_at, uint64_t batch_id); -void sql_chart_deduplicate(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void sql_check_rotation_state(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc); -void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc); -void aclk_send_dimension_update(RRDDIM *rd); -struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host); -void sql_check_chart_liveness(RRDSET *st); -void aclk_update_retention(struct aclk_database_worker_config *wc); -#endif //NETDATA_SQLITE_ACLK_CHART_H diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c index 3d11f83a..afe77499 100644 --- a/database/sqlite/sqlite_aclk_node.c +++ b/database/sqlite/sqlite_aclk_node.c @@ -3,27 +3,25 @@ #include "sqlite_functions.h" #include "sqlite_aclk_node.h" -#ifdef ENABLE_ACLK -#include "../../aclk/aclk_charts_api.h" -#endif +#include "../../aclk/aclk_contexts_api.h" +#include "../../aclk/aclk_capas.h" #ifdef ENABLE_ACLK DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) { RRDSET *st; char name[500]; - rrdhost_rdlock(host); rrdset_foreach_read(st, host) { if (rrdset_is_available_for_viewers(st)) { struct collector_info col = { - .plugin = st->plugin_name ? st->plugin_name : "", - .module = st->module_name ? st->module_name : "" + .plugin = rrdset_plugin_name(st), + .module = rrdset_module_name(st) }; snprintfz(name, 499, "%s:%s", col.plugin, col.module); dictionary_set(dict, name, &col, sizeof(struct collector_info)); } } - rrdhost_unlock(host); + rrdset_foreach_done(st); return dict; } @@ -36,7 +34,7 @@ void sql_build_node_collectors(struct aclk_database_worker_config *wc) return; struct update_node_collectors upd_node_collectors; - DICTIONARY *dict = dictionary_create(DICTIONARY_FLAG_SINGLE_THREADED); + DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED); upd_node_collectors.node_id = wc->node_id; upd_node_collectors.claim_id = get_agent_claimid(); @@ -47,7 +45,7 @@ void sql_build_node_collectors(struct aclk_database_worker_config *wc) dictionary_destroy(dict); freez(upd_node_collectors.claim_id); - log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, wc->host->hostname); + log_access("ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(wc->host)); #else UNUSED(wc); #endif @@ -74,14 +72,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.ml_info.ml_capable = ml_capable(localhost); node_info.ml_info.ml_enabled = ml_enabled(wc->host); - struct capability instance_caps[] = { - { .name = "proto", .version = 1, .enabled = 1 }, - { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(wc->host) }, - { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, - { .name = "ctx", .version = 1, .enabled = rrdcontext_enabled}, - { .name = NULL, .version = 0, .enabled = 0 } - }; - node_info.node_instance_capabilities = instance_caps; + node_info.node_instance_capabilities = aclk_get_node_instance_capas(wc->host); now_realtime_timeval(&node_info.updated_at); @@ -89,13 +80,12 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat char *host_version = NULL; if (host != localhost) { netdata_mutex_lock(&host->receiver_lock); - host_version = - strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : "unknown"); + host_version = strdupz(host->receiver && host->receiver->program_version ? host->receiver->program_version : "unknown"); netdata_mutex_unlock(&host->receiver_lock); } - node_info.data.name = host->hostname; - node_info.data.os = (char *) host->os; + node_info.data.name = rrdhost_hostname(host); + node_info.data.os = rrdhost_os(host); node_info.data.os_name = host->system_info->host_os_name; node_info.data.os_version = host->system_info->host_os_version; node_info.data.kernel_name = host->system_info->kernel_name; @@ -106,8 +96,8 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.data.memory = host->system_info->host_ram_total ? host->system_info->host_ram_total : "0"; node_info.data.disk_space = host->system_info->host_disk_space ? host->system_info->host_disk_space : "0"; node_info.data.version = host_version ? host_version : VERSION; - node_info.data.release_channel = (char *) get_release_channel(); - node_info.data.timezone = (char *) host->abbrev_timezone; + node_info.data.release_channel = get_release_channel(); + node_info.data.timezone = rrdhost_abbrev_timezone(host); node_info.data.virtualization_type = host->system_info->virtualization ? host->system_info->virtualization : "unknown"; node_info.data.container_type = host->system_info->container ? host->system_info->container : "unknown"; node_info.data.custom_info = config_get(CONFIG_SECTION_WEB, "custom dashboard_info.js", ""); @@ -123,13 +113,14 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.data.ml_info.ml_capable = host->system_info->ml_capable; node_info.data.ml_info.ml_enabled = host->system_info->ml_enabled; - node_info.data.host_labels_ptr = host->host_labels; + node_info.data.host_labels_ptr = host->rrdlabels; aclk_update_node_info(&node_info); - log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, wc->host->hostname, wc->host_guid, wc->host == localhost ? "parent" : "child"); + log_access("ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)", wc->node_id, rrdhost_hostname(wc->host), wc->host_guid, wc->host == localhost ? "parent" : "child"); rrd_unlock(); freez(node_info.claim_id); + freez(node_info.node_instance_capabilities); freez(host_version); wc->node_collectors_send = now_realtime_sec(); diff --git a/database/sqlite/sqlite_context.c b/database/sqlite/sqlite_context.c index 901ab003..9c7a61c6 100644 --- a/database/sqlite/sqlite_context.c +++ b/database/sqlite/sqlite_context.c @@ -21,7 +21,6 @@ const char *database_context_cleanup[] = { }; sqlite3 *db_context_meta = NULL; - /* * Initialize the SQLite database * Return 0 on success @@ -125,22 +124,24 @@ void sql_close_context_database(void) // Fetching data // #define CTX_GET_CHART_LIST "SELECT c.chart_id, c.type||'.'||c.id, c.name, c.context, c.title, c.unit, c.priority, " \ - "c.update_every, c.chart_type, c.family FROM meta.chart c WHERE c.host_id = @host_id; " + "c.update_every, c.chart_type, c.family FROM meta.chart c WHERE c.host_id = @host_id and c.chart_id is not null; " void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, void *), void *data) { int rc; - sqlite3_stmt *res = NULL; + static __thread sqlite3_stmt *res = NULL; if (unlikely(!host_uuid)) { internal_error(true, "Requesting context chart list without host_id"); return; } - rc = sqlite3_prepare_v2(db_context_meta, CTX_GET_CHART_LIST, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch chart list"); - return; + if (unlikely(!res)) { + rc = prepare_statement(db_context_meta, CTX_GET_CHART_LIST, &res); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to fetch chart list"); + return; + } } rc = sqlite3_bind_blob(res, 1, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); @@ -150,7 +151,7 @@ void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, voi } SQL_CHART_DATA chart_data = { 0 }; - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { uuid_copy(chart_data.chart_id, *((uuid_t *)sqlite3_column_blob(res, 0))); chart_data.id = (char *) sqlite3_column_text(res, 1); chart_data.name = (char *) sqlite3_column_text(res, 2); @@ -165,22 +166,25 @@ void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, voi } skip_load: - rc = sqlite3_finalize(res); + rc = sqlite3_reset(res); if (rc != SQLITE_OK) - error_report("Failed to finalize statement that fetches chart label data, rc = %d", rc); + error_report("Failed to reset statement that fetches chart label data, rc = %d", rc); } // Dimension list -#define CTX_GET_DIMENSION_LIST "SELECT d.dim_id, d.id, d.name FROM meta.dimension d WHERE d.chart_id = @id;" +#define CTX_GET_DIMENSION_LIST "SELECT d.dim_id, d.id, d.name, CASE WHEN INSTR(d.options,\"hidden\") > 0 THEN 1 ELSE 0 END " \ + "FROM meta.dimension d WHERE d.chart_id = @id and d.dim_id is not null ORDER BY d.rowid ASC;" void ctx_get_dimension_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_DIMENSION_DATA *, void *), void *data) { int rc; - sqlite3_stmt *res = NULL; - - rc = sqlite3_prepare_v2(db_context_meta, CTX_GET_DIMENSION_LIST, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch chart dimension data"); - return; + static __thread sqlite3_stmt *res = NULL; + + if (unlikely(!res)) { + rc = prepare_statement(db_context_meta, CTX_GET_DIMENSION_LIST, &res); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to fetch chart dimension data"); + return; + } } rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); @@ -191,17 +195,18 @@ void ctx_get_dimension_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_DIMENSION_DA SQL_DIMENSION_DATA dimension_data; - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { uuid_copy(dimension_data.dim_id, *((uuid_t *)sqlite3_column_blob(res, 0))); dimension_data.id = (char *) sqlite3_column_text(res, 1); dimension_data.name = (char *) sqlite3_column_text(res, 2); + dimension_data.hidden = sqlite3_column_int(res, 3); dict_cb(&dimension_data, data); } failed: - rc = sqlite3_finalize(res); + rc = sqlite3_reset(res); if (rc != SQLITE_OK) - error_report("Failed to finalize statement that fetches the chart dimension list, rc = %d", rc); + error_report("Failed to reset statement that fetches the chart dimension list, rc = %d", rc); } // LABEL LIST @@ -209,12 +214,14 @@ failed: void ctx_get_label_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_CLABEL_DATA *, void *), void *data) { int rc; - sqlite3_stmt *res = NULL; - - rc = sqlite3_prepare_v2(db_context_meta, CTX_GET_LABEL_LIST, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch chart lanbels"); - return; + static __thread sqlite3_stmt *res = NULL; + + if (unlikely(!res)) { + rc = prepare_statement(db_context_meta, CTX_GET_LABEL_LIST, &res); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to fetch chart labels"); + return; + } } rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); @@ -225,7 +232,7 @@ void ctx_get_label_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_CLABEL_DATA *, v SQL_CLABEL_DATA label_data; - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { label_data.label_key = (char *) sqlite3_column_text(res, 0); label_data.label_value = (char *) sqlite3_column_text(res, 1); label_data.label_source = sqlite3_column_int(res, 2); @@ -233,9 +240,9 @@ void ctx_get_label_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_CLABEL_DATA *, v } failed: - rc = sqlite3_finalize(res); + rc = sqlite3_reset(res); if (rc != SQLITE_OK) - error_report("Failed to finalize statement that fetches chart label data, rc = %d", rc); + error_report("Failed to reset statement that fetches chart label data, rc = %d", rc); return; } @@ -250,12 +257,14 @@ void ctx_get_context_list(uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEXT_D return; int rc; - sqlite3_stmt *res = NULL; - - rc = sqlite3_prepare_v2(db_context_meta, CTX_GET_CONTEXT_LIST, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch stored context list"); - return; + static __thread sqlite3_stmt *res = NULL; + + if (unlikely(!res)) { + rc = prepare_statement(db_context_meta, CTX_GET_CONTEXT_LIST, &res); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to fetch stored context list"); + return; + } } VERSIONED_CONTEXT_DATA context_data = {0}; @@ -267,7 +276,7 @@ void ctx_get_context_list(uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEXT_D goto failed; } - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { context_data.id = (char *) sqlite3_column_text(res, 0); context_data.version = sqlite3_column_int64(res, 1); context_data.title = (char *) sqlite3_column_text(res, 2); @@ -282,9 +291,9 @@ void ctx_get_context_list(uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEXT_D } failed: - rc = sqlite3_finalize(res); + rc = sqlite3_reset(res); if (rc != SQLITE_OK) - error_report("Failed to finalize statement that fetches stored context versioned data, rc = %d", rc); + error_report("Failed to reset statement that fetches stored context versioned data, rc = %d", rc); } @@ -437,6 +446,13 @@ skip_delete: return (rc_stored != SQLITE_DONE); } +int sql_context_cache_stats(int op) +{ + int count, dummy; + sqlite3_db_status(db_context_meta, op, &count, &dummy, 0); + return count; +} + // // TESTING FUNCTIONS // diff --git a/database/sqlite/sqlite_context.h b/database/sqlite/sqlite_context.h index 12937fff..2e52b9bf 100644 --- a/database/sqlite/sqlite_context.h +++ b/database/sqlite/sqlite_context.h @@ -6,6 +6,7 @@ #include "daemon/common.h" #include "sqlite3.h" +int sql_context_cache_stats(int op); typedef struct ctx_chart { uuid_t chart_id; const char *id; @@ -23,6 +24,7 @@ typedef struct ctx_dimension { uuid_t dim_id; char *id; char *name; + bool hidden; } SQL_DIMENSION_DATA; typedef struct ctx_label { @@ -50,19 +52,19 @@ typedef struct versioned_context_data { } VERSIONED_CONTEXT_DATA; -extern void ctx_get_context_list(uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEXT_DATA *, void *), void *data); +void ctx_get_context_list(uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEXT_DATA *, void *), void *data); -extern void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, void *), void *data); -extern void ctx_get_label_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_CLABEL_DATA *, void *), void *data); -extern void ctx_get_dimension_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_DIMENSION_DATA *, void *), void *data); +void ctx_get_chart_list(uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, void *), void *data); +void ctx_get_label_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_CLABEL_DATA *, void *), void *data); +void ctx_get_dimension_list(uuid_t *chart_uuid, void (*dict_cb)(SQL_DIMENSION_DATA *, void *), void *data); -extern int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data); +int ctx_store_context(uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data); #define ctx_update_context(host_uuid, context_data) ctx_store_context(host_uuid, context_data) -extern int ctx_delete_context(uuid_t *host_id, VERSIONED_CONTEXT_DATA *context_data); +int ctx_delete_context(uuid_t *host_id, VERSIONED_CONTEXT_DATA *context_data); -extern int sql_init_context_database(int memory); -extern void sql_close_context_database(void); -extern int ctx_unittest(void); +int sql_init_context_database(int memory); +void sql_close_context_database(void); +int ctx_unittest(void); #endif //NETDATA_SQLITE_CONTEXT_H diff --git a/database/sqlite/sqlite_db_migration.c b/database/sqlite/sqlite_db_migration.c index bd474336..8b1d0159 100644 --- a/database/sqlite/sqlite_db_migration.c +++ b/database/sqlite/sqlite_db_migration.c @@ -21,7 +21,7 @@ static int table_exists_in_database(const char *table) snprintf(sql, 127, "select 1 from sqlite_schema where type = 'table' and name = '%s';", table); - int rc = sqlite3_exec(db_meta, sql, return_int_cb, (void *) &exists, &err_msg); + int rc = sqlite3_exec_monitored(db_meta, sql, return_int_cb, (void *) &exists, &err_msg); if (rc != SQLITE_OK) { info("Error checking table existence; %s", err_msg); sqlite3_free(err_msg); @@ -39,7 +39,7 @@ static int column_exists_in_table(const char *table, const char *column) snprintf(sql, 127, "SELECT 1 FROM pragma_table_info('%s') where name = '%s';", table, column); - int rc = sqlite3_exec(db_meta, sql, return_int_cb, (void *) &exists, &err_msg); + int rc = sqlite3_exec_monitored(db_meta, sql, return_int_cb, (void *) &exists, &err_msg); if (rc != SQLITE_OK) { info("Error checking column existence; %s", err_msg); sqlite3_free(err_msg); @@ -64,6 +64,22 @@ const char *database_migrate_v2_v3[] = { NULL }; +const char *database_migrate_v4_v5[] = { + "DROP TABLE IF EXISTS chart_active;", + "DROP TABLE IF EXISTS dimension_active;", + "DROP TABLE IF EXISTS chart_hash;", + "DROP TABLE IF EXISTS chart_hash_map;", + "DROP VIEW IF EXISTS v_chart_hash;", + NULL +}; + +const char *database_migrate_v5_v6[] = { + "DROP TRIGGER IF EXISTS tr_dim_del;", + "DROP TABLE IF EXISTS dimension_delete;", + NULL +}; + + static int do_migration_v1_v2(sqlite3 *database, const char *name) { UNUSED(name); @@ -100,11 +116,11 @@ static int do_migration_v3_v4(sqlite3 *database, const char *name) return 1; } - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { char *table = strdupz((char *) sqlite3_column_text(res, 0)); if (!column_exists_in_table(table, "chart_context")) { snprintfz(sql, 255, "ALTER TABLE %s ADD chart_context text", table); - sqlite3_exec(database, sql, 0, 0, NULL); + sqlite3_exec_monitored(database, sql, 0, 0, NULL); } freez(table); } @@ -116,6 +132,57 @@ static int do_migration_v3_v4(sqlite3 *database, const char *name) return 0; } +static int do_migration_v4_v5(sqlite3 *database, const char *name) +{ + UNUSED(name); + info("Running \"%s\" database migration", name); + + return init_database_batch(database, DB_CHECK_NONE, 0, &database_migrate_v4_v5[0]); +} + +static int do_migration_v5_v6(sqlite3 *database, const char *name) +{ + UNUSED(name); + info("Running \"%s\" database migration", name); + + return init_database_batch(database, DB_CHECK_NONE, 0, &database_migrate_v5_v6[0]); +} + +static int do_migration_v6_v7(sqlite3 *database, const char *name) +{ + UNUSED(name); + info("Running \"%s\" database migration", name); + + char sql[256]; + + int rc; + sqlite3_stmt *res = NULL; + snprintfz(sql, 255, "SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'aclk_alert_%%';"); + rc = sqlite3_prepare_v2(database, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to alter aclk_alert tables"); + return 1; + } + + while (sqlite3_step_monitored(res) == SQLITE_ROW) { + char *table = strdupz((char *) sqlite3_column_text(res, 0)); + if (!column_exists_in_table(table, "filtered_alert_unique_id")) { + snprintfz(sql, 255, "ALTER TABLE %s ADD filtered_alert_unique_id", table); + sqlite3_exec_monitored(database, sql, 0, 0, NULL); + snprintfz(sql, 255, "UPDATE %s SET filtered_alert_unique_id = alert_unique_id", table); + sqlite3_exec_monitored(database, sql, 0, 0, NULL); + } + freez(table); + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when altering aclk_alert tables, rc = %d", rc); + + return 0; +} + + static int do_migration_noop(sqlite3 *database, const char *name) { UNUSED(database); @@ -135,7 +202,7 @@ static int migrate_database(sqlite3 *database, int target_version, char *db_name int user_version = 0; char *err_msg = NULL; - int rc = sqlite3_exec(database, "PRAGMA user_version;", return_int_cb, (void *) &user_version, &err_msg); + int rc = sqlite3_exec_monitored(database, "PRAGMA user_version;", return_int_cb, (void *) &user_version, &err_msg); if (rc != SQLITE_OK) { info("Error checking the %s database version; %s", db_name, err_msg); sqlite3_free(err_msg); @@ -163,6 +230,9 @@ DATABASE_FUNC_MIGRATION_LIST migration_action[] = { {.name = "v1 to v2", .func = do_migration_v1_v2}, {.name = "v2 to v3", .func = do_migration_v2_v3}, {.name = "v3 to v4", .func = do_migration_v3_v4}, + {.name = "v4 to v5", .func = do_migration_v4_v5}, + {.name = "v5 to v6", .func = do_migration_v5_v6}, + {.name = "v6 to v7", .func = do_migration_v6_v7}, // the terminator of this array {.name = NULL, .func = NULL} }; diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index f46450af..eeb3c382 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -3,7 +3,7 @@ #include "sqlite_functions.h" #include "sqlite_db_migration.h" -#define DB_METADATA_VERSION 4 +#define DB_METADATA_VERSION 7 const char *database_config[] = { "CREATE TABLE IF NOT EXISTS host(host_id BLOB PRIMARY KEY, hostname TEXT NOT NULL, " @@ -21,14 +21,10 @@ const char *database_config[] = { "CREATE TABLE IF NOT EXISTS dimension(dim_id blob PRIMARY KEY, chart_id blob, id text, name text, " "multiplier int, divisor int , algorithm int, options text);", - "DROP TABLE IF EXISTS chart_active;", - "DROP TABLE IF EXISTS dimension_active;", - - "CREATE TABLE IF NOT EXISTS chart_active(chart_id blob PRIMARY KEY, date_created int);", - "CREATE TABLE IF NOT EXISTS dimension_active(dim_id blob primary key, date_created int);", "CREATE TABLE IF NOT EXISTS metadata_migration(filename text, file_size, date_created int);", "CREATE INDEX IF NOT EXISTS ind_d1 on dimension (chart_id, id, name);", "CREATE INDEX IF NOT EXISTS ind_c1 on chart (host_id, id, type, name);", + "CREATE INDEX IF NOT EXISTS ind_c2 on chart (host_id, context);", "CREATE TABLE IF NOT EXISTS chart_label(chart_id blob, source_type int, label_key text, " "label_value text, date_created int, PRIMARY KEY (chart_id, label_key));", "CREATE TABLE IF NOT EXISTS node_instance (host_id blob PRIMARY KEY, claim_id, node_id, date_created);", @@ -45,39 +41,20 @@ const char *database_config[] = { "CREATE TABLE IF NOT EXISTS host_label(host_id blob, source_type int, label_key text NOT NULL, " "label_value text NOT NULL, date_created INT, PRIMARY KEY (host_id, label_key));", - "CREATE TABLE IF NOT EXISTS chart_hash_map(chart_id blob , hash_id blob, UNIQUE (chart_id, hash_id));", - - "CREATE TABLE IF NOT EXISTS chart_hash(hash_id blob PRIMARY KEY,type text, id text, name text, " - "family text, context text, title text, unit text, plugin text, " - "module text, priority integer, chart_type, last_used);", - - "CREATE VIEW IF NOT EXISTS v_chart_hash as SELECT ch.*, chm.chart_id FROM chart_hash ch, chart_hash_map chm " - "WHERE ch.hash_id = chm.hash_id;", - "CREATE TRIGGER IF NOT EXISTS ins_host AFTER INSERT ON host BEGIN INSERT INTO node_instance (host_id, date_created)" " SELECT new.host_id, unixepoch() WHERE new.host_id NOT IN (SELECT host_id FROM node_instance); END;", - "CREATE TRIGGER IF NOT EXISTS tr_v_chart_hash INSTEAD OF INSERT on v_chart_hash BEGIN " - "INSERT INTO chart_hash (hash_id, type, id, name, family, context, title, unit, plugin, " - "module, priority, chart_type, last_used) " - "values (new.hash_id, new.type, new.id, new.name, new.family, new.context, new.title, new.unit, new.plugin, " - "new.module, new.priority, new.chart_type, unixepoch()) " - "ON CONFLICT (hash_id) DO UPDATE SET last_used = unixepoch(); " - "INSERT INTO chart_hash_map (chart_id, hash_id) values (new.chart_id, new.hash_id) " - "on conflict (chart_id, hash_id) do nothing; END; ", - NULL }; const char *database_cleanup[] = { - "delete from chart where chart_id not in (select chart_id from dimension);", - "delete from host where host_id not in (select host_id from chart);", - "delete from chart_label where chart_id not in (select chart_id from chart);", - "DELETE FROM chart_hash_map WHERE chart_id NOT IN (SELECT chart_id FROM chart);", - "DELETE FROM chart_hash WHERE hash_id NOT IN (SELECT hash_id FROM chart_hash_map);", + "DELETE FROM chart WHERE chart_id NOT IN (SELECT chart_id FROM dimension);", + "DELETE FROM host WHERE host_id NOT IN (SELECT host_id FROM chart);", + "DELETE FROM chart_label WHERE chart_id NOT IN (SELECT chart_id FROM chart);", "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);", "DELETE FROM host_info WHERE host_id NOT IN (SELECT host_id FROM host);", "DELETE FROM host_label WHERE host_id NOT IN (SELECT host_id FROM host);", + "DROP TRIGGER IF EXISTS tr_dim_del;", NULL }; @@ -86,13 +63,49 @@ sqlite3 *db_meta = NULL; #define MAX_PREPARED_STATEMENTS (32) pthread_key_t key_pool[MAX_PREPARED_STATEMENTS]; -static uv_mutex_t sqlite_transaction_lock; +SQLITE_API int sqlite3_exec_monitored( + sqlite3 *db, /* An open database */ + const char *sql, /* SQL to be evaluated */ + int (*callback)(void*,int,char**,char**), /* Callback function */ + void *data, /* 1st argument to callback */ + char **errmsg /* Error msg written here */ +) { + int rc = sqlite3_exec(db, sql, callback, data, errmsg); + global_statistics_sqlite3_query_completed(rc == SQLITE_OK, rc == SQLITE_BUSY, rc == SQLITE_LOCKED); + return rc; +} + +SQLITE_API int sqlite3_step_monitored(sqlite3_stmt *stmt) { + int rc; + int cnt = 0; + + while (cnt++ < SQL_MAX_RETRY) { + rc = sqlite3_step(stmt); + switch (rc) { + case SQLITE_DONE: + global_statistics_sqlite3_query_completed(1, 0, 0); + break; + case SQLITE_ROW: + global_statistics_sqlite3_row_completed(); + break; + case SQLITE_BUSY: + case SQLITE_LOCKED: + global_statistics_sqlite3_query_completed(rc == SQLITE_DONE, rc == SQLITE_BUSY, rc == SQLITE_LOCKED); + usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); + continue; + default: + break; + } + break; + } + return rc; +} int execute_insert(sqlite3_stmt *res) { int rc; int cnt = 0; - while ((rc = sqlite3_step(res)) != SQLITE_DONE && ++cnt < SQL_MAX_RETRY && likely(!netdata_exit)) { + while ((rc = sqlite3_step_monitored(res)) != SQLITE_DONE && ++cnt < SQL_MAX_RETRY && likely(!netdata_exit)) { if (likely(rc == SQLITE_BUSY || rc == SQLITE_LOCKED)) { usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); error_report("Failed to insert/update, rc = %d -- attempt %d", rc, cnt); @@ -134,14 +147,14 @@ static void add_stmt_to_list(sqlite3_stmt *res) static void release_statement(void *statement) { int rc; -#ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_DEV_MODE info("Thread %d: Cleaning prepared statement on %p", gettid(), statement); #endif if (unlikely(rc = sqlite3_finalize((sqlite3_stmt *) statement) != SQLITE_OK)) error_report("Failed to finalize statement, rc = %d", rc); } -int prepare_statement(sqlite3 *database, char *query, sqlite3_stmt **statement) +int prepare_statement(sqlite3 *database, const char *query, sqlite3_stmt **statement) { static __thread uint32_t keys_used = 0; @@ -155,7 +168,7 @@ int prepare_statement(sqlite3 *database, char *query, sqlite3_stmt **statement) if (likely(rc == SQLITE_OK)) { if (likely(key)) { ret = pthread_setspecific(*key, *statement); -#ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_DEV_MODE info("Thread %d: Using key %u on statement %p", gettid(), keys_used, *statement); #endif } @@ -165,88 +178,6 @@ int prepare_statement(sqlite3 *database, char *query, sqlite3_stmt **statement) return rc; } -/* - * Store a chart or dimension UUID in chart_active or dimension_active - * The statement that will be prepared determines that - */ - -static int store_active_uuid_object(sqlite3_stmt **res, char *statement, uuid_t *uuid) -{ - int rc; - - // Check if we should need to prepare the statement - if (!*res) { - rc = prepare_statement(db_meta, statement, res); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to store active object, rc = %d", rc); - return rc; - } - } - - rc = sqlite3_bind_blob(*res, 1, uuid, sizeof(*uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to bind input parameter to store active object, rc = %d", rc); - else - rc = execute_insert(*res); - return rc; -} - -/* - * Marks a chart with UUID as active - * Input: UUID - */ -void store_active_chart(uuid_t *chart_uuid) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return; - } - - if (unlikely(!chart_uuid)) - return; - - rc = store_active_uuid_object(&res, SQL_STORE_ACTIVE_CHART, chart_uuid); - if (rc != SQLITE_DONE) - error_report("Failed to store active chart, rc = %d", rc); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement in store active chart, rc = %d", rc); - return; -} - -/* - * Marks a dimension with UUID as active - * Input: UUID - */ -void store_active_dimension(uuid_t *dimension_uuid) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return; - } - - if (unlikely(!dimension_uuid)) - return; - - rc = store_active_uuid_object(&res, SQL_STORE_ACTIVE_DIMENSION, dimension_uuid); - if (rc != SQLITE_DONE) - error_report("Failed to store active dimension, rc = %d", rc); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement in store active dimension, rc = %d", rc); - return; -} - static int check_table_integrity_cb(void *data, int argc, char **argv, char **column) { int *status = data; @@ -273,7 +204,7 @@ static int check_table_integrity(char *table) strcpy(wstr,"PRAGMA integrity_check;"); } - int rc = sqlite3_exec(db_meta, wstr, check_table_integrity_cb, (void *) &status, &err_msg); + int rc = sqlite3_exec_monitored(db_meta, wstr, check_table_integrity_cb, (void *) &status, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error during database integrity check for %s, rc = %d (%s)", table ? table : "the entire database", rc, err_msg); @@ -306,14 +237,13 @@ static void rebuild_chart() info("Rebuilding chart table"); for (int i = 0; rebuild_chart_commands[i]; i++) { info("Executing %s", rebuild_chart_commands[i]); - rc = sqlite3_exec(db_meta, rebuild_chart_commands[i], 0, 0, &err_msg); + rc = sqlite3_exec_monitored(db_meta, rebuild_chart_commands[i], 0, 0, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error during database setup, rc = %d (%s)", rc, err_msg); error_report("SQLite failed statement %s", rebuild_chart_commands[i]); sqlite3_free(err_msg); } } - return; } const char *rebuild_dimension_commands[] = { @@ -339,14 +269,13 @@ void rebuild_dimension() info("Rebuilding dimension table"); for (int i = 0; rebuild_dimension_commands[i]; i++) { info("Executing %s", rebuild_dimension_commands[i]); - rc = sqlite3_exec(db_meta, rebuild_dimension_commands[i], 0, 0, &err_msg); + rc = sqlite3_exec_monitored(db_meta, rebuild_dimension_commands[i], 0, 0, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error during database setup, rc = %d (%s)", rc, err_msg); error_report("SQLite failed statement %s", rebuild_dimension_commands[i]); sqlite3_free(err_msg); } } - return; } static int attempt_database_fix() @@ -366,7 +295,7 @@ int init_database_batch(sqlite3 *database, int rebuild, int init_type, const cha char *err_msg = NULL; for (int i = 0; batch[i]; i++) { debug(D_METADATALOG, "Executing %s", batch[i]); - rc = sqlite3_exec(database, batch[i], 0, 0, &err_msg); + rc = sqlite3_exec_monitored(database, batch[i], 0, 0, &err_msg); if (rc != SQLITE_OK) { error_report("SQLite error during database %s, rc = %d (%s)", init_type ? "cleanup" : "setup", rc, err_msg); error_report("SQLite failed statement %s", batch[i]); @@ -384,6 +313,24 @@ int init_database_batch(sqlite3 *database, int rebuild, int init_type, const cha return 0; } +static void sqlite_uuid_parse(sqlite3_context *context, int argc, sqlite3_value **argv) +{ + uuid_t uuid; + + if ( argc != 1 ){ + sqlite3_result_null(context); + return ; + } + int rc = uuid_parse((const char *) sqlite3_value_text(argv[0]), uuid); + if (rc == -1) { + sqlite3_result_null(context); + return ; + } + + sqlite3_result_blob(context, &uuid, sizeof(uuid_t), SQLITE_TRANSIENT); +} + + /* * Initialize the SQLite database * Return 0 on success @@ -437,7 +384,7 @@ int sql_init_database(db_check_action_type_t rebuild, int memory) if (rebuild & DB_CHECK_RECLAIM_SPACE) { if (!(rebuild & DB_CHECK_CONT)) info("Reclaiming space of %s", sqlite_database); - rc = sqlite3_exec(db_meta, "VACUUM;", 0, 0, &err_msg); + rc = sqlite3_exec_monitored(db_meta, "VACUUM;", 0, 0, &err_msg); if (rc != SQLITE_OK) { error_report("Failed to execute VACUUM rc = %d (%s)", rc, err_msg); sqlite3_free(err_msg); @@ -497,12 +444,14 @@ int sql_init_database(db_check_action_type_t rebuild, int memory) if (init_database_batch(db_meta, rebuild, 0, &database_cleanup[0])) return 1; - fatal_assert(0 == uv_mutex_init(&sqlite_transaction_lock)); info("SQLite database initialization completed"); for (int i = 0; i < MAX_PREPARED_STATEMENTS; i++) (void)pthread_key_create(&key_pool[i], release_statement); + rc = sqlite3_create_function(db_meta, "u2h", 1, SQLITE_ANY | SQLITE_DETERMINISTIC, 0, sqlite_uuid_parse, 0, 0); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to register internal u2h function"); return 0; } @@ -523,233 +472,9 @@ void sql_close_database(void) rc = sqlite3_close_v2(db_meta); if (unlikely(rc != SQLITE_OK)) error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc)); - return; -} - -#define FIND_UUID_TYPE "select 1 from host where host_id = @uuid union select 2 from chart where chart_id = @uuid union select 3 from dimension where dim_id = @uuid;" - -int find_uuid_type(uuid_t *uuid) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - int uuid_type = 3; - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, FIND_UUID_TYPE, &res); - if (rc != SQLITE_OK) { - error_report("Failed to bind prepare statement to find UUID type in the database"); - return 0; - } - } - - rc = sqlite3_bind_blob(res, 1, uuid, sizeof(*uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_step(res); - if (likely(rc == SQLITE_ROW)) - uuid_type = sqlite3_column_int(res, 0); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement during find uuid type, rc = %d", rc); - - return uuid_type; - -bind_fail: - return 0; -} - -int find_dimension_uuid(RRDSET *st, RRDDIM *rd, uuid_t *store_uuid) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - int status = 1; - - if (unlikely(!db_meta) && default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 1; - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, SQL_FIND_DIMENSION_UUID, &res); - if (rc != SQLITE_OK) { - error_report("Failed to bind prepare statement to lookup dimension UUID in the database"); - return 1; - } - } - - rc = sqlite3_bind_blob(res, 1, st->chart_uuid, sizeof(*st->chart_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 2, rd->id, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 3, rd->name, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_step(res); - if (likely(rc == SQLITE_ROW)) { - uuid_copy(*store_uuid, *((uuid_t *) sqlite3_column_blob(res, 0))); - status = 0; - } - else { - uuid_generate(*store_uuid); - status = sql_store_dimension(store_uuid, st->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, rd->algorithm); - if (unlikely(status)) - error_report("Failed to store dimension metadata in the database"); - } - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement find dimension uuid, rc = %d", rc); - return status; - -bind_fail: - error_report("Failed to bind input parameter to perform dimension UUID database lookup, rc = %d", rc); - return 1; -} - -#define DELETE_DIMENSION_UUID "delete from dimension where dim_id = @uuid;" - -void delete_dimension_uuid(uuid_t *dimension_uuid) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - -#ifdef NETDATA_INTERNAL_CHECKS - char uuid_str[GUID_LEN + 1]; - uuid_unparse_lower(*dimension_uuid, uuid_str); - debug(D_METADATALOG,"Deleting dimension uuid %s", uuid_str); -#endif - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, DELETE_DIMENSION_UUID, &res); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to delete a dimension uuid"); - return; - } - } - - rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_step(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to delete dimension uuid, rc = %d", rc); - -bind_fail: - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement when deleting dimension UUID, rc = %d", rc); - return; -} - -/* - * Do a database lookup to find the UUID of a chart - * - */ -uuid_t *find_chart_uuid(RRDHOST *host, const char *type, const char *id, const char *name) -{ - static __thread sqlite3_stmt *res = NULL; - uuid_t *uuid = NULL; - int rc; - - if (unlikely(!db_meta) && default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return NULL; - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, SQL_FIND_CHART_UUID, &res); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to lookup chart UUID in the database"); - return NULL; - } - } - - rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 2, type, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 3, id, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 4, name ? name : id, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_step(res); - if (likely(rc == SQLITE_ROW)) { - uuid = mallocz(sizeof(uuid_t)); - uuid_copy(*uuid, sqlite3_column_blob(res, 0)); - } - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement when searching for a chart UUID, rc = %d", rc); - -#ifdef NETDATA_INTERNAL_CHECKS - char uuid_str[GUID_LEN + 1]; - if (likely(uuid)) { - uuid_unparse_lower(*uuid, uuid_str); - debug(D_METADATALOG, "Found UUID %s for chart %s.%s", uuid_str, type, name ? name : id); - } - else - debug(D_METADATALOG, "UUID not found for chart %s.%s", type, name ? name : id); -#endif - return uuid; - -bind_fail: - error_report("Failed to bind input parameter to perform chart UUID database lookup, rc = %d", rc); - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement when searching for a chart UUID, rc = %d", rc); - return NULL; -} - -int update_chart_metadata(uuid_t *chart_uuid, RRDSET *st, const char *id, const char *name) -{ - int rc; - - if (unlikely(!db_meta) && default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - - rc = sql_store_chart( - chart_uuid, &st->rrdhost->host_uuid, st->type, id, name, st->family, st->context, st->title, st->units, st->plugin_name, - st->module_name, st->priority, st->update_every, st->chart_type, st->rrd_memory_mode, st->entries); - - return rc; -} - -uuid_t *create_chart_uuid(RRDSET *st, const char *id, const char *name) -{ - uuid_t *uuid = NULL; - int rc; - - uuid = mallocz(sizeof(uuid_t)); - uuid_generate(*uuid); - -#ifdef NETDATA_INTERNAL_CHECKS - char uuid_str[GUID_LEN + 1]; - uuid_unparse_lower(*uuid, uuid_str); - debug(D_METADATALOG,"Generating uuid [%s] for chart %s under host %s", uuid_str, st->id, st->rrdhost->hostname); -#endif - - rc = update_chart_metadata(uuid, st, id, name); - - if (unlikely(rc)) - error_report("Failed to store chart metadata in the database"); - - return uuid; } -static int exec_statement_with_uuid(const char *sql, uuid_t *uuid) +int exec_statement_with_uuid(const char *sql, uuid_t *uuid) { int rc, result = 1; sqlite3_stmt *res = NULL; @@ -763,7 +488,7 @@ static int exec_statement_with_uuid(const char *sql, uuid_t *uuid) rc = sqlite3_bind_blob(res, 1, uuid, sizeof(*uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host parameter to %s, rc = %d", sql, rc); - goto failed; + goto skip; } rc = execute_insert(res); @@ -772,7 +497,7 @@ static int exec_statement_with_uuid(const char *sql, uuid_t *uuid) else error_report("Failed to execute %s, rc = %d", sql, rc); -failed: +skip: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement %s, rc = %d", sql, rc); @@ -780,445 +505,13 @@ failed: } -// Migrate all hosts with hops zero to this host_uuid -void migrate_localhost(uuid_t *host_uuid) -{ - int rc; - - rc = exec_statement_with_uuid("UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0); ", host_uuid); - if (!rc) - rc = exec_statement_with_uuid("DELETE FROM host WHERE hops = 0 AND host_id <> @host_id; ", host_uuid); - if (!rc) - db_execute("DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);"); - -} - -int sql_store_host( - uuid_t *host_uuid, const char *hostname, const char *registry_hostname, int update_every, const char *os, - const char *tzone, const char *tags, int hops) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - - if (unlikely((!res))) { - rc = prepare_statement(db_meta, SQL_STORE_HOST, &res); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to store host, rc = %d", rc); - return 1; - } - } - - rc = sqlite3_bind_blob(res, 1, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 2, hostname, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 3, registry_hostname, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 4, update_every); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 5, os, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 6, tzone, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 7, tags, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 8, hops); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - int store_rc = sqlite3_step(res); - if (unlikely(store_rc != SQLITE_DONE)) - error_report("Failed to store host %s, rc = %d", hostname, rc); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement to store host %s, rc = %d", hostname, rc); - - return !(store_rc == SQLITE_DONE); -bind_fail: - error_report("Failed to bind parameter to store host %s, rc = %d", hostname, rc); - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement to store host %s, rc = %d", hostname, rc); - return 1; -} - -// -// Store host and host system info information in the database -#define SQL_STORE_HOST_INFO "INSERT OR REPLACE INTO host " \ - "(host_id, hostname, registry_hostname, update_every, os, timezone," \ - "tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, program_version," \ - "entries, health_enabled) " \ - "values (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, @memory_mode, " \ - "@abbrev_timezone, @utc_offset, @program_name, @program_version, " \ - "@entries, @health_enabled);" - -int sql_store_host_info(RRDHOST *host) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - - if (unlikely((!res))) { - rc = prepare_statement(db_meta, SQL_STORE_HOST_INFO, &res); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to store host, rc = %d", rc); - return 1; - } - } - - rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = bind_text_null(res, 2, host->hostname, 0); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = bind_text_null(res, 3, host->registry_hostname, 1); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 4, host->rrd_update_every); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = bind_text_null(res, 5, host->os, 1); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = bind_text_null(res, 6, host->timezone, 1); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = bind_text_null(res, 7, host->tags, 1); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 8, host->system_info ? host->system_info->hops : 0); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 9, host->rrd_memory_mode); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = bind_text_null(res, 10, host->abbrev_timezone, 1); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 11, host->utc_offset); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = bind_text_null(res, 12, host->program_name, 1); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = bind_text_null(res, 13, host->program_version, 1); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int64(res, 14, host->rrd_history_entries); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 15, host->health_enabled); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - int store_rc = sqlite3_step(res); - if (unlikely(store_rc != SQLITE_DONE)) - error_report("Failed to store host %s, rc = %d", host->hostname, rc); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement to store host %s, rc = %d", host->hostname, rc); - - return !(store_rc == SQLITE_DONE); -bind_fail: - error_report("Failed to bind parameter to store host %s, rc = %d", host->hostname, rc); - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement to store host %s, rc = %d", host->hostname, rc); - return 1; -} - -/* - * Store a chart in the database - */ - -int sql_store_chart( - uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family, - const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority, - int update_every, int chart_type, int memory_mode, long history_entries) -{ - static __thread sqlite3_stmt *res = NULL; - int rc, param = 0; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, SQL_STORE_CHART, &res); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to store chart, rc = %d", rc); - return 1; - } - } - - param++; - rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_blob(res, 2, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 3, type, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 4, id, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - if (name && *name) - rc = sqlite3_bind_text(res, 5, name, -1, SQLITE_STATIC); - else - rc = sqlite3_bind_null(res, 5); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 6, family, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 7, context, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 8, title, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 9, units, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 10, plugin, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 11, module, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_int(res, 12, priority); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_int(res, 13, update_every); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_int(res, 14, chart_type); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_int(res, 15, memory_mode); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_int(res, 16, history_entries); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store chart, rc = %d", rc); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement in chart store function, rc = %d", rc); - - return 0; - -bind_fail: - error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc); - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement in chart store function, rc = %d", rc); - return 1; -} - -/* - * Store a dimension - */ -int sql_store_dimension( - uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier, - collected_number divisor, int algorithm) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, SQL_STORE_DIMENSION, &res); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to store dimension, rc = %d", rc); - return 1; - } - } - - rc = sqlite3_bind_blob(res, 1, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_blob(res, 2, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 3, id, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_text(res, 4, name, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 5, multiplier); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 6, divisor); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = sqlite3_bind_int(res, 7, algorithm); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store dimension, rc = %d", rc); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement in store dimension, rc = %d", rc); - return 0; - -bind_fail: - error_report("Failed to bind parameter to store dimension, rc = %d", rc); - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement in store dimension, rc = %d", rc); - return 1; -} - -/* - * Store set option for a dimension - */ -int sql_set_dimension_option(uuid_t *dim_uuid, char *option) -{ - sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - - rc = sqlite3_prepare_v2(db_meta, "UPDATE dimension SET options = @options WHERE dim_id = @dim_id", -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to update dimension options"); - return 0; - }; - - rc = sqlite3_bind_blob(res, 2, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - if (!option || !strcmp(option,"unhide")) - rc = sqlite3_bind_null(res, 1); - else - rc = sqlite3_bind_text(res, 1, option, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to update dimension option, rc = %d", rc); - -bind_fail: - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement in update dimension options, rc = %d", rc); - return 0; -} - // -// Support for archived charts +// Support for archived charts (TO BE REMOVED) // #define SELECT_DIMENSION "select d.id, d.name from dimension d where d.chart_id = @chart_uuid;" -void sql_rrdim2json(sqlite3_stmt *res_dim, uuid_t *chart_uuid, BUFFER *wb, size_t *dimensions_count) +static void sql_rrdim2json(sqlite3_stmt *res_dim, uuid_t *chart_uuid, BUFFER *wb, size_t *dimensions_count) { int rc; @@ -1229,7 +522,7 @@ void sql_rrdim2json(sqlite3_stmt *res_dim, uuid_t *chart_uuid, BUFFER *wb, size_ int dimensions = 0; buffer_sprintf(wb, "\t\t\t\"dimensions\": {\n"); - while (sqlite3_step(res_dim) == SQLITE_ROW) { + while (sqlite3_step_monitored(res_dim) == SQLITE_ROW) { if (dimensions) buffer_strcat(wb, ",\n\t\t\t\t\""); else @@ -1291,11 +584,11 @@ void sql_rrdset2json(RRDHOST *host, BUFFER *wb) ",\n\t\"memory_mode\": \"%s\"" ",\n\t\"custom_info\": \"%s\"" ",\n\t\"charts\": {" - , host->hostname - , host->program_version + , rrdhost_hostname(host) + , rrdhost_program_version(host) , get_release_channel() - , host->os - , host->timezone + , rrdhost_os(host) + , rrdhost_timezone(host) , host->rrd_update_every , host->rrd_history_entries , rrd_memory_mode_name(host->rrd_memory_mode) @@ -1305,7 +598,7 @@ void sql_rrdset2json(RRDHOST *host, BUFFER *wb) size_t c = 0; size_t dimensions = 0; - while (sqlite3_step(res_chart) == SQLITE_ROW) { + while (sqlite3_step_monitored(res_chart) == SQLITE_ROW) { char id[512]; sprintf(id, "%s.%s", sqlite3_column_text(res_chart, 3), sqlite3_column_text(res_chart, 1)); RRDSET *st = rrdset_find(host, id); @@ -1386,7 +679,7 @@ void sql_rrdset2json(RRDHOST *host, BUFFER *wb) "\n\t\t\t\"hostname\": \"%s\"" "\n\t\t}" , (found > 0) ? "," : "" - , h->hostname + , rrdhost_hostname(h) ); found++; @@ -1400,7 +693,7 @@ void sql_rrdset2json(RRDHOST *host, BUFFER *wb) , "\n\t\t{" "\n\t\t\t\"hostname\": \"%s\"" "\n\t\t}" - , host->hostname + , rrdhost_hostname(host) ); } @@ -1414,95 +707,6 @@ failed: rc = sqlite3_finalize(res_chart); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize the prepared statement when reading archived charts"); - - return; -} - -void free_temporary_host(RRDHOST *host) -{ - if (host) { - freez(host->hostname); - freez((char *)host->os); - freez((char *)host->tags); - freez((char *)host->timezone); - freez(host->program_name); - freez(host->program_version); - freez(host->registry_hostname); - freez(host->system_info); - freez(host); - } -} - -#define SELECT_HOST "select host_id, registry_hostname, update_every, os, timezone, tags from host where hostname = @hostname order by rowid desc;" -#define SELECT_HOST_BY_UUID "select h.host_id, h.registry_hostname, h.update_every, h.os, h.timezone, h.tags from host h, node_instance ni " \ - "where (ni.host_id = @host_id or ni.node_id = @host_id) AND ni.host_id = h.host_id;" - -RRDHOST *sql_create_host_by_uuid(char *hostname) -{ - int rc; - RRDHOST *host = NULL; - uuid_t host_uuid; - - sqlite3_stmt *res = NULL; - - rc = uuid_parse(hostname, host_uuid); - if (!rc) { - rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_BY_UUID, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch host by uuid"); - return NULL; - } - rc = sqlite3_bind_blob(res, 1, &host_uuid, sizeof(host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host_id parameter to fetch host information"); - goto failed; - } - } - else { - rc = sqlite3_prepare_v2(db_meta, SELECT_HOST, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch host by hostname"); - return NULL; - } - rc = sqlite3_bind_text(res, 1, hostname, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind hostname parameter to fetch host information"); - goto failed; - } - } - - rc = sqlite3_step(res); - if (unlikely(rc != SQLITE_ROW)) { - error_report("Failed to find hostname %s", hostname); - goto failed; - } - - char uuid_str[GUID_LEN + 1]; - uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 0)), uuid_str); - - host = callocz(1, sizeof(RRDHOST)); - - set_host_properties(host, sqlite3_column_int(res, 2), RRD_MEMORY_MODE_DBENGINE, hostname, - (char *) sqlite3_column_text(res, 1), (const char *) uuid_str, - (char *) sqlite3_column_text(res, 3), (char *) sqlite3_column_text(res, 5), - (char *) sqlite3_column_text(res, 4), NULL, 0, NULL, NULL); - - uuid_copy(host->host_uuid, *((uuid_t *) sqlite3_column_blob(res, 0))); - - host->system_info = callocz(1, sizeof(*host->system_info));; - rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED); - -#ifdef ENABLE_DBENGINE - for(int tier = 0; tier < storage_tiers ; tier++) - host->storage_instance[tier] = (STORAGE_INSTANCE *)multidb_ctx[tier]; -#endif - -failed: - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when reading host information"); - - return host; } void db_execute(const char *cmd) @@ -1511,35 +715,23 @@ void db_execute(const char *cmd) int cnt = 0; while (cnt < SQL_MAX_RETRY) { char *err_msg; - rc = sqlite3_exec(db_meta, cmd, 0, 0, &err_msg); + rc = sqlite3_exec_monitored(db_meta, cmd, 0, 0, &err_msg); if (rc != SQLITE_OK) { error_report("Failed to execute '%s', rc = %d (%s) -- attempt %d", cmd, rc, err_msg, cnt); sqlite3_free(err_msg); if (likely(rc == SQLITE_BUSY || rc == SQLITE_LOCKED)) { usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); } - else break; + else + break; } else break; + ++cnt; } - return; -} - -void db_lock(void) -{ - uv_mutex_lock(&sqlite_transaction_lock); - return; } -void db_unlock(void) -{ - uv_mutex_unlock(&sqlite_transaction_lock); - return; -} - - #define SELECT_MIGRATED_FILE "select 1 from metadata_migration where filename = @path;" int file_is_migrated(char *path) @@ -1559,7 +751,7 @@ int file_is_migrated(char *path) return 0; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) error_report("Failed to finalize the prepared statement when checking if metadata file is migrated"); @@ -1587,7 +779,7 @@ void add_migrated_file(char *path, uint64_t file_size) return; } - rc = sqlite3_bind_int64(res, 2, file_size); + rc = sqlite3_bind_int64(res, 2, (sqlite_int64) file_size); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind size parameter to store migration information"); return; @@ -1599,494 +791,9 @@ void add_migrated_file(char *path, uint64_t file_size) if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) error_report("Failed to finalize the prepared statement when checking if metadata file is migrated"); - - return; -} - -static int sql_store_label(sqlite3_stmt *res, uuid_t *uuid, int source_type, const char *label, const char *value) -{ - int rc; - - rc = sqlite3_bind_blob(res, 1, uuid, sizeof(*uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind UUID parameter to store label information"); - goto skip_store; - } - - rc = sqlite3_bind_int(res, 2, source_type); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind type parameter to store label information"); - goto skip_store; - } - - rc = sqlite3_bind_text(res, 3, label, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind label parameter to store label information"); - goto skip_store; - } - - rc = sqlite3_bind_text(res, 4, value, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind value parameter to store label information"); - goto skip_store; - } - - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store label entry, rc = %d", rc); - -skip_store: - if (unlikely(sqlite3_reset(res) != SQLITE_OK)) - error_report("Failed to reset the prepared statement when storing label information"); - - return rc != SQLITE_DONE; -} - -#define SQL_INS_CHART_LABEL "insert or replace into chart_label " \ - "(chart_id, source_type, label_key, label_value, date_created) " \ - "values (@chart, @source, @label, @value, unixepoch());" - -void sql_store_chart_label(uuid_t *chart_uuid, int source_type, char *label, char *value) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return; - } - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, SQL_INS_CHART_LABEL, &res); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement store chart labels"); - return; - } - } - - sql_store_label(res, chart_uuid, source_type, label, value); - - return; -} - -#define SQL_INS_HOST_LABEL "INSERT OR REPLACE INTO host_label " \ - "(host_id, source_type, label_key, label_value, date_created) " \ - "values (@chart, @source, @label, @value, unixepoch());" - -static void sql_store_host_label(uuid_t *host_uuid, int source_type, const char *label, const char *value) -{ - static __thread sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return; - } - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, SQL_INS_HOST_LABEL, &res); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement store chart labels"); - return; - } - } - - (void) sql_store_label(res, host_uuid, source_type, label, value); -} - -int find_dimension_first_last_t(char *machine_guid, char *chart_id, char *dim_id, - uuid_t *uuid, time_t *first_entry_t, time_t *last_entry_t, uuid_t *rrdeng_uuid, int tier) -{ -#ifdef ENABLE_DBENGINE - int rc; - uuid_t legacy_uuid; - uuid_t multihost_legacy_uuid; - time_t dim_first_entry_t, dim_last_entry_t; - - rc = rrdeng_metric_latest_time_by_uuid(uuid, &dim_first_entry_t, &dim_last_entry_t, tier); - if (unlikely(rc)) { - rrdeng_generate_legacy_uuid(dim_id, chart_id, &legacy_uuid); - rc = rrdeng_metric_latest_time_by_uuid(&legacy_uuid, &dim_first_entry_t, &dim_last_entry_t, tier); - if (likely(rc)) { - rrdeng_convert_legacy_uuid_to_multihost(machine_guid, &legacy_uuid, &multihost_legacy_uuid); - rc = rrdeng_metric_latest_time_by_uuid(&multihost_legacy_uuid, &dim_first_entry_t, &dim_last_entry_t, tier); - if (likely(!rc)) - uuid_copy(*rrdeng_uuid, multihost_legacy_uuid); - } - else - uuid_copy(*rrdeng_uuid, legacy_uuid); - } - else - uuid_copy(*rrdeng_uuid, *uuid); - - if (likely(!rc)) { - *first_entry_t = MIN(*first_entry_t, dim_first_entry_t); - *last_entry_t = MAX(*last_entry_t, dim_last_entry_t); - } - return rc; -#else - UNUSED(machine_guid); - UNUSED(chart_id); - UNUSED(dim_id); - UNUSED(uuid); - UNUSED(first_entry_t); - UNUSED(last_entry_t); - UNUSED(rrdeng_uuid); - return 1; -#endif -} -#include "../storage_engine.h" -#ifdef ENABLE_DBENGINE -static RRDDIM *create_rrdim_entry(ONEWAYALLOC *owa, RRDSET *st, char *id, char *name, uuid_t *metric_uuid) -{ - STORAGE_ENGINE *eng = storage_engine_get(RRD_MEMORY_MODE_DBENGINE); - - if (unlikely(!eng)) - return NULL; - - RRDDIM *rd = onewayalloc_callocz(owa, 1, sizeof(*rd)); - rd->rrdset = st; - rd->update_every = st->update_every; - rd->last_stored_value = NAN; - rrddim_flag_set(rd, RRDDIM_FLAG_NONE); - - uuid_copy(rd->metric_uuid, *metric_uuid); - rd->id = onewayalloc_strdupz(owa, id); - rd->name = onewayalloc_strdupz(owa, name); - - for(int tier = 0; tier < storage_tiers ;tier++) { - rd->tiers[tier] = onewayalloc_callocz(owa, 1, sizeof(*rd->tiers[tier])); - rd->rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; - rd->tiers[tier]->tier_grouping = get_tier_grouping(tier); - rd->tiers[tier]->mode = RRD_MEMORY_MODE_DBENGINE; - rd->tiers[tier]->query_ops.init = rrdeng_load_metric_init; - rd->tiers[tier]->query_ops.next_metric = rrdeng_load_metric_next; - rd->tiers[tier]->query_ops.is_finished = rrdeng_load_metric_is_finished; - rd->tiers[tier]->query_ops.finalize = rrdeng_load_metric_finalize; - rd->tiers[tier]->query_ops.latest_time = rrdeng_metric_latest_time; - rd->tiers[tier]->query_ops.oldest_time = rrdeng_metric_oldest_time; - rd->tiers[tier]->db_metric_handle = eng->api.init(rd, st->rrdhost->storage_instance[tier]); - } - - return rd; -} -#endif - -#define SELECT_CHART_CONTEXT "select d.dim_id, d.id, d.name, c.id, c.type, c.name, c.update_every, c.chart_id, " \ - "c.context, CASE WHEN d.options = 'hidden' THEN 1 else 0 END from chart c, " \ - "dimension d, host h " \ - "where d.chart_id = c.chart_id and c.host_id = h.host_id and c.host_id = @host_id and c.context = @context " \ - "order by c.chart_id asc, c.type||c.id desc;" - -#define SELECT_CHART_SINGLE "select d.dim_id, d.id, d.name, c.id, c.type, c.name, c.update_every, c.chart_id, " \ - "c.context, CASE WHEN d.options = 'hidden' THEN 1 else 0 END from chart c, " \ - "dimension d, host h " \ - "where d.chart_id = c.chart_id and c.host_id = h.host_id and c.host_id = @host_id and c.type||'.'||c.id = @chart " \ - "order by c.chart_id asc, c.type||'.'||c.id desc;" - -void sql_build_context_param_list(ONEWAYALLOC *owa, struct context_param **param_list, RRDHOST *host, char *context, char *chart) -{ -#ifdef ENABLE_DBENGINE - int rc; - - if (unlikely(!param_list) || host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return; - - if (unlikely(!(*param_list))) { - *param_list = onewayalloc_mallocz(owa, sizeof(struct context_param)); - (*param_list)->first_entry_t = LONG_MAX; - (*param_list)->last_entry_t = 0; - (*param_list)->rd = NULL; - (*param_list)->flags = CONTEXT_FLAGS_ARCHIVE; - if (chart) - (*param_list)->flags |= CONTEXT_FLAGS_CHART; - else - (*param_list)->flags |= CONTEXT_FLAGS_CONTEXT; - } - - sqlite3_stmt *res = NULL; - - if (context) - rc = sqlite3_prepare_v2(db_meta, SELECT_CHART_CONTEXT, -1, &res, 0); - else - rc = sqlite3_prepare_v2(db_meta, SELECT_CHART_SINGLE, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to fetch host archived charts"); - return; - } - - rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host parameter to fetch archived charts"); - goto failed; - } - - if (context) - rc = sqlite3_bind_text(res, 2, context, -1, SQLITE_STATIC); - else - rc = sqlite3_bind_text(res, 2, chart, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host parameter to fetch archived charts"); - goto failed; - } - - RRDSET *st = NULL; - char machine_guid[GUID_LEN + 1]; - uuid_unparse_lower(host->host_uuid, machine_guid); - uuid_t rrdeng_uuid; - uuid_t chart_id; - - while (sqlite3_step(res) == SQLITE_ROW) { - char id[512]; - sprintf(id, "%s.%s", sqlite3_column_text(res, 3), sqlite3_column_text(res, 1)); - - if (!st || uuid_compare(*(uuid_t *)sqlite3_column_blob(res, 7), chart_id)) { - if (unlikely(st && !st->counter)) { - onewayalloc_freez(owa, st->context); - onewayalloc_freez(owa, (char *) st->name); - onewayalloc_freez(owa, st); - } - st = onewayalloc_callocz(owa, 1, sizeof(*st)); - char n[RRD_ID_LENGTH_MAX + 1]; - - snprintfz( - n, RRD_ID_LENGTH_MAX, "%s.%s", (char *)sqlite3_column_text(res, 4), - (char *)sqlite3_column_text(res, 3)); - st->name = onewayalloc_strdupz(owa, n); - st->update_every = sqlite3_column_int(res, 6); - st->counter = 0; - if (chart) { - st->context = onewayalloc_strdupz(owa, (char *)sqlite3_column_text(res, 8)); - strncpyz(st->id, chart, RRD_ID_LENGTH_MAX); - } - uuid_copy(chart_id, *(uuid_t *)sqlite3_column_blob(res, 7)); - st->last_entry_t = 0; - st->rrdhost = host; - } - - if (unlikely(find_dimension_first_last_t(machine_guid, (char *)st->name, (char *)sqlite3_column_text(res, 1), - (uuid_t *)sqlite3_column_blob(res, 0), &(*param_list)->first_entry_t, &(*param_list)->last_entry_t, - &rrdeng_uuid, 0))) - continue; - - st->counter++; - st->last_entry_t = MAX(st->last_entry_t, (*param_list)->last_entry_t); - - RRDDIM *rd = create_rrdim_entry(owa, st, (char *)sqlite3_column_text(res, 1), (char *)sqlite3_column_text(res, 2), &rrdeng_uuid); - if (unlikely(!rd)) - continue; - if (sqlite3_column_int(res, 9) == 1) - rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN); - rd->next = (*param_list)->rd; - (*param_list)->rd = rd; - } - if (st) { - if (!st->counter) { - onewayalloc_freez(owa,st->context); - onewayalloc_freez(owa,(char *)st->name); - onewayalloc_freez(owa,st); - } - else - if (!st->context && context) - st->context = onewayalloc_strdupz(owa,context); - } - -failed: - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when reading archived charts"); -#else - UNUSED(param_list); - UNUSED(host); - UNUSED(context); - UNUSED(chart); -#endif - return; -} - - -/* - * Store a chart hash in the database - */ - -#define SQL_STORE_CHART_HASH "insert into v_chart_hash (hash_id, type, id, " \ - "name, family, context, title, unit, plugin, module, priority, chart_type, last_used, chart_id) " \ - "values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11, ?12, unixepoch(), ?13);" - -int sql_store_chart_hash( - uuid_t *hash_id, uuid_t *chart_id, const char *type, const char *id, const char *name, const char *family, - const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority, - RRDSET_TYPE chart_type) -{ - static __thread sqlite3_stmt *res = NULL; - int rc, param = 0; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - - if (unlikely(!res)) { - rc = prepare_statement(db_meta, SQL_STORE_CHART_HASH, &res); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to store chart, rc = %d", rc); - return 1; - } - } - - param++; - rc = sqlite3_bind_blob(res, 1, hash_id, sizeof(*hash_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 2, type, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 3, id, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - if (name && *name) - rc = sqlite3_bind_text(res, 4, name, -1, SQLITE_STATIC); - else - rc = sqlite3_bind_null(res, 4); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 5, family, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 6, context, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 7, title, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 8, units, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 9, plugin, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_text(res, 10, module, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_int(res, 11, (int) priority); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_int(res, 12, chart_type); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - param++; - rc = sqlite3_bind_blob(res, 13, chart_id, sizeof(*chart_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store chart hash_id, rc = %d", rc); - - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement in chart hash_id store function, rc = %d", rc); - - return 0; - - bind_fail: - error_report("Failed to bind parameter %d to store chart hash_id, rc = %d", param, rc); - rc = sqlite3_reset(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement in chart hash_id store function, rc = %d", rc); - return 1; } -/* - chart hashes are used for cloud communication. - if cloud is disabled or openssl is not available (which will prevent cloud connectivity) - skip hash calculations -*/ -void compute_chart_hash(RRDSET *st) -{ -#if !defined DISABLE_CLOUD && defined ENABLE_HTTPS - EVP_MD_CTX *evpctx; - unsigned char hash_value[EVP_MAX_MD_SIZE]; - unsigned int hash_len; - char priority_str[32]; - - if (rrdhost_flag_check(st->rrdhost, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS)) { - internal_error(true, "Skipping compute_chart_hash for host %s because context streaming is enabled", st->rrdhost->hostname); - return; - } - sprintf(priority_str, "%ld", st->priority); - - evpctx = EVP_MD_CTX_create(); - EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL); - //EVP_DigestUpdate(evpctx, st->type, strlen(st->type)); - EVP_DigestUpdate(evpctx, st->id, strlen(st->id)); - EVP_DigestUpdate(evpctx, st->name, strlen(st->name)); - EVP_DigestUpdate(evpctx, st->family, strlen(st->family)); - EVP_DigestUpdate(evpctx, st->context, strlen(st->context)); - EVP_DigestUpdate(evpctx, st->title, strlen(st->title)); - EVP_DigestUpdate(evpctx, st->units, strlen(st->units)); - EVP_DigestUpdate(evpctx, st->plugin_name, strlen(st->plugin_name)); - if (st->module_name) - EVP_DigestUpdate(evpctx, st->module_name, strlen(st->module_name)); -// EVP_DigestUpdate(evpctx, priority_str, strlen(priority_str)); - EVP_DigestUpdate(evpctx, &st->priority, sizeof(st->priority)); - EVP_DigestUpdate(evpctx, &st->chart_type, sizeof(st->chart_type)); - EVP_DigestFinal_ex(evpctx, hash_value, &hash_len); - EVP_MD_CTX_destroy(evpctx); - fatal_assert(hash_len > sizeof(uuid_t)); - - char uuid_str[GUID_LEN + 1]; - uuid_unparse_lower(*((uuid_t *) &hash_value), uuid_str); - //info("Calculating HASH %s for chart %s", uuid_str, st->name); - uuid_copy(st->state->hash_id, *((uuid_t *) &hash_value)); - - (void)sql_store_chart_hash( - (uuid_t *)&hash_value, - st->chart_uuid, - st->type, - st->id, - st->name, - st->family, - st->context, - st->title, - st->units, - st->plugin_name, - st->module_name, - st->priority, - st->chart_type); -#else - UNUSED(st); -#endif - return; -} #define SQL_STORE_CLAIM_ID "insert into node_instance " \ "(host_id, claim_id, date_created) values (@host_id, @claim_id, unixepoch()) " \ @@ -2156,7 +863,6 @@ static inline void set_host_node_id(RRDHOST *host, uuid_t *node_id) sql_create_aclk_table(host, &host->host_uuid, node_id); else uuid_unparse_lower(*node_id, wc->node_id); - return; } #define SQL_UPDATE_NODE_ID "update node_instance set node_id = @node_id where host_id = @host_id;" @@ -2199,7 +905,7 @@ int update_node_id(uuid_t *host_id, uuid_t *node_id) char host_guid[GUID_LEN + 1]; uuid_unparse_lower(*host_id, host_guid); rrd_wrlock(); - host = rrdhost_find_by_guid(host_guid, 0); + host = rrdhost_find_by_guid(host_guid); if (likely(host)) set_host_node_id(host, node_id); rrd_unlock(); @@ -2242,7 +948,7 @@ char *get_hostname_by_node_id(char *node) goto failed; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) hostname = strdupz((char *)sqlite3_column_text(res, 0)); @@ -2280,7 +986,7 @@ int get_host_id(uuid_t *node_id, uuid_t *host_id) goto failed; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW && host_id)) uuid_copy(*host_id, *((uuid_t *) sqlite3_column_blob(res, 0))); @@ -2316,7 +1022,7 @@ int get_node_id(uuid_t *host_id, uuid_t *node_id) goto failed; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW && node_id)) uuid_copy(*node_id, *((uuid_t *) sqlite3_column_blob(res, 0))); @@ -2375,9 +1081,9 @@ failed: #define SQL_GET_NODE_INSTANCE_LIST "select ni.node_id, ni.host_id, h.hostname " \ "from node_instance ni, host h where ni.host_id = h.host_id;" -struct node_instance_list *get_node_list(void) +struct node_instance_list *get_node_list(void) { - struct node_instance_list *node_list = NULL; + struct node_instance_list *node_list = NULL; sqlite3_stmt *res = NULL; int rc; @@ -2395,7 +1101,7 @@ struct node_instance_list *get_node_list(void) int row = 0; char host_guid[37]; - while (sqlite3_step(res) == SQLITE_ROW) + while (sqlite3_step_monitored(res) == SQLITE_ROW) row++; if (sqlite3_reset(res) != SQLITE_OK) { @@ -2405,8 +1111,9 @@ struct node_instance_list *get_node_list(void) node_list = callocz(row + 1, sizeof(*node_list)); int max_rows = row; row = 0; + // TODO: Check to remove lock rrd_rdlock(); - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { if (sqlite3_column_bytes(res, 0) == sizeof(uuid_t)) uuid_copy(node_list[row].node_id, *((uuid_t *)sqlite3_column_blob(res, 0))); if (sqlite3_column_bytes(res, 1) == sizeof(uuid_t)) { @@ -2414,7 +1121,7 @@ struct node_instance_list *get_node_list(void) uuid_copy(node_list[row].host_id, *host_id); node_list[row].queryable = 1; uuid_unparse_lower(*host_id, host_guid); - RRDHOST *host = rrdhost_find_by_guid(host_guid, 0); + RRDHOST *host = rrdhost_find_by_guid(host_guid); node_list[row].live = host && (host == localhost || host->receiver) ? 1 : 0; node_list[row].hops = (host && host->system_info) ? host->system_info->hops : uuid_compare(*host_id, localhost->host_uuid) ? 1 : 0; @@ -2461,7 +1168,7 @@ void sql_load_node_id(RRDHOST *host) goto failed; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { if (likely(sqlite3_column_bytes(res, 0) == sizeof(uuid_t))) set_host_node_id(host, (uuid_t *)sqlite3_column_blob(res, 0)); @@ -2472,8 +1179,6 @@ void sql_load_node_id(RRDHOST *host) failed: if (unlikely(sqlite3_reset(res) != SQLITE_OK)) error_report("Failed to reset the prepared statement when loading node instance information"); - - return; }; @@ -2494,213 +1199,179 @@ void sql_build_host_system_info(uuid_t *host_id, struct rrdhost_system_info *sys rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host parameter host information"); - goto skip_loading; + goto skip; } - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { rrdhost_set_system_info_variable(system_info, (char *) sqlite3_column_text(res, 0), (char *) sqlite3_column_text(res, 1)); } -skip_loading: +skip: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) error_report("Failed to finalize the prepared statement when reading host information"); - return; } +#define SELECT_HOST_LABELS "SELECT label_key, label_value, source_type FROM host_label WHERE host_id = @host_id " \ + "AND label_key IS NOT NULL AND label_value IS NOT NULL;" -#define SQL_INS_HOST_SYSTEM_INFO "INSERT OR REPLACE INTO host_info " \ - "(host_id, system_key, system_value, date_created) " \ - "VALUES (@host, @key, @value, unixepoch());" - -void sql_store_host_system_info_key_value(uuid_t *host_id, const char *name, const char *value) +DICTIONARY *sql_load_host_labels(uuid_t *host_id) { - sqlite3_stmt *res = NULL; int rc; - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("Database has not been initialized"); - return; - } + DICTIONARY *labels = NULL; + sqlite3_stmt *res = NULL; - rc = sqlite3_prepare_v2(db_meta, SQL_INS_HOST_SYSTEM_INFO, -1, &res, 0); + rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_LABELS, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to store system info"); - return; + error_report("Failed to prepare statement to read host information"); + return NULL; } rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host parameter to store system information"); - goto skip_store; + error_report("Failed to bind host parameter host information"); + goto skip; } - rc = sqlite3_bind_text(res, 2, name, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind label parameter to store name information"); - goto skip_store; - } + labels = rrdlabels_create(); - rc = sqlite3_bind_text(res, 3, value, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind value parameter to store value information"); - goto skip_store; + while (sqlite3_step_monitored(res) == SQLITE_ROW) { + rrdlabels_add( + labels, + (const char *)sqlite3_column_text(res, 0), + (const char *)sqlite3_column_text(res, 1), + sqlite3_column_int(res, 2)); } - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to store host system info, rc = %d", rc); - -skip_store: +skip: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when storing host system information"); - - return; + error_report("Failed to finalize the prepared statement when reading host information"); + return labels; } - -void sql_store_host_system_info(uuid_t *host_id, const struct rrdhost_system_info *system_info) +// Utils +int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_be_null) { - if (unlikely(!system_info)) - return; - - if (system_info->container_os_name) - sql_store_host_system_info_key_value(host_id, "NETDATA_CONTAINER_OS_NAME", system_info->container_os_name); - - if (system_info->container_os_id) - sql_store_host_system_info_key_value(host_id, "NETDATA_CONTAINER_OS_ID", system_info->container_os_id); - - if (system_info->container_os_id_like) - sql_store_host_system_info_key_value(host_id, "NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like); - - if (system_info->container_os_version) - sql_store_host_system_info_key_value(host_id, "NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version); - - if (system_info->container_os_version_id) - sql_store_host_system_info_key_value(host_id, "NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id); - - if (system_info->host_os_detection) - sql_store_host_system_info_key_value(host_id, "NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection); - - if (system_info->host_os_name) - sql_store_host_system_info_key_value(host_id, "NETDATA_HOST_OS_NAME", system_info->host_os_name); - - if (system_info->host_os_id) - sql_store_host_system_info_key_value(host_id, "NETDATA_HOST_OS_ID", system_info->host_os_id); - - if (system_info->host_os_id_like) - sql_store_host_system_info_key_value(host_id, "NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like); - - if (system_info->host_os_version) - sql_store_host_system_info_key_value(host_id, "NETDATA_HOST_OS_VERSION", system_info->host_os_version); + if (likely(text)) + return sqlite3_bind_text(res, position, text, -1, SQLITE_STATIC); + if (!can_be_null) + return 1; + return sqlite3_bind_null(res, position); +} - if (system_info->host_os_version_id) - sql_store_host_system_info_key_value(host_id, "NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id); +int sql_metadata_cache_stats(int op) +{ + int count, dummy; + sqlite3_db_status(db_meta, op, &count, &dummy, 0); + return count; +} - if (system_info->host_os_detection) - sql_store_host_system_info_key_value(host_id, "NETDATA_HOST_OS_DETECTION", system_info->host_os_detection); +#define SQL_FIND_CHART_UUID \ + "SELECT chart_id FROM chart WHERE host_id = @host AND type=@type AND id=@id AND (name IS NULL OR name=@name) AND chart_id IS NOT NULL;" - if (system_info->kernel_name) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name); +#define SQL_FIND_DIMENSION_UUID \ + "SELECT dim_id FROM dimension WHERE chart_id=@chart AND id=@id AND name=@name AND LENGTH(dim_id)=16;" - if (system_info->host_cores) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores); - if (system_info->host_cpu_freq) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq); +//Do a database lookup to find the UUID of a chart +//If found store it in store_uuid and return 0 +int sql_find_chart_uuid(RRDHOST *host, RRDSET *st, uuid_t *store_uuid) +{ + static __thread sqlite3_stmt *res = NULL; + int rc; - if (system_info->host_ram_total) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total); + const char *name = string2str(st->parts.name); - if (system_info->host_disk_space) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space); + if (unlikely(!db_meta) && default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) + return 1; - if (system_info->kernel_version) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version); + if (unlikely(!res)) { + rc = prepare_statement(db_meta, SQL_FIND_CHART_UUID, &res); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to lookup chart UUID in the database"); + return 1; + } + } - if (system_info->architecture) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture); + rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; - if (system_info->virtualization) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization); + rc = sqlite3_bind_text(res, 2, string2str(st->parts.type), -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; - if (system_info->virt_detection) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection); + rc = sqlite3_bind_text(res, 3, string2str(st->parts.id), -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; - if (system_info->container) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_CONTAINER", system_info->container); + rc = sqlite3_bind_text(res, 4, name && *name ? name : string2str(st->parts.id), -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; - if (system_info->container_detection) - sql_store_host_system_info_key_value(host_id, "NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection); + int status = 1; + rc = sqlite3_step_monitored(res); + if (likely(rc == SQLITE_ROW)) { + uuid_copy(*store_uuid, sqlite3_column_blob(res, 0)); + status = 0; + } - if (system_info->is_k8s_node) - sql_store_host_system_info_key_value(host_id, "NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when searching for a chart UUID, rc = %d", rc); - return; -} + return status; -static int save_host_label_callback(const char *name, const char *value, RRDLABEL_SRC label_source, void *data) -{ - RRDHOST *host = (RRDHOST *)data; - sql_store_host_label(&host->host_uuid, (int)label_source & ~(RRDLABEL_FLAG_INTERNAL), name, value); - return 0; +bind_fail: + error_report("Failed to bind input parameter to perform chart UUID database lookup, rc = %d", rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when searching for a chart UUID, rc = %d", rc); + return 1; } -#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;" -void sql_store_host_labels(RRDHOST *host) -{ - int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid); - if (rc != SQLITE_OK) - error_report("Failed to remove old host labels for host %s", host->hostname); - - rrdlabels_walkthrough_read(host->host_labels, save_host_label_callback, host); -} - -#define SELECT_HOST_LABELS "SELECT label_key, label_value, source_type FROM host_label WHERE host_id = @host_id " \ - "AND label_key IS NOT NULL AND label_value IS NOT NULL;" - -DICTIONARY *sql_load_host_labels(uuid_t *host_id) +int sql_find_dimension_uuid(RRDSET *st, RRDDIM *rd, uuid_t *store_uuid) { + static __thread sqlite3_stmt *res = NULL; int rc; + int status = 1; - DICTIONARY *labels = NULL; - sqlite3_stmt *res = NULL; + if (unlikely(!db_meta) && default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) + return 1; - rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_LABELS, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to read host information"); - return NULL; + if (unlikely(!res)) { + rc = prepare_statement(db_meta, SQL_FIND_DIMENSION_UUID, &res); + if (rc != SQLITE_OK) { + error_report("Failed to bind prepare statement to lookup dimension UUID in the database"); + return 1; + } } - rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to bind host parameter host information"); - goto skip_loading; - } + rc = sqlite3_bind_blob(res, 1, st->chart_uuid, sizeof(*st->chart_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; - labels = rrdlabels_create(); + rc = sqlite3_bind_text(res, 2, rrddim_id(rd), -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; - while (sqlite3_step(res) == SQLITE_ROW) { - rrdlabels_add( - labels, - (const char *)sqlite3_column_text(res, 0), - (const char *)sqlite3_column_text(res, 1), - sqlite3_column_int(res, 2)); + rc = sqlite3_bind_text(res, 3, rrddim_name(rd), -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_step_monitored(res); + if (likely(rc == SQLITE_ROW)) { + uuid_copy(*store_uuid, *((uuid_t *) sqlite3_column_blob(res, 0))); + status = 0; } -skip_loading: - if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("Failed to finalize the prepared statement when reading host information"); - return labels; -} + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement find dimension uuid, rc = %d", rc); + return status; -// Utils -int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_be_null) -{ - if (likely(text)) - return sqlite3_bind_text(res, position, text, -1, SQLITE_STATIC); - if (!can_be_null) - return 1; - return sqlite3_bind_null(res, position); +bind_fail: + error_report("Failed to bind input parameter to perform dimension UUID database lookup, rc = %d", rc); + return 1; } diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h index e6808aa8..5731d5c9 100644 --- a/database/sqlite/sqlite_functions.h +++ b/database/sqlite/sqlite_functions.h @@ -25,29 +25,7 @@ typedef enum db_check_action_type { } db_check_action_type_t; #define SQL_MAX_RETRY (100) -#define SQLITE_INSERT_DELAY (50) // Insert delay in case of lock - -#define SQL_STORE_HOST "insert or replace into host (host_id,hostname,registry_hostname,update_every,os,timezone,tags, hops) " \ - "values (?1,?2,?3,?4,?5,?6,?7,?8);" - -#define SQL_STORE_CHART "insert or replace into chart (chart_id, host_id, type, id, " \ - "name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , " \ - "history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);" - -#define SQL_FIND_CHART_UUID \ - "select chart_id from chart where host_id = @host and type=@type and id=@id and (name is null or name=@name);" - -#define SQL_STORE_ACTIVE_CHART \ - "insert or replace into chart_active (chart_id, date_created) values (@id, unixepoch());" - -#define SQL_STORE_DIMENSION \ - "INSERT OR REPLACE into dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm) values (?0001,?0002,?0003,?0004,?0005,?0006,?0007);" - -#define SQL_FIND_DIMENSION_UUID \ - "select dim_id from dimension where chart_id=@chart and id=@id and name=@name and length(dim_id)=16;" - -#define SQL_STORE_ACTIVE_DIMENSION \ - "insert or replace into dimension_active (dim_id, date_created) values (@id, unixepoch());" +#define SQLITE_INSERT_DELAY (10) // Insert delay in case of lock #define CHECK_SQLITE_CONNECTION(db_meta) \ if (unlikely(!db_meta)) { \ @@ -58,59 +36,51 @@ typedef enum db_check_action_type { return 1; \ } -extern int sql_init_database(db_check_action_type_t rebuild, int memory); -extern void sql_close_database(void); -extern int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_be_null); -extern int sql_store_host(uuid_t *guid, const char *hostname, const char *registry_hostname, int update_every, const char *os, - const char *timezone, const char *tags, int hops); - -extern int sql_store_host_info(RRDHOST *host); - -extern int sql_store_chart( - uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family, - const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority, - int update_every, int chart_type, int memory_mode, long history_entries); -extern int sql_store_dimension(uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier, - collected_number divisor, int algorithm); - -extern int find_dimension_uuid(RRDSET *st, RRDDIM *rd, uuid_t *store_uuid); -extern void store_active_dimension(uuid_t *dimension_uuid); - -extern uuid_t *find_chart_uuid(RRDHOST *host, const char *type, const char *id, const char *name); -extern uuid_t *create_chart_uuid(RRDSET *st, const char *id, const char *name); -extern int update_chart_metadata(uuid_t *chart_uuid, RRDSET *st, const char *id, const char *name); -extern void store_active_chart(uuid_t *dimension_uuid); - -extern int find_uuid_type(uuid_t *uuid); - -extern void sql_rrdset2json(RRDHOST *host, BUFFER *wb); - -extern RRDHOST *sql_create_host_by_uuid(char *guid); -extern int prepare_statement(sqlite3 *database, char *query, sqlite3_stmt **statement); -extern int execute_insert(sqlite3_stmt *res); -extern void db_execute(const char *cmd); -extern int file_is_migrated(char *path); -extern void add_migrated_file(char *path, uint64_t file_size); -extern void db_unlock(void); -extern void db_lock(void); -extern void delete_dimension_uuid(uuid_t *dimension_uuid); -extern void sql_store_chart_label(uuid_t *chart_uuid, int source_type, char *label, char *value); -extern void sql_build_context_param_list(ONEWAYALLOC *owa, struct context_param **param_list, RRDHOST *host, char *context, char *chart); -extern void store_claim_id(uuid_t *host_id, uuid_t *claim_id); -extern int update_node_id(uuid_t *host_id, uuid_t *node_id); -extern int get_node_id(uuid_t *host_id, uuid_t *node_id); -extern int get_host_id(uuid_t *node_id, uuid_t *host_id); -extern void invalidate_node_instances(uuid_t *host_id, uuid_t *claim_id); -extern struct node_instance_list *get_node_list(void); -extern void sql_load_node_id(RRDHOST *host); -extern void compute_chart_hash(RRDSET *st); -extern int sql_set_dimension_option(uuid_t *dim_uuid, char *option); -char *get_hostname_by_node_id(char *node_id); -void free_temporary_host(RRDHOST *host); +SQLITE_API int sqlite3_step_monitored(sqlite3_stmt *stmt); +SQLITE_API int sqlite3_exec_monitored( + sqlite3 *db, /* An open database */ + const char *sql, /* SQL to be evaluated */ + int (*callback)(void*,int,char**,char**), /* Callback function */ + void *data, /* 1st argument to callback */ + char **errmsg /* Error msg written here */ + ); + +// Initialization and shutdown int init_database_batch(sqlite3 *database, int rebuild, int init_type, const char *batch[]); -void migrate_localhost(uuid_t *host_uuid); -extern void sql_store_host_system_info(uuid_t *host_id, const struct rrdhost_system_info *system_info); -extern void sql_build_host_system_info(uuid_t *host_id, struct rrdhost_system_info *system_info); -void sql_store_host_labels(RRDHOST *host); +int sql_init_database(db_check_action_type_t rebuild, int memory); +void sql_close_database(void); + +// Helpers +int bind_text_null(sqlite3_stmt *res, int position, const char *text, bool can_be_null); +int prepare_statement(sqlite3 *database, const char *query, sqlite3_stmt **statement); +int execute_insert(sqlite3_stmt *res); +int file_is_migrated(char *path); +int exec_statement_with_uuid(const char *sql, uuid_t *uuid); +void add_migrated_file(char *path, uint64_t file_size); +void db_execute(const char *cmd); + +// Look up functions +int get_node_id(uuid_t *host_id, uuid_t *node_id); +int get_host_id(uuid_t *node_id, uuid_t *host_id); +struct node_instance_list *get_node_list(void); +void sql_load_node_id(RRDHOST *host); +char *get_hostname_by_node_id(char *node_id); +int sql_find_chart_uuid(RRDHOST *host, RRDSET *st, uuid_t *store_uuid); +int sql_find_dimension_uuid(RRDSET *st, RRDDIM *rd, uuid_t *store_uuid); + +// Help build archived hosts in memory when agent starts +void sql_build_host_system_info(uuid_t *host_id, struct rrdhost_system_info *system_info); DICTIONARY *sql_load_host_labels(uuid_t *host_id); + +// For queries: To be removed when context queries are implemented +void sql_rrdset2json(RRDHOST *host, BUFFER *wb); + +// TODO: move to metadata +int update_node_id(uuid_t *host_id, uuid_t *node_id); + +void invalidate_node_instances(uuid_t *host_id, uuid_t *claim_id); + +// Provide statistics +int sql_metadata_cache_stats(int op); + #endif //NETDATA_SQLITE_FUNCTIONS_H diff --git a/database/sqlite/sqlite_health.c b/database/sqlite/sqlite_health.c index 8e59cad1..c189305b 100644 --- a/database/sqlite/sqlite_health.c +++ b/database/sqlite/sqlite_health.c @@ -4,6 +4,7 @@ #include "sqlite_functions.h" #define MAX_HEALTH_SQL_SIZE 2048 +#define sqlite3_bind_string_or_null(res,key,param) ((key) ? sqlite3_bind_text(res, param, string2str(key), -1, SQLITE_STATIC) : sqlite3_bind_null(res, param)) /* Health related SQL queries Creates a health log table in sqlite, one per host guid @@ -15,7 +16,7 @@ int sql_create_health_log_table(RRDHOST *host) { if (unlikely(!db_meta)) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("HEALTH [%s]: Database has not been initialized", host->hostname); + error_report("HEALTH [%s]: Database has not been initialized", rrdhost_hostname(host)); return 1; } @@ -24,9 +25,9 @@ int sql_create_health_log_table(RRDHOST *host) { snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_CREATE_HEALTH_LOG_TABLE(uuid_str)); - rc = sqlite3_exec(db_meta, command, 0, 0, &err_msg); + rc = sqlite3_exec_monitored(db_meta, command, 0, 0, &err_msg); if (rc != SQLITE_OK) { - error_report("HEALTH [%s]: SQLite error during creation of health log table, rc = %d (%s)", host->hostname, rc, err_msg); + error_report("HEALTH [%s]: SQLite error during creation of health log table, rc = %d (%s)", rrdhost_hostname(host), rc, err_msg); sqlite3_free(err_msg); return 1; } @@ -49,7 +50,7 @@ void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae) { if (unlikely(!db_meta)) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("HEALTH [%s]: Database has not been initialized", host->hostname); + error_report("HEALTH [%s]: Database has not been initialized", rrdhost_hostname(host)); return; } @@ -60,7 +61,7 @@ void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae) { rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("HEALTH [%s]: Failed to prepare statement for SQL_UPDATE_HEALTH_LOG", host->hostname); + error_report("HEALTH [%s]: Failed to prepare statement for SQL_UPDATE_HEALTH_LOG", rrdhost_hostname(host)); return; } @@ -96,12 +97,12 @@ void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae) { rc = execute_insert(res); if (unlikely(rc != SQLITE_DONE)) { - error_report("HEALTH [%s]: Failed to update health log, rc = %d", host->hostname, rc); + error_report("HEALTH [%s]: Failed to update health log, rc = %d", rrdhost_hostname(host), rc); } failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("HEALTH [%s]: Failed to finalize the prepared statement for updating health log.", host->hostname); + error_report("HEALTH [%s]: Failed to finalize the prepared statement for updating health log.", rrdhost_hostname(host)); return; } @@ -122,7 +123,7 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { if (unlikely(!db_meta)) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("HEALTH [%s]: Database has not been initialized", host->hostname); + error_report("HEALTH [%s]: Database has not been initialized", rrdhost_hostname(host)); return; } @@ -133,11 +134,11 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { - error_report("HEALTH [%s]: Failed to prepare statement for SQL_INSERT_HEALTH_LOG", host->hostname); + error_report("HEALTH [%s]: Failed to prepare statement for SQL_INSERT_HEALTH_LOG", rrdhost_hostname(host)); return; } - rc = sqlite3_bind_text(res, 1, host->hostname, -1, SQLITE_STATIC); + rc = sqlite3_bind_text(res, 1, rrdhost_hostname(host), -1, SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind hostname parameter for SQL_INSERT_HEALTH_LOG"); goto failed; @@ -215,49 +216,49 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { goto failed; } - rc = sqlite3_bind_text(res, 14, ae->name, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->name, 14); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind name parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 15, ae->chart, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->chart, 15); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind chart parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 16, ae->family, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->family, 16); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind family parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 17, ae->exec, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->exec, 17); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind exec parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 18, ae->recipient, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->recipient, 18); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind recipient parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 19, ae->source, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->source, 19); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind source parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 20, ae->units, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->units, 20); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id parameter to store node instance information"); goto failed; } - rc = sqlite3_bind_text(res, 21, ae->info, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->info, 21); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind info parameter for SQL_INSERT_HEALTH_LOG"); goto failed; @@ -305,25 +306,25 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { goto failed; } - rc = sqlite3_bind_text(res, 29, ae->classification, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->classification, 29); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind classification parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 30, ae->component, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->component, 30); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind component parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 31, ae->type, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->type, 31); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind type parameter for SQL_INSERT_HEALTH_LOG"); goto failed; } - rc = sqlite3_bind_text(res, 32, ae->chart_context, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, ae->chart_context, 32); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind chart_context parameter for SQL_INSERT_HEALTH_LOG"); goto failed; @@ -331,7 +332,7 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { rc = execute_insert(res); if (unlikely(rc != SQLITE_DONE)) { - error_report("HEALTH [%s]: Failed to execute SQL_INSERT_HEALTH_LOG, rc = %d", host->hostname, rc); + error_report("HEALTH [%s]: Failed to execute SQL_INSERT_HEALTH_LOG, rc = %d", rrdhost_hostname(host), rc); goto failed; } @@ -340,7 +341,7 @@ void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae) { failed: if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) - error_report("HEALTH [%s]: Failed to finalize the prepared statement for inserting to health log.", host->hostname); + error_report("HEALTH [%s]: Failed to finalize the prepared statement for inserting to health log.", rrdhost_hostname(host)); return; } @@ -381,7 +382,7 @@ void sql_health_alarm_log_cleanup(RRDHOST *host) { char uuid_str[GUID_LEN + 1]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_CLEANUP_HEALTH_LOG(uuid_str, uuid_str, host->health_log_entries_written - rotate_every)); + snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_CLEANUP_HEALTH_LOG(uuid_str, uuid_str, (unsigned long int) (host->health_log_entries_written - rotate_every))); rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); if (unlikely(rc != SQLITE_OK)) { @@ -389,7 +390,7 @@ void sql_health_alarm_log_cleanup(RRDHOST *host) { return; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (unlikely(rc != SQLITE_DONE)) error_report("Failed to cleanup health log table, rc = %d", rc); @@ -428,7 +429,7 @@ void sql_health_alarm_log_count(RRDHOST *host) { return; } - rc = sqlite3_step(res); + rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) host->health_log_entries_written = (size_t) sqlite3_column_int64(res, 0); @@ -436,7 +437,7 @@ void sql_health_alarm_log_count(RRDHOST *host) { if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize the prepared statement to count health log entries from db"); - info("HEALTH [%s]: Table health_log_%s, contains %lu entries.", host->hostname, uuid_str, host->health_log_entries_written); + info("HEALTH [%s]: Table health_log_%s, contains %lu entries.", rrdhost_hostname(host), uuid_str, (unsigned long int) host->health_log_entries_written); } #define SQL_INJECT_REMOVED(guid, guid2) "insert into health_log_%s (hostname, unique_id, alarm_id, alarm_event_id, config_hash_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, " \ @@ -556,7 +557,7 @@ uint32_t sql_get_max_unique_id (char *uuid_str) return 0; } - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { max_unique_id = (uint32_t) sqlite3_column_int64(res, 0); } @@ -584,7 +585,7 @@ void sql_check_removed_alerts_state(char *uuid_str) return; } - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { status = (RRDCALC_STATUS) sqlite3_column_int(res, 0); unique_id = (uint32_t) sqlite3_column_int64(res, 1); alarm_id = (uint32_t) sqlite3_column_int64(res, 2); @@ -607,7 +608,7 @@ void sql_check_removed_alerts_state(char *uuid_str) #define SQL_LOAD_HEALTH_LOG(guid,limit) "SELECT hostname, unique_id, alarm_id, alarm_event_id, config_hash_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, name, chart, family, exec, recipient, source, units, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, class, component, type, chart_context FROM (SELECT hostname, unique_id, alarm_id, alarm_event_id, config_hash_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, delay_up_to_timestamp, name, chart, family, exec, recipient, source, units, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, class, component, type, chart_context FROM health_log_%s order by unique_id desc limit %u) order by unique_id asc;", guid, limit void sql_health_alarm_log_load(RRDHOST *host) { sqlite3_stmt *res = NULL; - int rc; + int ret; ssize_t errored = 0, loaded = 0; char command[MAX_HEALTH_SQL_SIZE + 1]; @@ -615,7 +616,7 @@ void sql_health_alarm_log_load(RRDHOST *host) { if (unlikely(!db_meta)) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - error_report("HEALTH [%s]: Database has not been initialized", host->hostname); + error_report("HEALTH [%s]: Database has not been initialized", rrdhost_hostname(host)); return; } @@ -626,47 +627,55 @@ void sql_health_alarm_log_load(RRDHOST *host) { snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_LOAD_HEALTH_LOG(uuid_str, host->health_log.max)); - rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("HEALTH [%s]: Failed to prepare sql statement to load health log.", host->hostname); + ret = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); + if (unlikely(ret != SQLITE_OK)) { + error_report("HEALTH [%s]: Failed to prepare sql statement to load health log.", rrdhost_hostname(host)); return; } + DICTIONARY *all_rrdcalcs = dictionary_create( + DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE); + RRDCALC *rc; + foreach_rrdcalc_in_rrdhost_read(host, rc) { + dictionary_set(all_rrdcalcs, rrdcalc_name(rc), rc, sizeof(*rc)); + } + foreach_rrdcalc_in_rrdhost_done(rc); + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step_monitored(res) == SQLITE_ROW) { ALARM_ENTRY *ae = NULL; // check that we have valid ids uint32_t unique_id = (uint32_t) sqlite3_column_int64(res, 1); if(!unique_id) { - error_report("HEALTH [%s]: Got invalid unique id. Ignoring it.", host->hostname); + error_report("HEALTH [%s]: Got invalid unique id. Ignoring it.", rrdhost_hostname(host)); errored++; continue; } uint32_t alarm_id = (uint32_t) sqlite3_column_int64(res, 2); if(!alarm_id) { - error_report("HEALTH [%s]: Got invalid alarm id. Ignoring it.", host->hostname); + error_report("HEALTH [%s]: Got invalid alarm id. Ignoring it.", rrdhost_hostname(host)); errored++; continue; } //need name, chart and family if (sqlite3_column_type(res, 13) == SQLITE_NULL) { - error_report("HEALTH [%s]: Got null name field. Ignoring it.", host->hostname); + error_report("HEALTH [%s]: Got null name field. Ignoring it.", rrdhost_hostname(host)); errored++; continue; } if (sqlite3_column_type(res, 14) == SQLITE_NULL) { - error_report("HEALTH [%s]: Got null chart field. Ignoring it.", host->hostname); + error_report("HEALTH [%s]: Got null chart field. Ignoring it.", rrdhost_hostname(host)); errored++; continue; } if (sqlite3_column_type(res, 15) == SQLITE_NULL) { - error_report("HEALTH [%s]: Got null family field. Ignoring it.", host->hostname); + error_report("HEALTH [%s]: Got null family field. Ignoring it.", rrdhost_hostname(host)); errored++; continue; } @@ -675,18 +684,7 @@ void sql_health_alarm_log_load(RRDHOST *host) { time_t last_repeat = 0; last_repeat = (time_t)sqlite3_column_int64(res, 27); - RRDCALC *rc = alarm_max_last_repeat(host, (char *) sqlite3_column_text(res, 14), simple_hash((char *) sqlite3_column_text(res, 14))); - if (!rc) { - for(rc = host->alarms; rc ; rc = rc->next) { - RRDCALC *rdcmp = (RRDCALC *) avl_insert_lock(&(host)->alarms_idx_name, (avl_t *)rc); - if(rdcmp != rc) { - error("Cannot insert the alarm index ID using log %s", rc->name); - } - } - - rc = alarm_max_last_repeat(host, (char *) sqlite3_column_text(res, 14), simple_hash((char *) sqlite3_column_text(res, 14))); - } - + rc = dictionary_get(all_rrdcalcs, (char *) sqlite3_column_text(res, 14)); if(unlikely(rc)) { if (rrdcalc_isrepeating(rc)) { rc->last_repeat = last_repeat; @@ -719,36 +717,32 @@ void sql_health_alarm_log_load(RRDHOST *host) { ae->exec_run_timestamp = (time_t) sqlite3_column_int64(res, 11); ae->delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 12); - ae->name = strdupz((char *) sqlite3_column_text(res, 13)); - ae->hash_name = simple_hash(ae->name); - - ae->chart = strdupz((char *) sqlite3_column_text(res, 14)); - ae->hash_chart = simple_hash(ae->chart); - - ae->family = strdupz((char *) sqlite3_column_text(res, 15)); + ae->name = string_strdupz((char *) sqlite3_column_text(res, 13)); + ae->chart = string_strdupz((char *) sqlite3_column_text(res, 14)); + ae->family = string_strdupz((char *) sqlite3_column_text(res, 15)); if (sqlite3_column_type(res, 16) != SQLITE_NULL) - ae->exec = strdupz((char *) sqlite3_column_text(res, 16)); + ae->exec = string_strdupz((char *) sqlite3_column_text(res, 16)); else ae->exec = NULL; if (sqlite3_column_type(res, 17) != SQLITE_NULL) - ae->recipient = strdupz((char *) sqlite3_column_text(res, 17)); + ae->recipient = string_strdupz((char *) sqlite3_column_text(res, 17)); else ae->recipient = NULL; if (sqlite3_column_type(res, 18) != SQLITE_NULL) - ae->source = strdupz((char *) sqlite3_column_text(res, 18)); + ae->source = string_strdupz((char *) sqlite3_column_text(res, 18)); else ae->source = NULL; if (sqlite3_column_type(res, 19) != SQLITE_NULL) - ae->units = strdupz((char *) sqlite3_column_text(res, 19)); + ae->units = string_strdupz((char *) sqlite3_column_text(res, 19)); else ae->units = NULL; if (sqlite3_column_type(res, 20) != SQLITE_NULL) - ae->info = strdupz((char *) sqlite3_column_text(res, 20)); + ae->info = string_strdupz((char *) sqlite3_column_text(res, 20)); else ae->info = NULL; @@ -763,30 +757,30 @@ void sql_health_alarm_log_load(RRDHOST *host) { ae->last_repeat = last_repeat; if (sqlite3_column_type(res, 28) != SQLITE_NULL) - ae->classification = strdupz((char *) sqlite3_column_text(res, 28)); + ae->classification = string_strdupz((char *) sqlite3_column_text(res, 28)); else ae->classification = NULL; if (sqlite3_column_type(res, 29) != SQLITE_NULL) - ae->component = strdupz((char *) sqlite3_column_text(res, 29)); + ae->component = string_strdupz((char *) sqlite3_column_text(res, 29)); else ae->component = NULL; if (sqlite3_column_type(res, 30) != SQLITE_NULL) - ae->type = strdupz((char *) sqlite3_column_text(res, 30)); + ae->type = string_strdupz((char *) sqlite3_column_text(res, 30)); else ae->type = NULL; if (sqlite3_column_type(res, 31) != SQLITE_NULL) - ae->chart_context = strdupz((char *) sqlite3_column_text(res, 31)); + ae->chart_context = string_strdupz((char *) sqlite3_column_text(res, 31)); else ae->chart_context = NULL; char value_string[100 + 1]; - freez(ae->old_value_string); - freez(ae->new_value_string); - ae->old_value_string = strdupz(format_value_and_unit(value_string, 100, ae->old_value, ae->units, -1)); - ae->new_value_string = strdupz(format_value_and_unit(value_string, 100, ae->new_value, ae->units, -1)); + string_freez(ae->old_value_string); + string_freez(ae->new_value_string); + ae->old_value_string = string_strdupz(format_value_and_unit(value_string, 100, ae->old_value, ae_units(ae), -1)); + ae->new_value_string = string_strdupz(format_value_and_unit(value_string, 100, ae->new_value, ae_units(ae), -1)); ae->next = host->health_log.alarms; host->health_log.alarms = ae; @@ -802,6 +796,9 @@ void sql_health_alarm_log_load(RRDHOST *host) { netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); + dictionary_destroy(all_rrdcalcs); + all_rrdcalcs = NULL; + if(!host->health_max_unique_id) host->health_max_unique_id = (uint32_t)now_realtime_sec(); if(!host->health_max_alarm_id) host->health_max_alarm_id = (uint32_t)now_realtime_sec(); @@ -809,10 +806,10 @@ void sql_health_alarm_log_load(RRDHOST *host) { if (unlikely(!host->health_log.next_alarm_id || host->health_log.next_alarm_id <= host->health_max_alarm_id)) host->health_log.next_alarm_id = host->health_max_alarm_id + 1; - info("HEALTH [%s]: Table health_log_%s, loaded %zd alarm entries, errors in %zd entries.", host->hostname, uuid_str, loaded, errored); + log_health("[%s]: Table health_log_%s, loaded %zd alarm entries, errors in %zd entries.", rrdhost_hostname(host), uuid_str, loaded, errored); - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) + ret = sqlite3_finalize(res); + if (unlikely(ret != SQLITE_OK)) error_report("Failed to finalize the health log read statement"); sql_health_alarm_log_count(host); @@ -849,159 +846,153 @@ int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg) } param++; - rc = sqlite3_bind_blob(res, 1, hash_id, sizeof(*hash_id), SQLITE_STATIC); + rc = sqlite3_bind_blob(res, param, hash_id, sizeof(*hash_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - if (cfg->alarm && *cfg->alarm) - rc = sqlite3_bind_text(res, 2, cfg->alarm, -1, SQLITE_STATIC); - else - rc = sqlite3_bind_null(res, 2); + rc = sqlite3_bind_string_or_null(res, cfg->alarm, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - if (cfg->template_key && *cfg->template_key) - rc = sqlite3_bind_text(res, 3, cfg->template_key, -1, SQLITE_STATIC); - else - rc = sqlite3_bind_null(res, 3); + rc = sqlite3_bind_string_or_null(res, cfg->template_key, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 4, cfg->on, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->on, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 5, cfg->classification, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->classification, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 6, cfg->component, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->component, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 7, cfg->type, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->type, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 8, cfg->os, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->os, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 9, cfg->host, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->host, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 10, cfg->lookup, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->lookup, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 11, cfg->every, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->every, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 12, cfg->units, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->units, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 13, cfg->calc, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->calc, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 14, cfg->families, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->families, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 15, cfg->plugin, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->plugin, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 16, cfg->module, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->module, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 17, cfg->charts, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->charts, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 18, cfg->green, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->green, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 19, cfg->red, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->red, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 20, cfg->warn, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->warn, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 21, cfg->crit, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->crit, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 22, cfg->exec, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->exec, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 23, cfg->to, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->to, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 24, cfg->info, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->info, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 25, cfg->delay, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->delay, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 26, cfg->options, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->options, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 27, cfg->repeat, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->repeat, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 28, cfg->host_labels, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->host_labels, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; if (cfg->p_db_lookup_after) { param++; - rc = sqlite3_bind_text(res, 29, cfg->p_db_lookup_dimensions, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_dimensions, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; param++; - rc = sqlite3_bind_text(res, 30, cfg->p_db_lookup_method, -1, SQLITE_STATIC); + rc = sqlite3_bind_string_or_null(res, cfg->p_db_lookup_method, param); if (unlikely(rc != SQLITE_OK)) goto bind_fail; @@ -1071,7 +1062,7 @@ int sql_store_alert_config_hash(uuid_t *hash_id, struct alert_config *cfg) skip hash calculations */ #if !defined DISABLE_CLOUD && defined ENABLE_HTTPS -#define DIGEST_ALERT_CONFIG_VAL(v) ((v) ? EVP_DigestUpdate(evpctx, (v), strlen((v))) : EVP_DigestUpdate(evpctx, "", 1)) +#define DIGEST_ALERT_CONFIG_VAL(v) ((v) ? EVP_DigestUpdate(evpctx, (string2str(v)), string_strlen((v))) : EVP_DigestUpdate(evpctx, "", 1)) #endif int alert_hash_and_store_config( uuid_t hash_id, diff --git a/database/sqlite/sqlite_health.h b/database/sqlite/sqlite_health.h index ef837894..87060dac 100644 --- a/database/sqlite/sqlite_health.h +++ b/database/sqlite/sqlite_health.h @@ -6,12 +6,12 @@ #include "sqlite3.h" extern sqlite3 *db_meta; -extern void sql_health_alarm_log_load(RRDHOST *host); -extern int sql_create_health_log_table(RRDHOST *host); -extern void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae); -extern void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae); -extern void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae); -extern void sql_health_alarm_log_cleanup(RRDHOST *host); -extern int alert_hash_and_store_config(uuid_t hash_id, struct alert_config *cfg, int store_hash); -extern void sql_aclk_alert_clean_dead_entries(RRDHOST *host); +void sql_health_alarm_log_load(RRDHOST *host); +int sql_create_health_log_table(RRDHOST *host); +void sql_health_alarm_log_update(RRDHOST *host, ALARM_ENTRY *ae); +void sql_health_alarm_log_insert(RRDHOST *host, ALARM_ENTRY *ae); +void sql_health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae); +void sql_health_alarm_log_cleanup(RRDHOST *host); +int alert_hash_and_store_config(uuid_t hash_id, struct alert_config *cfg, int store_hash); +void sql_aclk_alert_clean_dead_entries(RRDHOST *host); #endif //NETDATA_SQLITE_HEALTH_H diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c new file mode 100644 index 00000000..4eb21215 --- /dev/null +++ b/database/sqlite/sqlite_metadata.c @@ -0,0 +1,1580 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "sqlite_metadata.h" + +// SQL statements + +#define SQL_STORE_CLAIM_ID "insert into node_instance " \ + "(host_id, claim_id, date_created) values (@host_id, @claim_id, unixepoch()) " \ + "on conflict(host_id) do update set claim_id = excluded.claim_id;" + +#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;" + +#define STORE_HOST_LABEL \ + "INSERT OR REPLACE INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES " + +#define STORE_CHART_LABEL \ + "INSERT OR REPLACE INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES " + +#define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())" + +#define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;" + +#define SQL_STORE_HOST_INFO "INSERT OR REPLACE INTO host " \ + "(host_id, hostname, registry_hostname, update_every, os, timezone," \ + "tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, program_version," \ + "entries, health_enabled) " \ + "values (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, @memory_mode, " \ + "@abbrev_timezone, @utc_offset, @program_name, @program_version, " \ + "@entries, @health_enabled);" + +#define SQL_STORE_CHART "insert or replace into chart (chart_id, host_id, type, id, " \ + "name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , " \ + "history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);" + +#define SQL_STORE_DIMENSION "INSERT OR REPLACE INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) " \ + "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options);" + +#define SELECT_DIMENSION_LIST "SELECT dim_id, rowid FROM dimension WHERE rowid > @row_id" + +#define STORE_HOST_INFO "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES " +#define STORE_HOST_INFO_VALUES "(u2h('%s'), '%s','%s', unixepoch())" + +#define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \ + "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);" +#define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;" +#define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);" + +#define METADATA_CMD_Q_MAX_SIZE (1024) // Max queue size; callers will block until there is room +#define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds +#define METADATA_MAINTENANCE_RETRY (60) // Retry run if already running or last run did actual work +#define METADATA_MAINTENANCE_INTERVAL (3600) // Repeat maintenance after latest successful + +#define METADATA_HOST_CHECK_FIRST_CHECK (5) // First check for pending metadata +#define METADATA_HOST_CHECK_INTERVAL (30) // Repeat check for pending metadata +#define METADATA_HOST_CHECK_IMMEDIATE (5) // Repeat immediate run because we have more metadata to write + +#define MAX_METADATA_CLEANUP (500) // Maximum metadata write operations (e.g deletes before retrying) +#define METADATA_MAX_BATCH_SIZE (512) // Maximum commands to execute before running the event loop +#define METADATA_MAX_TRANSACTION_BATCH (128) // Maximum commands to add in a transaction + +enum metadata_opcode { + METADATA_DATABASE_NOOP = 0, + METADATA_DATABASE_TIMER, + METADATA_ADD_CHART, + METADATA_ADD_CHART_LABEL, + METADATA_ADD_DIMENSION, + METADATA_DEL_DIMENSION, + METADATA_ADD_DIMENSION_OPTION, + METADATA_ADD_HOST_SYSTEM_INFO, + METADATA_ADD_HOST_INFO, + METADATA_STORE_CLAIM_ID, + METADATA_STORE_HOST_LABELS, + METADATA_STORE_BUFFER, + + METADATA_SKIP_TRANSACTION, // Dummy -- OPCODES less than this one can be in a tranasction + + METADATA_SCAN_HOSTS, + METADATA_MAINTENANCE, + METADATA_SYNC_SHUTDOWN, + METADATA_UNITTEST, + // leave this last + // we need it to check for worker utilization + METADATA_MAX_ENUMERATIONS_DEFINED +}; + +#define MAX_PARAM_LIST (2) +struct metadata_cmd { + enum metadata_opcode opcode; + struct completion *completion; + const void *param[MAX_PARAM_LIST]; +}; + +struct metadata_database_cmdqueue { + unsigned head, tail; + struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE]; +}; + +typedef enum { + METADATA_FLAG_CLEANUP = (1 << 0), // Cleanup is running + METADATA_FLAG_SCANNING_HOSTS = (1 << 1), // Scanning of hosts in worker thread + METADATA_FLAG_SHUTDOWN = (1 << 2), // Shutting down +} METADATA_FLAG; + +#define METADATA_WORKER_BUSY (METADATA_FLAG_CLEANUP | METADATA_FLAG_SCANNING_HOSTS) + +struct metadata_wc { + uv_thread_t thread; + time_t check_metadata_after; + time_t check_hosts_after; + volatile unsigned queue_size; + uv_loop_t *loop; + uv_async_t async; + METADATA_FLAG flags; + uint64_t row_id; + uv_timer_t timer_req; + struct completion init_complete; + /* FIFO command queue */ + uv_mutex_t cmd_mutex; + uv_cond_t cmd_cond; + struct metadata_database_cmdqueue cmd_queue; +}; + +#define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag)) +#define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST) +#define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST) + +// +// For unittest +// +struct thread_unittest { + int join; + unsigned added; + unsigned processed; + unsigned *done; +}; + + +// Metadata functions + +struct query_build { + BUFFER *sql; + int count; + char uuid_str[UUID_STR_LEN]; +}; + +static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + struct query_build *lb = data; + if (unlikely(!lb->count)) + buffer_sprintf(lb->sql, STORE_HOST_LABEL); + else + buffer_strcat(lb->sql, ", "); + buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int)ls & ~(RRDLABEL_FLAG_INTERNAL), name, value); + lb->count++; + return 1; +} + +static int chart_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + struct query_build *lb = data; + if (unlikely(!lb->count)) + buffer_sprintf(lb->sql, STORE_CHART_LABEL); + else + buffer_strcat(lb->sql, ", "); + buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, ls, name, value); + lb->count++; + return 1; +} + +static void check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer) +{ + size_t old_version = st->rrdlabels_last_saved_version; + size_t new_version = dictionary_version(st->rrdlabels); + + if(new_version != old_version) { + buffer_flush(work_buffer); + struct query_build tmp = {.sql = work_buffer, .count = 0}; + uuid_unparse_lower(st->chart_uuid, tmp.uuid_str); + rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp); + st->rrdlabels_last_saved_version = new_version; + db_execute(buffer_tostring(work_buffer)); + } +} + +// Migrate all hosts with hops zero to this host_uuid +void migrate_localhost(uuid_t *host_uuid) +{ + int rc; + + rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid); + if (!rc) + rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid); + if (!rc) + db_execute(DELETE_MISSING_NODE_INSTANCES); + +} + +static void store_claim_id(uuid_t *host_id, uuid_t *claim_id) +{ + sqlite3_stmt *res = NULL; + int rc; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) + error_report("Database has not been initialized"); + return; + } + + rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement store chart labels"); + return; + } + + rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind host_id parameter to store node instance information"); + goto failed; + } + + if (claim_id) + rc = sqlite3_bind_blob(res, 2, claim_id, sizeof(*claim_id), SQLITE_STATIC); + else + rc = sqlite3_bind_null(res, 2); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind claim_id parameter to store node instance information"); + goto failed; + } + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to store node instance information, rc = %d", rc); + +failed: + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("Failed to finalize the prepared statement when storing node instance information"); +} + +static void delete_dimension_uuid(uuid_t *dimension_uuid) +{ + static __thread sqlite3_stmt *res = NULL; + int rc; + + if (unlikely(!res)) { + rc = prepare_statement(db_meta, DELETE_DIMENSION_UUID, &res); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement to delete a dimension uuid"); + return; + } + } + + rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto skip_execution; + + rc = sqlite3_step_monitored(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to delete dimension uuid, rc = %d", rc); + +skip_execution: + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when deleting dimension UUID, rc = %d", rc); +} + +// +// Store host and host system info information in the database +static int sql_store_host_info(RRDHOST *host) +{ + static __thread sqlite3_stmt *res = NULL; + int rc, param = 0; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) + return 0; + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely((!res))) { + rc = prepare_statement(db_meta, SQL_STORE_HOST_INFO, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to store host, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = bind_text_null(res, ++param, rrdhost_hostname(host), 0); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = bind_text_null(res, ++param, rrdhost_registry_hostname(host), 1); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, host->rrd_update_every); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = bind_text_null(res, ++param, rrdhost_os(host), 1); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = bind_text_null(res, ++param, rrdhost_timezone(host), 1); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = bind_text_null(res, ++param, rrdhost_tags(host), 1); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, host->system_info ? host->system_info->hops : 0); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, host->rrd_memory_mode); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = bind_text_null(res, ++param, rrdhost_abbrev_timezone(host), 1); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, host->utc_offset); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = bind_text_null(res, ++param, rrdhost_program_name(host), 1); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = bind_text_null(res, ++param, rrdhost_program_version(host), 1); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int64(res, ++param, host->rrd_history_entries); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int ) host->health_enabled); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + int store_rc = sqlite3_step_monitored(res); + if (unlikely(store_rc != SQLITE_DONE)) + error_report("Failed to store host %s, rc = %d", rrdhost_hostname(host), rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc); + + return !(store_rc == SQLITE_DONE); +bind_fail: + error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc); + return 1; +} + +static void sql_store_host_system_info_key_value(const char *name, const char *value, void *data) +{ + struct query_build *lb = data; + + if (unlikely(!value)) + return; + + if (unlikely(!lb->count)) + buffer_sprintf( + lb->sql, STORE_HOST_INFO); + else + buffer_strcat(lb->sql, ", "); + buffer_sprintf(lb->sql, STORE_HOST_INFO_VALUES, lb->uuid_str, name, value); + lb->count++; +} + +static BUFFER *sql_store_host_system_info(RRDHOST *host) +{ + struct rrdhost_system_info *system_info = host->system_info; + + if (unlikely(!system_info)) + return NULL; + + BUFFER *work_buffer = buffer_create(1024); + + struct query_build key_data = {.sql = work_buffer, .count = 0}; + uuid_unparse_lower(host->host_uuid, key_data.uuid_str); + + sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data); + sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data); + sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data); + sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data); + sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data); + sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data); + sql_store_host_system_info_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data); + sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data); + sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data); + sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data); + sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data); + sql_store_host_system_info_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data); + sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data); + sql_store_host_system_info_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data); + + return work_buffer; +} + + +/* + * Store set option for a dimension + */ +static int sql_set_dimension_option(uuid_t *dim_uuid, char *option) +{ + sqlite3_stmt *res = NULL; + int rc; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) + return 0; + error_report("Database has not been initialized"); + return 1; + } + + rc = sqlite3_prepare_v2(db_meta, "UPDATE dimension SET options = @options WHERE dim_id = @dim_id", -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to update dimension options"); + return 0; + }; + + rc = sqlite3_bind_blob(res, 2, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + if (!option || !strcmp(option,"unhide")) + rc = sqlite3_bind_null(res, 1); + else + rc = sqlite3_bind_text(res, 1, option, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to update dimension option, rc = %d", rc); + +bind_fail: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement in update dimension options, rc = %d", rc); + return 0; +} + +/* + * Store a chart in the database + */ + +static int sql_store_chart( + uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family, + const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority, + int update_every, int chart_type, int memory_mode, long history_entries) +{ + static __thread sqlite3_stmt *res = NULL; + int rc, param = 0; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) + return 0; + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db_meta, SQL_STORE_CHART, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to store chart, rc = %d", rc); + return 1; + } + } + + param++; + rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_blob(res, 2, host_uuid, sizeof(*host_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_text(res, 3, type, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_text(res, 4, id, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + if (name && *name) + rc = sqlite3_bind_text(res, 5, name, -1, SQLITE_STATIC); + else + rc = sqlite3_bind_null(res, 5); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_text(res, 6, family, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_text(res, 7, context, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_text(res, 8, title, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_text(res, 9, units, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_text(res, 10, plugin, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_text(res, 11, module, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_int(res, 12, (int) priority); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_int(res, 13, update_every); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_int(res, 14, chart_type); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_int(res, 15, memory_mode); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + param++; + rc = sqlite3_bind_int(res, 16, (int) history_entries); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to store chart, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement in chart store function, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement in chart store function, rc = %d", rc); + return 1; +} + +/* + * Store a dimension + */ +static int sql_store_dimension( + uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier, + collected_number divisor, int algorithm, bool hidden) +{ + static __thread sqlite3_stmt *res = NULL; + int rc, param = 0; + + if (unlikely(!db_meta)) { + if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) + return 0; + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db_meta, SQL_STORE_DIMENSION, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to store dimension, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_blob(res, ++param, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) multiplier); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int ) divisor); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, algorithm); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + if (hidden) + rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC); + else + rc = sqlite3_bind_null(res, ++param); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to store dimension, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement in store dimension, rc = %d", rc); + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to store dimension, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement in store dimension, rc = %d", rc); + return 1; +} + +static bool dimension_can_be_deleted(uuid_t *dim_uuid) +{ +#ifdef ENABLE_DBENGINE + bool no_retention = true; + for (size_t tier = 0; tier < storage_tiers; tier++) { + if (!multidb_ctx[tier]) + continue; + time_t first_time_t = 0, last_time_t = 0; + if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t) == 0) { + if (first_time_t > 0) { + no_retention = false; + break; + } + } + } + return no_retention; +#else + return false; +#endif +} + +static void check_dimension_metadata(struct metadata_wc *wc) +{ + int rc; + sqlite3_stmt *res = NULL; + + rc = sqlite3_prepare_v2(db_meta, SELECT_DIMENSION_LIST, -1, &res, 0); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to fetch host dimensions"); + return; + } + + rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) wc->row_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to row parameter"); + goto skip_run; + } + + uint32_t total_checked = 0; + uint32_t total_deleted= 0; + uint64_t last_row_id = wc->row_id; + + info("METADATA: Checking dimensions starting after row %"PRIu64, wc->row_id); + + while (sqlite3_step_monitored(res) == SQLITE_ROW && total_deleted < MAX_METADATA_CLEANUP) { + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + break; + + last_row_id = sqlite3_column_int64(res, 1); + rc = dimension_can_be_deleted((uuid_t *)sqlite3_column_blob(res, 0)); + if (rc == true) { + delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0)); + total_deleted++; + } + total_checked++; + } + wc->row_id = last_row_id; + time_t now = now_realtime_sec(); + if (total_deleted > 0) { + wc->check_metadata_after = now + METADATA_MAINTENANCE_RETRY; + } else + wc->row_id = 0; + info("METADATA: Checked %u, deleted %u -- will resume after row %"PRIu64" in %lld seconds", total_checked, total_deleted, wc->row_id, + (long long)(wc->check_metadata_after - now)); + +skip_run: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize the prepared statement when reading dimensions"); +} + + +// +// EVENT LOOP STARTS HERE +// +static uv_mutex_t metadata_async_lock; + +static void metadata_init_cmd_queue(struct metadata_wc *wc) +{ + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + wc->queue_size = 0; + fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); + fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); +} + +int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd) +{ + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&wc->cmd_mutex); + + if (cmd->opcode == METADATA_SYNC_SHUTDOWN) { + metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN); + uv_mutex_unlock(&wc->cmd_mutex); + return 0; + } + + if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE || + metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { + uv_mutex_unlock(&wc->cmd_mutex); + return 1; + } + + fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE); + /* enqueue command */ + wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; + wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.tail + 1 : 0; + wc->queue_size = queue_size + 1; + uv_mutex_unlock(&wc->cmd_mutex); + return 0; +} + +static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) +{ + unsigned queue_size; + + /* wait for free space in queue */ + uv_mutex_lock(&wc->cmd_mutex); + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { + uv_mutex_unlock(&wc->cmd_mutex); + (void) uv_async_send(&wc->async); + return; + } + + if (cmd->opcode == METADATA_SYNC_SHUTDOWN) { + metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN); + uv_mutex_unlock(&wc->cmd_mutex); + (void) uv_async_send(&wc->async); + return; + } + + while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) { + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { + uv_mutex_unlock(&wc->cmd_mutex); + return; + } + uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); + } + fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE); + /* enqueue command */ + wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; + wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.tail + 1 : 0; + wc->queue_size = queue_size + 1; + uv_mutex_unlock(&wc->cmd_mutex); + + /* wake up event loop */ + (void) uv_async_send(&wc->async); +} + +static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc, enum metadata_opcode *next_opcode) +{ + struct metadata_cmd ret; + unsigned queue_size; + + uv_mutex_lock(&wc->cmd_mutex); + queue_size = wc->queue_size; + if (queue_size == 0) { + memset(&ret, 0, sizeof(ret)); + ret.opcode = METADATA_DATABASE_NOOP; + ret.completion = NULL; + *next_opcode = METADATA_DATABASE_NOOP; + } else { + /* dequeue command */ + ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; + + if (queue_size == 1) { + wc->cmd_queue.head = wc->cmd_queue.tail = 0; + } else { + wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ? + wc->cmd_queue.head + 1 : 0; + } + wc->queue_size = queue_size - 1; + if (wc->queue_size > 0) + *next_opcode = wc->cmd_queue.cmd_array[wc->cmd_queue.head].opcode; + else + *next_opcode = METADATA_DATABASE_NOOP; + /* wake up producers */ + uv_cond_signal(&wc->cmd_cond); + } + uv_mutex_unlock(&wc->cmd_mutex); + + return ret; +} + +static void async_cb(uv_async_t *handle) +{ + uv_stop(handle->loop); + uv_update_time(handle->loop); +} + +#define TIMER_INITIAL_PERIOD_MS (1000) +#define TIMER_REPEAT_PERIOD_MS (1000) + +static void timer_cb(uv_timer_t* handle) +{ + uv_stop(handle->loop); + uv_update_time(handle->loop); + + struct metadata_wc *wc = handle->data; + struct metadata_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + + time_t now = now_realtime_sec(); + + if (wc->check_metadata_after && wc->check_metadata_after < now) { + cmd.opcode = METADATA_MAINTENANCE; + if (!metadata_enq_cmd_noblock(wc, &cmd)) + wc->check_metadata_after = now + METADATA_MAINTENANCE_INTERVAL; + } + + if (wc->check_hosts_after && wc->check_hosts_after < now) { + cmd.opcode = METADATA_SCAN_HOSTS; + if (!metadata_enq_cmd_noblock(wc, &cmd)) + wc->check_hosts_after = now + METADATA_HOST_CHECK_INTERVAL; + } +} + +static void after_metadata_cleanup(uv_work_t *req, int status) +{ + UNUSED(status); + + struct metadata_wc *wc = req->data; + metadata_flag_clear(wc, METADATA_FLAG_CLEANUP); +} +static void start_metadata_cleanup(uv_work_t *req) +{ + struct metadata_wc *wc = req->data; + check_dimension_metadata(wc); +} + +struct scan_metadata_payload { + uv_work_t request; + struct metadata_wc *wc; + struct completion *completion; + uint32_t max_count; +}; + +// Callback after scan of hosts is done +static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused) +{ + struct scan_metadata_payload *data = req->data; + struct metadata_wc *wc = data->wc; + + metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); + internal_error(true, "METADATA: scanning hosts complete"); + if (unlikely(data->completion)) { + completion_mark_complete(data->completion); + internal_error(true, "METADATA: Sending completion done"); + } + freez(data); +} + +static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) { + RRDSET *st; + int rc; + + bool more_to_do = false; + uint32_t scan_count = 1; + BUFFER *work_buffer = buffer_create(1024); + + rrdset_foreach_reentrant(st, host) { + if (scan_count == max_count) { + more_to_do = true; + break; + } + if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) { + rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE); + scan_count++; + + check_and_update_chart_labels(st, work_buffer); + + rc = sql_store_chart( + &st->chart_uuid, + &st->rrdhost->host_uuid, + string2str(st->parts.type), + string2str(st->parts.id), + string2str(st->parts.name), + rrdset_family(st), + rrdset_context(st), + rrdset_title(st), + rrdset_units(st), + rrdset_plugin_name(st), + rrdset_module_name(st), + st->priority, + st->update_every, + st->chart_type, + st->rrd_memory_mode, + st->entries); + if (unlikely(rc)) + internal_error(true, "METADATA: Failed to store chart metadata %s", string2str(st->id)); + } + + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(rrddim_flag_check(rd, RRDDIM_FLAG_METADATA_UPDATE)) { + rrddim_flag_clear(rd, RRDDIM_FLAG_METADATA_UPDATE); + + rc = sql_store_dimension( + &rd->metric_uuid, + &rd->rrdset->chart_uuid, + string2str(rd->id), + string2str(rd->name), + rd->multiplier, + rd->divisor, + rd->algorithm, + rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)); + + if (unlikely(rc)) + error_report("METADATA: Failed to store dimension %s", string2str(rd->id)); + } + } + rrddim_foreach_done(rd); + } + rrdset_foreach_done(st); + + buffer_free(work_buffer); + return more_to_do; +} + +// Worker thread to scan hosts for pending metadata to store +static void start_metadata_hosts(uv_work_t *req __maybe_unused) +{ + RRDHOST *host; + + struct scan_metadata_payload *data = req->data; + struct metadata_wc *wc = data->wc; + + bool run_again = false; + dfe_start_reentrant(rrdhost_root_index, host) { + if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE)) + continue; + internal_error(true, "METADATA: Scanning host %s", rrdhost_hostname(host)); + rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE); + if (unlikely(metadata_scan_host(host, data->max_count))) { + run_again = true; + rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE); + internal_error(true,"METADATA: Rescheduling host %s to run; more charts to store", rrdhost_hostname(host)); + } + } + dfe_done(host); + if (unlikely(run_again)) + wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE; + else + wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL; +} + +static void metadata_event_loop(void *arg) +{ + worker_register("METASYNC"); + worker_register_job_name(METADATA_DATABASE_NOOP, "noop"); + worker_register_job_name(METADATA_DATABASE_TIMER, "timer"); + worker_register_job_name(METADATA_ADD_CHART, "add chart"); + worker_register_job_name(METADATA_ADD_CHART_LABEL, "add chart label"); + worker_register_job_name(METADATA_ADD_DIMENSION, "add dimension"); + worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension"); + worker_register_job_name(METADATA_ADD_DIMENSION_OPTION, "dimension option"); + worker_register_job_name(METADATA_ADD_HOST_SYSTEM_INFO, "host system info"); + worker_register_job_name(METADATA_ADD_HOST_INFO, "host info"); + worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id"); + worker_register_job_name(METADATA_STORE_HOST_LABELS, "host labels"); + worker_register_job_name(METADATA_MAINTENANCE, "maintenance"); + + + int ret; + uv_loop_t *loop; + unsigned cmd_batch_size; + struct metadata_wc *wc = arg; + enum metadata_opcode opcode, next_opcode; + uv_work_t metadata_cleanup_worker; + + uv_thread_set_name_np(wc->thread, "METASYNC"); + loop = wc->loop = mallocz(sizeof(uv_loop_t)); + ret = uv_loop_init(loop); + if (ret) { + error("uv_loop_init(): %s", uv_strerror(ret)); + goto error_after_loop_init; + } + loop->data = wc; + + ret = uv_async_init(wc->loop, &wc->async, async_cb); + if (ret) { + error("uv_async_init(): %s", uv_strerror(ret)); + goto error_after_async_init; + } + wc->async.data = wc; + + ret = uv_timer_init(loop, &wc->timer_req); + if (ret) { + error("uv_timer_init(): %s", uv_strerror(ret)); + goto error_after_timer_init; + } + wc->timer_req.data = wc; + fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS)); + + info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE); + + struct metadata_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + metadata_flag_clear(wc, METADATA_FLAG_CLEANUP); + metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); + + wc->check_metadata_after = now_realtime_sec() + METADATA_MAINTENANCE_FIRST_CHECK; + wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK; + + int shutdown = 0; + int in_transaction = 0; + int commands_in_transaction = 0; + // This can be used in the event loop for all opcodes (not workers) + BUFFER *work_buffer = buffer_create(1024); + wc->row_id = 0; + completion_mark_complete(&wc->init_complete); + + while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) { + RRDDIM *rd = NULL; + RRDSET *st = NULL; + RRDHOST *host = NULL; + DICTIONARY_ITEM *dict_item = NULL; + BUFFER *buffer = NULL; + uuid_t *uuid; + int rc; + + worker_is_idle(); + uv_run(loop, UV_RUN_DEFAULT); + + /* wait for commands */ + cmd_batch_size = 0; + do { + if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE)) + break; + + cmd = metadata_deq_cmd(wc, &next_opcode); + opcode = cmd.opcode; + + if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { + shutdown = 1; + continue; + } + + ++cmd_batch_size; + + // If we are not in transaction and this command is the same with the next ; start a transaction + if (!in_transaction && opcode < METADATA_SKIP_TRANSACTION && opcode == next_opcode) { + if (opcode != METADATA_DATABASE_NOOP) { + in_transaction = 1; + db_execute("BEGIN TRANSACTION;"); + } + } + + if (likely(in_transaction)) { + commands_in_transaction++; + } + + if (likely(opcode != METADATA_DATABASE_NOOP)) + worker_is_busy(opcode); + + switch (opcode) { + case METADATA_DATABASE_NOOP: + case METADATA_DATABASE_TIMER: + break; + case METADATA_ADD_CHART: + dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; + st = (RRDSET *) dictionary_acquired_item_value(dict_item); + + rc = sql_store_chart( + &st->chart_uuid, + &st->rrdhost->host_uuid, + string2str(st->parts.type), + string2str(st->parts.id), + string2str(st->parts.name), + rrdset_family(st), + rrdset_context(st), + rrdset_title(st), + rrdset_units(st), + rrdset_plugin_name(st), + rrdset_module_name(st), + st->priority, + st->update_every, + st->chart_type, + st->rrd_memory_mode, + st->entries); + + if (unlikely(rc)) + error_report("Failed to store chart %s", rrdset_id(st)); + + dictionary_acquired_item_release(st->rrdhost->rrdset_root_index, dict_item); + break; + case METADATA_ADD_CHART_LABEL: + dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; + st = (RRDSET *) dictionary_acquired_item_value(dict_item); + check_and_update_chart_labels(st, work_buffer); + dictionary_acquired_item_release(st->rrdhost->rrdset_root_index, dict_item); + break; + case METADATA_ADD_DIMENSION: + dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; + rd = (RRDDIM *) dictionary_acquired_item_value(dict_item); + + rc = sql_store_dimension( + &rd->metric_uuid, + &rd->rrdset->chart_uuid, + string2str(rd->id), + string2str(rd->name), + rd->multiplier, + rd->divisor, + rd->algorithm, + rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)); + + if (unlikely(rc)) + error_report("Failed to store dimension %s", rrddim_id(rd)); + + dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, dict_item); + break; + case METADATA_DEL_DIMENSION: + uuid = (uuid_t *) cmd.param[0]; + if (likely(dimension_can_be_deleted(uuid))) + delete_dimension_uuid(uuid); + freez(uuid); + break; + case METADATA_ADD_DIMENSION_OPTION: + dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; + rd = (RRDDIM *) dictionary_acquired_item_value(dict_item); + rc = sql_set_dimension_option( + &rd->metric_uuid, rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN) ? "hidden" : NULL); + if (unlikely(rc)) + error_report("Failed to store dimension option for %s", string2str(rd->id)); + dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, dict_item); + break; + case METADATA_ADD_HOST_SYSTEM_INFO: + buffer = (BUFFER *) cmd.param[0]; + db_execute(buffer_tostring(buffer)); + buffer_free(buffer); + break; + case METADATA_ADD_HOST_INFO: + dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; + host = (RRDHOST *) dictionary_acquired_item_value(dict_item); + rc = sql_store_host_info(host); + if (unlikely(rc)) + error_report("Failed to store host info in the database for %s", string2str(host->hostname)); + dictionary_acquired_item_release(rrdhost_root_index, dict_item); + break; + case METADATA_STORE_CLAIM_ID: + store_claim_id((uuid_t *) cmd.param[0], (uuid_t *) cmd.param[1]); + freez((void *) cmd.param[0]); + freez((void *) cmd.param[1]); + break; + case METADATA_STORE_HOST_LABELS: + dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; + host = (RRDHOST *) dictionary_acquired_item_value(dict_item); + rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid); + + if (likely(rc == SQLITE_OK)) { + buffer_flush(work_buffer); + struct query_build tmp = {.sql = work_buffer, .count = 0}; + uuid_unparse_lower(host->host_uuid, tmp.uuid_str); + rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp); + db_execute(buffer_tostring(work_buffer)); + } + + dictionary_acquired_item_release(rrdhost_root_index, dict_item); + break; + + case METADATA_SCAN_HOSTS: + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS))) + break; + + struct scan_metadata_payload *data = mallocz(sizeof(*data)); + data->request.data = data; + data->wc = wc; + data->completion = cmd.completion; // Completion by the worker + + if (unlikely(cmd.completion)) { + data->max_count = 0; // 0 will process all pending updates + cmd.completion = NULL; // Do not complete after launching worker (worker will do) + } + else + data->max_count = 1000; + + metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS); + if (unlikely( + uv_queue_work(loop,&data->request, + start_metadata_hosts, + after_metadata_hosts))) { + // Failed to launch worker -- let the event loop handle completion + cmd.completion = data->completion; + freez(data); + metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); + } + break; + case METADATA_STORE_BUFFER: + buffer = (BUFFER *) cmd.param[0]; + db_execute(buffer_tostring(buffer)); + buffer_free(buffer); + break; + case METADATA_MAINTENANCE: + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP))) + break; + + metadata_cleanup_worker.data = wc; + metadata_flag_set(wc, METADATA_FLAG_CLEANUP); + if (unlikely( + uv_queue_work(loop, &metadata_cleanup_worker, start_metadata_cleanup, after_metadata_cleanup))) { + metadata_flag_clear(wc, METADATA_FLAG_CLEANUP); + } + break; + case METADATA_UNITTEST:; + struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0]; + sleep_usec(1000); // processing takes 1ms + __atomic_fetch_add(&tu->processed, 1, __ATOMIC_SEQ_CST); + break; + default: + break; + } + if (in_transaction && (commands_in_transaction >= METADATA_MAX_TRANSACTION_BATCH || opcode != next_opcode)) { + in_transaction = 0; + db_execute("COMMIT TRANSACTION;"); + commands_in_transaction = 0; + } + + if (cmd.completion) + completion_mark_complete(cmd.completion); + } while (opcode != METADATA_DATABASE_NOOP); + } + + if (!uv_timer_stop(&wc->timer_req)) + uv_close((uv_handle_t *)&wc->timer_req, NULL); + + /* + * uv_async_send after uv_close does not seem to crash in linux at the moment, + * it is however undocumented behaviour we need to be aware if this becomes + * an issue in the future. + */ + uv_close((uv_handle_t *)&wc->async, NULL); + uv_run(loop, UV_RUN_DEFAULT); + + uv_cond_destroy(&wc->cmd_cond); + /* uv_mutex_destroy(&wc->cmd_mutex); */ + //fatal_assert(0 == uv_loop_close(loop)); + int rc; + + do { + rc = uv_loop_close(loop); + } while (rc != UV_EBUSY); + + freez(loop); + worker_unregister(); + + buffer_free(work_buffer); + info("METADATA: Shutting down event loop"); + completion_mark_complete(&wc->init_complete); + return; + +error_after_timer_init: + uv_close((uv_handle_t *)&wc->async, NULL); +error_after_async_init: + fatal_assert(0 == uv_loop_close(loop)); +error_after_loop_init: + freez(loop); + worker_unregister(); +} + +struct metadata_wc metasync_worker = {.loop = NULL}; + +void metadata_sync_shutdown(void) +{ + completion_init(&metasync_worker.init_complete); + + struct metadata_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + info("METADATA: Sending a shutdown command"); + cmd.opcode = METADATA_SYNC_SHUTDOWN; + metadata_enq_cmd(&metasync_worker, &cmd); + + /* wait for metadata thread to shut down */ + info("METADATA: Waiting for shutdown ACK"); + completion_wait_for(&metasync_worker.init_complete); + completion_destroy(&metasync_worker.init_complete); + info("METADATA: Shutdown complete"); +} + +void metadata_sync_shutdown_prepare(void) +{ + struct metadata_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + + struct completion compl; + completion_init(&compl); + + info("METADATA: Sending a scan host command"); + uint32_t max_wait_iterations = 2000; + while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_SCANNING_HOSTS)) && max_wait_iterations--) { + if (max_wait_iterations == 1999) + info("METADATA: Current worker is running; waiting to finish"); + sleep_usec(1000); + } + + cmd.opcode = METADATA_SCAN_HOSTS; + cmd.completion = &compl; + metadata_enq_cmd(&metasync_worker, &cmd); + + info("METADATA: Waiting for host scan completion"); + completion_wait_for(&compl); + completion_destroy(&compl); + info("METADATA: Host scan complete; can continue with shutdown"); +} + +// ------------------------------------------------------------- +// Init function called on agent startup + +void metadata_sync_init(void) +{ + struct metadata_wc *wc = &metasync_worker; + + fatal_assert(0 == uv_mutex_init(&metadata_async_lock)); + + memset(wc, 0, sizeof(*wc)); + metadata_init_cmd_queue(wc); + completion_init(&wc->init_complete); + + fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc)); + + completion_wait_for(&wc->init_complete); + completion_destroy(&wc->init_complete); + + info("SQLite metadata sync initialization complete"); +} + + +// Helpers + +static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *param0, const void *param1) +{ + struct metadata_cmd cmd; + cmd.opcode = opcode; + cmd.param[0] = param0; + cmd.param[1] = param1; + cmd.completion = NULL; + metadata_enq_cmd(&metasync_worker, &cmd); + +} + +// Public +void metaqueue_chart_update(RRDSET *st) +{ + const DICTIONARY_ITEM *acquired_st = dictionary_get_and_acquire_item(st->rrdhost->rrdset_root_index, string2str(st->id)); + queue_metadata_cmd(METADATA_ADD_CHART, acquired_st, NULL); +} + +// +// RD may not be collected, so we may store it needlessly +void metaqueue_dimension_update(RRDDIM *rd) +{ + const DICTIONARY_ITEM *acquired_rd = + dictionary_get_and_acquire_item(rd->rrdset->rrddim_root_index, string2str(rd->id)); + + if (unlikely(rrdset_flag_check(rd->rrdset, RRDSET_FLAG_METADATA_UPDATE))) { + metaqueue_chart_update(rd->rrdset); + rrdset_flag_clear(rd->rrdset, RRDSET_FLAG_METADATA_UPDATE); + } + + queue_metadata_cmd(METADATA_ADD_DIMENSION, acquired_rd, NULL); +} + +void metaqueue_dimension_update_flags(RRDDIM *rd) +{ + const DICTIONARY_ITEM *acquired_rd = + dictionary_get_and_acquire_item(rd->rrdset->rrddim_root_index, string2str(rd->id)); + queue_metadata_cmd(METADATA_ADD_DIMENSION_OPTION, acquired_rd, NULL); +} + +void metaqueue_host_update_system_info(RRDHOST *host) +{ + BUFFER *work_buffer = sql_store_host_system_info(host); + + if (unlikely(!work_buffer)) + return; + + queue_metadata_cmd(METADATA_ADD_HOST_SYSTEM_INFO, work_buffer, NULL); +} + +void metaqueue_host_update_info(const char *machine_guid) +{ + const DICTIONARY_ITEM *acquired_host = dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid); + queue_metadata_cmd(METADATA_ADD_HOST_INFO, acquired_host, NULL); +} + +void metaqueue_delete_dimension_uuid(uuid_t *uuid) +{ + if (unlikely(!metasync_worker.loop)) + return; + uuid_t *use_uuid = mallocz(sizeof(*uuid)); + uuid_copy(*use_uuid, *uuid); + queue_metadata_cmd(METADATA_DEL_DIMENSION, use_uuid, NULL); +} + +void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid) +{ + if (unlikely(!host_uuid)) + return; + + uuid_t *local_host_uuid = mallocz(sizeof(*host_uuid)); + uuid_t *local_claim_uuid = NULL; + + uuid_copy(*local_host_uuid, *host_uuid); + if (likely(claim_uuid)) { + local_claim_uuid = mallocz(sizeof(*claim_uuid)); + uuid_copy(*local_claim_uuid, *claim_uuid); + } + queue_metadata_cmd(METADATA_STORE_CLAIM_ID, local_host_uuid, local_claim_uuid); +} + +void metaqueue_store_host_labels(const char *machine_guid) +{ + const DICTIONARY_ITEM *acquired_host = dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid); + queue_metadata_cmd(METADATA_STORE_HOST_LABELS, acquired_host, NULL); +} + +void metaqueue_buffer(BUFFER *buffer) +{ + queue_metadata_cmd(METADATA_STORE_BUFFER, buffer, NULL); +} + +void metaqueue_chart_labels(RRDSET *st) +{ + const DICTIONARY_ITEM *acquired_st = dictionary_get_and_acquire_item(st->rrdhost->rrdset_root_index, string2str(st->id)); + queue_metadata_cmd(METADATA_ADD_CHART_LABEL, acquired_st, NULL); +} + + +// +// unitests +// + +static void *unittest_queue_metadata(void *arg) { + struct thread_unittest *tu = arg; + + struct metadata_cmd cmd; + cmd.opcode = METADATA_UNITTEST; + cmd.param[0] = tu; + cmd.param[1] = NULL; + cmd.completion = NULL; + metadata_enq_cmd(&metasync_worker, &cmd); + + do { + __atomic_fetch_add(&tu->added, 1, __ATOMIC_SEQ_CST); + metadata_enq_cmd(&metasync_worker, &cmd); + sleep_usec(10000); + } while (!__atomic_load_n(&tu->join, __ATOMIC_RELAXED)); + return arg; +} + +static void *metadata_unittest_threads(void) +{ + + unsigned done; + + struct thread_unittest tu = { + .join = 0, + .added = 0, + .processed = 0, + .done = &done, + }; + + // Queue messages / Time it + time_t seconds_to_run = 5; + int threads_to_create = 4; + fprintf( + stderr, + "\nChecking metadata queue using %d threads for %lld seconds...\n", + threads_to_create, + (long long)seconds_to_run); + + netdata_thread_t threads[threads_to_create]; + tu.join = 0; + for (int i = 0; i < threads_to_create; i++) { + char buf[100 + 1]; + snprintf(buf, 100, "meta%d", i); + netdata_thread_create( + &threads[i], + buf, + NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE, + unittest_queue_metadata, + &tu); + } + uv_async_send(&metasync_worker.async); + sleep_usec(seconds_to_run * USEC_PER_SEC); + + __atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED); + for (int i = 0; i < threads_to_create; i++) { + void *retval; + netdata_thread_join(threads[i], &retval); + } +// uv_async_send(&metasync_worker.async); + sleep_usec(5 * USEC_PER_SEC); + + fprintf(stderr, "Added %u elements, processed %u\n", tu.added, tu.processed); + + return 0; +} + +int metadata_unittest(void) +{ + metadata_sync_init(); + + // Queue items for a specific period of time + metadata_unittest_threads(); + + fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size); + metadata_sync_shutdown(); + + return 0; +} diff --git a/database/sqlite/sqlite_metadata.h b/database/sqlite/sqlite_metadata.h new file mode 100644 index 00000000..9293facf --- /dev/null +++ b/database/sqlite/sqlite_metadata.h @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_SQLITE_METADATA_H +#define NETDATA_SQLITE_METADATA_H + +#include "sqlite3.h" +#include "sqlite_functions.h" + +// To initialize and shutdown +void metadata_sync_init(void); +void metadata_sync_shutdown(void); +void metadata_sync_shutdown_prepare(void); + +void metaqueue_dimension_update(RRDDIM *rd); +void metaqueue_chart_update(RRDSET *st); +void metaqueue_dimension_update_flags(RRDDIM *rd); +void metaqueue_host_update_system_info(RRDHOST *host); +void metaqueue_host_update_info(const char *machine_guid); +void metaqueue_delete_dimension_uuid(uuid_t *uuid); +void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid); +void metaqueue_store_host_labels(const char *machine_guid); +void metaqueue_chart_labels(RRDSET *st); +void migrate_localhost(uuid_t *host_uuid); +void metaqueue_buffer(BUFFER *buffer); + +// UNIT TEST +int metadata_unittest(void); +#endif //NETDATA_SQLITE_METADATA_H |