diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-06-09 04:52:39 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-06-09 04:52:39 +0000 |
commit | 89f3604407aff8f4cb2ed958252c61e23c767e24 (patch) | |
tree | 7fbf408102cab051557d38193524d8c6e991d070 /database/sqlite | |
parent | Adding upstream version 1.34.1. (diff) | |
download | netdata-89f3604407aff8f4cb2ed958252c61e23c767e24.tar.xz netdata-89f3604407aff8f4cb2ed958252c61e23c767e24.zip |
Adding upstream version 1.35.0.upstream/1.35.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 118 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 15 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 172 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.h | 1 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 315 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.h | 15 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.c | 21 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.h | 1 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.c | 123 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.h | 6 | ||||
-rw-r--r-- | database/sqlite/sqlite_health.c | 164 |
11 files changed, 774 insertions, 177 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 989328097..950856d9a 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -10,6 +10,11 @@ #include "../../aclk/aclk.h" #endif +void sanity_check(void) { + // make sure the compiler will stop on misconfigurations + BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); +} + const char *aclk_sync_config[] = { "CREATE TABLE IF NOT EXISTS dimension_delete (dimension_id blob, dimension_name text, chart_type_id text, " "dim_id blob, chart_id blob, host_id blob, date_created);", @@ -29,6 +34,28 @@ const char *aclk_sync_config[] = { uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; +int retention_running = 0; + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void stop_retention_run() +{ + uv_mutex_lock(&aclk_async_lock); + retention_running = 0; + uv_mutex_unlock(&aclk_async_lock); +} + +static int request_retention_run() +{ + int rc = 0; + uv_mutex_lock(&aclk_async_lock); + if (unlikely(retention_running)) + rc = 1; + else + retention_running = 1; + uv_mutex_unlock(&aclk_async_lock); + return rc; +} +#endif int claimed() { @@ -313,9 +340,6 @@ static void timer_cb(uv_timer_t* handle) if (aclk_use_new_cloud_arch && aclk_connected) { if (wc->rotation_after && wc->rotation_after < now) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - aclk_database_enq_cmd_noblock(wc, &cmd); - cmd.opcode = ACLK_DATABASE_UPD_RETENTION; if (!aclk_database_enq_cmd_noblock(wc, &cmd)) wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL; @@ -339,7 +363,7 @@ static void timer_cb(uv_timer_t* handle) } } - if (wc->alert_updates) { + if (wc->alert_updates && !wc->pause_alert_updates) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; cmd.count = ACLK_MAX_ALERT_UPDATES; aclk_database_enq_cmd_noblock(wc, &cmd); @@ -348,10 +372,65 @@ static void timer_cb(uv_timer_t* handle) #endif } + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +void after_send_retention(uv_work_t *req, int status) +{ + struct aclk_database_worker_config *wc = req->data; + (void)status; + stop_retention_run(); + wc->retention_running = 0; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DIM_DELETION; + if (aclk_database_enq_cmd_noblock(wc, &cmd)) + info("Failed to queue a dimension deletion message"); + + cmd.opcode = ACLK_DATABASE_NODE_INFO; + if (aclk_database_enq_cmd_noblock(wc, &cmd)) + info("Failed to queue a node update info message"); +} + + +static void send_retention(uv_work_t *req) +{ + struct aclk_database_worker_config *wc = req->data; + + if (unlikely(wc->is_shutting_down)) + return; + + aclk_update_retention(wc); +} +#endif + #define MAX_CMD_BATCH_SIZE (256) void aclk_database_worker(void *arg) { + worker_register("ACLKSYNC"); + worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + worker_register_job_name(ACLK_DATABASE_ADD_CHART, "chart add"); + worker_register_job_name(ACLK_DATABASE_ADD_DIMENSION, "dimension add"); + worker_register_job_name(ACLK_DATABASE_PUSH_CHART, "chart push"); + worker_register_job_name(ACLK_DATABASE_PUSH_CHART_CONFIG, "chart conf push"); + worker_register_job_name(ACLK_DATABASE_RESET_CHART, "chart reset"); + worker_register_job_name(ACLK_DATABASE_CHART_ACK, "chart ack"); + worker_register_job_name(ACLK_DATABASE_UPD_RETENTION, "retention check"); + worker_register_job_name(ACLK_DATABASE_DIM_DELETION, "dimension delete"); + worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan"); +#endif + worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log"); + worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); + worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); + worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); + worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check"); + worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); + struct aclk_database_worker_config *wc = arg; uv_loop_t *loop; int ret; @@ -401,6 +480,7 @@ void aclk_database_worker(void *arg) memset(&cmd, 0, sizeof(cmd)); #ifdef ENABLE_NEW_CLOUD_PROTOCOL + uv_work_t retention_work; sql_get_last_chart_sequence(wc); wc->chart_payload_count = sql_get_pending_count(wc); if (!wc->chart_payload_count) @@ -412,7 +492,9 @@ void aclk_database_worker(void *arg) wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY; debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count); + while (likely(!netdata_exit)) { + worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); /* wait for commands */ @@ -427,6 +509,10 @@ void aclk_database_worker(void *arg) opcode = cmd.opcode; ++cmd_batch_size; + + if(likely(opcode != ACLK_DATABASE_NOOP)) + worker_is_busy(opcode); + switch (opcode) { case ACLK_DATABASE_NOOP: /* the command queue was empty, do nothing */ @@ -439,6 +525,7 @@ void aclk_database_worker(void *arg) if (wc->host == localhost) sql_check_aclk_table_list(wc); break; + case ACLK_DATABASE_DELETE_HOST: debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data); sql_delete_aclk_table_list(wc, cmd); @@ -504,9 +591,21 @@ void aclk_database_worker(void *arg) aclk_process_dimension_deletion(wc, cmd); break; case ACLK_DATABASE_UPD_RETENTION: + if (unlikely(wc->retention_running)) + break; + + if (unlikely(request_retention_run())) { + wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY; + break; + } + debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str); - aclk_update_retention(wc, cmd); - aclk_process_dimension_deletion(wc, cmd); + retention_work.data = wc; + wc->retention_running = 1; + if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) { + wc->retention_running = 0; + stop_retention_run(); + } break; // NODE_INSTANCE DETECTION @@ -535,6 +634,8 @@ void aclk_database_worker(void *arg) cmd.completion = NULL; wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd); } + if (localhost == wc->host) + (void) sqlite3_wal_checkpoint(db_meta, NULL); break; default: debug(D_ACLK_SYNC, "%s: default.", __func__); @@ -577,6 +678,8 @@ void aclk_database_worker(void *arg) wc->host->dbsync_worker = NULL; freez(wc); rrd_unlock(); + + worker_unregister(); return; error_after_timer_init: @@ -585,6 +688,7 @@ error_after_async_init: fatal_assert(0 == uv_loop_close(loop)); error_after_loop_init: freez(loop); + worker_unregister(); } // ------------------------------------------------------------- @@ -628,7 +732,7 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) db_execute(buffer_tostring(sql)); buffer_flush(sql); - buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str, uuid_str, uuid_str); + buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str); db_execute(buffer_tostring(sql)); buffer_flush(sql); diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 894d93489..37e3d4530 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -16,7 +16,8 @@ #endif #define ACLK_MAX_ALERT_UPDATES (5) #define ACLK_DATABASE_CLEANUP_FIRST (60) -#define ACLK_DATABASE_ROTATION_DELAY (60) +#define ACLK_DATABASE_ROTATION_DELAY (180) +#define ACLK_DATABASE_RETENTION_RETRY (60) #define ACLK_DATABASE_CLEANUP_INTERVAL (3600) #define ACLK_DATABASE_ROTATION_INTERVAL (3600) #define ACLK_DELETE_ACK_INTERNAL (600) @@ -103,9 +104,7 @@ static inline char *get_str_from_uuid(uuid_t *uuid) #define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ "alert_unique_id, date_created, date_submitted, date_cloud_ack, " \ - "unique(alert_unique_id)); " \ - "insert into aclk_alert_%s (alert_unique_id, date_created) " \ - "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s where new_status <> 0 and new_status <> -2 order by unique_id asc on conflict (alert_unique_id) do nothing;" + "unique(alert_unique_id));" #define INDEX_ACLK_CHART "CREATE INDEX IF NOT EXISTS aclk_chart_index_%s ON aclk_chart_%s (unique_id);" @@ -135,7 +134,11 @@ enum aclk_database_opcode { ACLK_DATABASE_PUSH_ALERT_CONFIG, ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, ACLK_DATABASE_QUEUE_REMOVED_ALERTS, - ACLK_DATABASE_TIMER + ACLK_DATABASE_TIMER, + + // leave this last + // we need it to check for worker utilization + ACLK_MAX_ENUMERATIONS_DEFINED }; struct aclk_chart_payload_t { @@ -176,6 +179,7 @@ struct aclk_database_worker_config { uint64_t alerts_batch_id; // batch id for alerts to use uint64_t alerts_start_seq_id; // cloud has asked to start streaming from uint64_t alert_sequence_id; // last alert sequence_id + int pause_alert_updates; uint32_t chart_payload_count; uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message @@ -194,6 +198,7 @@ struct aclk_database_worker_config { int node_info_send; int chart_pending; int chart_reset_count; + int retention_running; volatile unsigned is_shutting_down; volatile unsigned is_orphan; struct aclk_database_worker_config *next; diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 54e8be4a7..53c6c2a65 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -8,9 +8,120 @@ #include "../../aclk/aclk.h" #endif +time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) { + sqlite3_stmt *res = NULL; + int rc = 0; + time_t when = 0; + char sql[ACLK_SYNC_QUERY_SIZE]; + + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \ + "and unique_id > %u and unique_id < %u " \ + "and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id); + + rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to find removed gap."); + return 0; + } + + rc = sqlite3_step(res); + if (likely(rc == SQLITE_ROW)) { + when = (time_t) sqlite3_column_int64(res, 0); + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to find removed gap, rc = %d", rc); + + return when; +} + +#define MAX_REMOVED_PERIOD 900 +//decide if some events should be sent or not +int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) +{ + sqlite3_stmt *res = NULL; + char uuid_str[GUID_LEN + 1]; + uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + int send = 1, rc = 0; + + if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) { + return 0; + } + + if (unlikely(uuid_is_null(ae->config_hash_id))) + return 0; + + char sql[ACLK_SYNC_QUERY_SIZE]; + uuid_t config_hash_id; + RRDCALC_STATUS status; + uint32_t unique_id; + + //get the previous sent event of this alarm_id + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \ + where hl.unique_id = aa.alert_unique_id \ + and hl.alarm_id = %u and hl.unique_id <> %u \ + order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id); + + rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to filter alert events."); + send = 1; + return send; + } + + rc = sqlite3_step(res); + if (likely(rc == SQLITE_ROW)) { + status = (RRDCALC_STATUS) sqlite3_column_int(res, 0); + if (sqlite3_column_type(res, 1) != SQLITE_NULL) + uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1))); + unique_id = (uint32_t) sqlite3_column_int64(res, 2); + + } else { + send = 1; + goto done; + } + + if (ae->new_status != (RRDCALC_STATUS)status) { + send = 1; + goto done; + } + + if (uuid_compare(ae->config_hash_id, config_hash_id)) { + send = 1; + goto done; + } + + //same status, same config + if (ae->new_status == RRDCALC_STATUS_CLEAR) { + send = 0; + goto done; + } + + //detect a long off period of the agent, TODO make global + if (ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) { + time_t when = removed_when(ae->alarm_id, ae->unique_id, unique_id, uuid_str); + + if (when && (when + (time_t)MAX_REMOVED_PERIOD) < ae->when) { + send = 1; + goto done; + } else { + send = 0; + goto done; + } + } + +done: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc); + + return send; +} + // will replace call to aclk_update_alarm in health/health_log.c // and handle both cases -int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) +int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) { //check aclk architecture and handle old json alarm update to cloud //include also the valid statuses for this case @@ -30,17 +141,15 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) if (!claimed()) return 0; - if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) + if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) { return 0; + } - if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) - return 0; - - if (unlikely(!host->dbsync_worker)) - return 1; - - if (unlikely(uuid_is_null(ae->config_hash_id))) - return 0; + if (!skip_filter) { + if (!should_send_to_cloud(host, ae)) { + return 0; + } + } int rc = 0; @@ -76,6 +185,10 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) } ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; + struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker; + if (wc) { + wc->pause_alert_updates = 0; + } bind_fail: if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK)) @@ -86,6 +199,7 @@ bind_fail: #else UNUSED(host); UNUSED(ae); + UNUSED(skip_filter); #endif return 0; } @@ -283,6 +397,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d wc->alerts_batch_id); log_first_sequence_id = 0; log_last_sequence_id = 0; + wc->pause_alert_updates = 1; } rc = sqlite3_finalize(res); @@ -296,6 +411,27 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d return; } +void sql_queue_existing_alerts_to_aclk(RRDHOST *host) +{ + char uuid_str[GUID_LEN + 1]; + uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + BUFFER *sql = buffer_create(1024); + + buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \ + "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s " \ + "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ + "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str); + + db_execute(buffer_tostring(sql)); + + buffer_free(sql); + + struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker; + if (wc) { + wc->pause_alert_updates = 0; + } +} + void aclk_send_alarm_health_log(char *node_id) { if (unlikely(!node_id)) @@ -421,6 +557,8 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a freez(claim_id); buffer_free(sql); + + aclk_alert_reloaded = 1; #endif return; @@ -593,6 +731,9 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); return; } + + if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1)) + sql_queue_existing_alerts_to_aclk(host); } else wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); @@ -602,6 +743,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start wc->alerts_batch_id = batch_id; wc->alerts_start_seq_id = start_seq_id; wc->alert_updates = 1; + wc->pause_alert_updates = 0; __sync_synchronize(); } else @@ -631,9 +773,11 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config db_execute(buffer_tostring(sql)); - log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A"); buffer_free(sql); + + wc->pause_alert_updates = 0; #endif return; } @@ -644,6 +788,9 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host) if (unlikely(!host->dbsync_worker)) return; + if (!claimed()) + return; + struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS; @@ -912,9 +1059,6 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host) if (!claimed()) return; - if (unlikely(!host->dbsync_worker)) - return; - char uuid_str[GUID_LEN + 1]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h index 957cb94ac..0181b4842 100644 --- a/database/sqlite/sqlite_aclk_alert.h +++ b/database/sqlite/sqlite_aclk_alert.h @@ -26,5 +26,6 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id); int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status); +extern int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter); #endif //NETDATA_SQLITE_ACLK_ALERT_H diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index 7afa1d451..a9db5282a 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -22,20 +22,20 @@ sql_queue_chart_payload(struct aclk_database_worker_config *wc, void *data, enum return rc; } -static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size) +static time_t payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size) { static __thread sqlite3_stmt *res = NULL; int rc; - int send_status = 0; + time_t send_status = 0; if (unlikely(!res)) { char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT 1 FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp " - "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;", - uuid_str, uuid_str); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT acl.date_submitted FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp " + "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;", + uuid_str, uuid_str); rc = prepare_statement(db_meta, sql, &res); if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to check payload data"); + error_report("Failed to prepare statement to check payload data on %s", sql); return 0; } } @@ -49,7 +49,7 @@ static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payl goto bind_fail; while (sqlite3_step(res) == SQLITE_ROW) { - send_status = sqlite3_column_int(res, 0); + send_status = (time_t) sqlite3_column_int64(res, 0); } bind_fail: @@ -58,23 +58,36 @@ bind_fail: return send_status; } -static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id, - ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, int *send_status) +static int aclk_add_chart_payload( + struct aclk_database_worker_config *wc, + uuid_t *uuid, + char *claim_id, + ACLK_PAYLOAD_TYPE payload_type, + void *payload, + size_t payload_size, + time_t *send_status, + int check_sent) { static __thread sqlite3_stmt *res_chart = NULL; int rc; + time_t date_submitted; - rc = payload_sent(wc->uuid_str, uuid, payload, payload_size); - if (send_status) - *send_status = rc; - if (rc == 1) + if (unlikely(!payload)) return 0; + if (check_sent) { + date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size); + if (send_status) + *send_status = date_submitted; + if (date_submitted) + return 0; + } + if (unlikely(!res_chart)) { char sql[ACLK_SYNC_QUERY_SIZE]; snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, - "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ - "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str); + "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ + "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str); rc = prepare_statement(db_meta, sql, &res_chart); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to store chart payload data"); @@ -146,7 +159,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat chart_payload.id = strdupz(st->id); struct label_index *labels = &st->state->labels; - netdata_rwlock_wrlock(&labels->labels_rwlock); + netdata_rwlock_rdlock(&labels->labels_rwlock); struct label *label_list = labels->head; struct label *chart_label = NULL; while (label_list) { @@ -159,7 +172,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat size_t size; char *payload = generate_chart_instance_updated(&size, &chart_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL); + rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL, 1); freez(payload); chart_instance_updated_destroy(&chart_payload); } @@ -168,7 +181,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid, const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time, - int *send_status) + time_t *send_status) { int rc = 0; size_t size; @@ -197,7 +210,7 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w dim_payload.last_timestamp.tv_sec = last_time; char *payload = generate_chart_dimension_updated(&size, &dim_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status); + rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status, 1); freez(payload); return rc; } @@ -271,39 +284,22 @@ bind_fail: int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) { - int rc = 0; + int rc = 1; CHECK_SQLITE_CONNECTION(db_meta); - char *claim_id = is_agent_claimed(); - - RRDDIM *rd = cmd.data; - - if (likely(claim_id)) { - int send_status = 0; - time_t now = now_realtime_sec(); - - time_t first_t = rd->state->query_ops.oldest_time(rd); - time_t last_t = rd->state->query_ops.latest_time(rd); - - int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every)); + struct aclk_chart_dimension_data *aclk_cd_data = cmd.data; - rc = aclk_upd_dimension_event( - wc, - claim_id, - &rd->state->metric_uuid, - rd->id, - rd->name, - rd->rrdset->id, - first_t, - live ? 0 : last_t, - &send_status); + char *claim_id = is_agent_claimed(); + if (!claim_id) + goto cleanup; - if (!send_status) - rd->state->aclk_live_status = live; + rc = aclk_add_chart_payload(wc, &aclk_cd_data->uuid, claim_id, ACLK_PAYLOAD_DIMENSION, + (void *) aclk_cd_data->payload, aclk_cd_data->payload_size, NULL, aclk_cd_data->check_payload); - freez(claim_id); - } - rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); + freez(claim_id); +cleanup: + freez(aclk_cd_data->payload); + freez(aclk_cd_data); return rc; } @@ -337,6 +333,12 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d char sql[ACLK_SYNC_QUERY_SIZE]; static __thread sqlite3_stmt *res = NULL; + char *hostname = NULL; + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(wc->node_id); + if (unlikely(!res)) { snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \ "FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \ @@ -346,6 +348,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to send a chart update via ACLK"); freez(claim_id); + freez(hostname); return; } } @@ -419,7 +422,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK RES [%s (%s)]: CHARTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", first_sequence, last_sequence, wc->batch_id); @@ -440,7 +443,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK STA [%s (%s)]: Sync of charts and dimensions done in %ld seconds.", wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", now_realtime_sec() - wc->startup_time); } @@ -459,6 +462,7 @@ bind_fail: error_report("Failed to reset statement when pushing chart events, rc = %d", rc); freez(claim_id); + freez(hostname); return; } @@ -562,7 +566,7 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_ error_report("Failed to ACK sequence id, rc = %d", rc); else log_access( - "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED in the database upto %" PRIu64, + "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED IN THE DATABASE UP TO %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1); @@ -583,8 +587,13 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl cmd.param1); db_execute(buffer_tostring(sql)); if (cmd.param1 == 1) { + char *hostname = NULL; + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(wc->node_id); buffer_flush(sql); - log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, hostname? hostname : "N/A"); buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \ "DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str); db_lock(); @@ -609,6 +618,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl RRDDIM *rd; rrddim_foreach_read(rd, st) { + rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); rd->state->aclk_live_status = (rd->state->aclk_live_status == 0); } rrdset_unlock(st); @@ -616,9 +626,10 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl rrdhost_unlock(host); } else error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid); + freez(hostname); } else { log_access( - "ACLK STA [%s (%s)]: Restarting chart sync from sequence %" PRIu64, + "ACLK STA [%s (%s)]: RESTARTING CHART SYNC FROM SEQUENCE %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1); @@ -705,25 +716,28 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at if (unlikely(!node_id)) return; - // log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM from %"PRIu64" t=%ld batch=%"PRIu64, node_id, - // sequence_id, created_at, batch_id); - uuid_t node_uuid; if (uuid_parse(node_id, node_uuid)) { log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM ignored, invalid node id", node_id); return; } - struct aclk_database_worker_config *wc = NULL; + struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id); rrd_rdlock(); RRDHOST *host = localhost; while(host) { - if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) { + if (wc || (host->node_id && !(uuid_compare(*host->node_id, node_uuid)))) { rrd_unlock(); - wc = (struct aclk_database_worker_config *)host->dbsync_worker ? - (struct aclk_database_worker_config *)host->dbsync_worker : - (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); + if (!wc) + wc = (struct aclk_database_worker_config *)host->dbsync_worker ? + (struct aclk_database_worker_config *)host->dbsync_worker : + (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); + char *hostname = NULL; if (likely(wc)) { + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(node_id); wc->chart_reset_count++; __sync_synchronize(); wc->chart_updates = 0; @@ -731,9 +745,10 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at __sync_synchronize(); wc->batch_created = now_realtime_sec(); log_access( - "ACLK REQ [%s (%s)]: CHARTS STREAM from %" PRIu64 " t=%ld resets=%d", + "ACLK REQ [%s (%s)]: CHARTS STREAM from %"PRIu64" (LOCAL %"PRIu64") t=%ld resets=%d" , wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", + sequence_id + 1, wc->chart_sequence_id, wc->chart_timestamp, wc->chart_reset_count); @@ -742,7 +757,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at "ACLK RES [%s (%s)]: CHARTS FULL RESYNC REQUEST " "remote_seq=%" PRIu64 " local_seq=%" PRIu64 " resets=%d ", wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", sequence_id, wc->chart_sequence_id, wc->chart_reset_count); @@ -756,7 +771,6 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at freez(chart_reset.claim_id); wc->chart_reset_count = -1; } - return; } else { struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -766,8 +780,8 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at log_access( "ACLK REQ [%s (%s)]: CHART RESET from %" PRIu64 " t=%ld batch=%" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", - wc->chart_sequence_id, + hostname ? hostname : "N/A", + sequence_id + 1, wc->chart_timestamp, wc->batch_id); cmd.opcode = ACLK_DATABASE_RESET_CHART; @@ -775,20 +789,15 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at cmd.completion = NULL; aclk_database_enq_cmd(wc, &cmd); } else { -// log_access( -// "ACLK RES [%s (%s)]: CHARTS STREAM from %" PRIu64 -// " t=%ld resets=%d", -// wc->node_id, -// wc->host ? wc->host->hostname : "N/A", -// wc->chart_sequence_id, -// wc->chart_timestamp, -// wc->chart_reset_count); wc->chart_reset_count = 0; wc->chart_updates = 1; } } - } else - log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); + } else { + hostname = get_hostname_by_node_id(node_id); + log_access("ACLK STA [%s (%s)]: ACLK synchronization thread is not active.", node_id, hostname ? hostname : "N/A"); + } + freez(hostname); return; } host = host->next; @@ -838,9 +847,8 @@ failed: "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \ "WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;" -void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +void aclk_update_retention(struct aclk_database_worker_config *wc) { - UNUSED(cmd); int rc; if (!aclk_use_new_cloud_arch || !aclk_connected) @@ -887,7 +895,10 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d time_t last_entry_t; uint32_t update_every = 0; uint32_t dimension_update_count = 0; - int send_status; + uint32_t total_checked = 0; + uint32_t total_deleted= 0; + uint32_t total_stopped= 0; + time_t send_status; struct retention_updated rotate_data; @@ -904,7 +915,9 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.node_id = strdupz(wc->node_id); time_t now = now_realtime_sec(); - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step(res) == SQLITE_ROW && dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP) { + if (unlikely(netdata_exit)) + break; if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) { if (update_every) { debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); @@ -942,23 +955,40 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d if (likely(!rc && first_entry_t)) start_time = MIN(start_time, first_entry_t); - if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates) { + if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) { int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every)); - if ((!live || !first_entry_t) && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) { - (void)aclk_upd_dimension_event( - wc, - claim_id, - (uuid_t *)sqlite3_column_blob(res, 0), - (const char *)(const char *)sqlite3_column_text(res, 3), - (const char *)(const char *)sqlite3_column_text(res, 4), - (const char *)(const char *)sqlite3_column_text(res, 2), - first_entry_t, - live ? 0 : last_entry_t, - &send_status); - if (!send_status) + if (rc) { + first_entry_t = 0; + last_entry_t = 0; + live = 0; + } + if (!wc->host || !first_entry_t) { + if (!first_entry_t) { + delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0)); + total_deleted++; dimension_update_count++; + } + else { + (void)aclk_upd_dimension_event( + wc, + claim_id, + (uuid_t *)sqlite3_column_blob(res, 0), + (const char *)(const char *)sqlite3_column_text(res, 3), + (const char *)(const char *)sqlite3_column_text(res, 4), + (const char *)(const char *)sqlite3_column_text(res, 2), + first_entry_t, + live ? 0 : last_entry_t, + &send_status); + + if (!send_status) { + if (last_entry_t) + total_stopped++; + dimension_update_count++; + } + } } } + total_checked++; } if (update_every) { debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); @@ -970,7 +1000,20 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.interval_duration_count++; } + char *hostname = NULL; + if (!wc->host) + hostname = get_hostname_by_node_id(wc->node_id); + + if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) + log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", + wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); + else + log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE NOT SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", + wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); + freez(hostname); + #ifdef NETDATA_INTERNAL_CHECKS + info("Retention update for %s (chart updates = %d)", wc->host_guid, wc->chart_updates); for (int i = 0; i < rotate_data.interval_duration_count; ++i) info( "Update for host %s (node %s) for %u Retention = %u", @@ -979,7 +1022,8 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.interval_durations[i].update_every, rotate_data.interval_durations[i].retention); #endif - aclk_retention_updated(&rotate_data); + if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) + aclk_retention_updated(&rotate_data); freez(rotate_data.node_id); freez(rotate_data.interval_durations); @@ -1048,11 +1092,64 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc) return; } -int queue_dimension_to_aclk(RRDDIM *rd) +void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated) { - int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker, - rd, ACLK_DATABASE_ADD_DIMENSION); - return rc; + int live = !last_updated; + + if (likely(rd->state->aclk_live_status == live)) + return; + + time_t created_at = rd->state->query_ops.oldest_time(rd); + + if (unlikely(!created_at && rd->updated)) + created_at = rd->last_collected_time.tv_sec; + + rd->state->aclk_live_status = live; + + struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker; + if (unlikely(!wc)) + return; + + char *claim_id = is_agent_claimed(); + if (unlikely(!claim_id)) + return; + + struct chart_dimension_updated dim_payload; + memset(&dim_payload, 0, sizeof(dim_payload)); + dim_payload.node_id = wc->node_id; + dim_payload.claim_id = claim_id; + dim_payload.name = rd->name; + dim_payload.id = rd->id; + dim_payload.chart_id = rd->rrdset->id; + dim_payload.created_at.tv_sec = created_at; + dim_payload.last_timestamp.tv_sec = last_updated; + + size_t size = 0; + char *payload = generate_chart_dimension_updated(&size, &dim_payload); + + freez(claim_id); + if (unlikely(!payload)) + return; + + struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data)); + uuid_copy(aclk_cd_data->uuid, rd->state->metric_uuid); + aclk_cd_data->payload = payload; + aclk_cd_data->payload_size = size; + aclk_cd_data->check_payload = 1; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + + cmd.opcode = ACLK_DATABASE_ADD_DIMENSION; + cmd.data = aclk_cd_data; + int rc = aclk_database_enq_cmd_noblock(wc, &cmd); + + if (unlikely(rc)) { + freez(aclk_cd_data->payload); + freez(aclk_cd_data); + rd->state->aclk_live_status = !live; + } + return; } void aclk_send_dimension_update(RRDDIM *rd) @@ -1203,6 +1300,12 @@ void sql_check_chart_liveness(RRDSET *st) { return; rrdset_rdlock(st); + + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + rrdset_unlock(st); + return; + } + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) { debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name); @@ -1215,20 +1318,8 @@ void sql_check_chart_liveness(RRDSET *st) { debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name); rrddim_foreach_read(rd, st) { - if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { - int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every; - if (unlikely(live != rd->state->aclk_live_status)) { - if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - if (likely(!queue_dimension_to_aclk(rd))) { - debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live); - rd->state->aclk_live_status = live; - rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); - } - } - } - else - debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name); - } + if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) + queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); } rrdset_unlock(st); } diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h index 1d25de24e..84325bf6c 100644 --- a/database/sqlite/sqlite_aclk_chart.h +++ b/database/sqlite/sqlite_aclk_chart.h @@ -16,10 +16,21 @@ extern sqlite3 *db_meta; #define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3) #endif +#ifndef RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER +#define RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER (30) +#endif + #ifndef ACLK_MAX_DIMENSION_CLEANUP #define ACLK_MAX_DIMENSION_CLEANUP (500) #endif +struct aclk_chart_dimension_data { + uuid_t uuid; + char *payload; + size_t payload_size; + uint8_t check_payload; +}; + struct aclk_chart_sync_stats { int updates; uint64_t batch_id; @@ -37,9 +48,8 @@ struct aclk_chart_sync_stats { }; extern int queue_chart_to_aclk(RRDSET *st); -extern int queue_dimension_to_aclk(RRDDIM *rd); +extern void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated); extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); -extern int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae); int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); @@ -57,4 +67,5 @@ uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc); void aclk_send_dimension_update(RRDDIM *rd); struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host); void sql_check_chart_liveness(RRDSET *st); +void aclk_update_retention(struct aclk_database_worker_config *wc); #endif //NETDATA_SQLITE_ACLK_CHART_H diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c index 97e6bebd1..239a24b8c 100644 --- a/database/sqlite/sqlite_aclk_node.c +++ b/database/sqlite/sqlite_aclk_node.c @@ -24,6 +24,15 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.child = (wc->host != localhost); node_info.ml_info.ml_capable = ml_capable(localhost); node_info.ml_info.ml_enabled = ml_enabled(wc->host); + + struct capability instance_caps[] = { + { .name = "proto", .version = 1, .enabled = 1 }, + { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(wc->host) }, + { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_info.node_instance_capabilities = instance_caps; + now_realtime_timeval(&node_info.updated_at); RRDHOST *host = wc->host; @@ -47,7 +56,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.data.memory = host->system_info->host_ram_total ? host->system_info->host_ram_total : "0"; node_info.data.disk_space = host->system_info->host_disk_space ? host->system_info->host_disk_space : "0"; node_info.data.version = host_version ? host_version : VERSION; - node_info.data.release_channel = "nightly"; + node_info.data.release_channel = (char *) get_release_channel(); node_info.data.timezone = (char *) host->abbrev_timezone; node_info.data.virtualization_type = host->system_info->virtualization ? host->system_info->virtualization : "unknown"; node_info.data.container_type = host->system_info->container ? host->system_info->container : "unknown"; @@ -55,11 +64,19 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.data.services = NULL; // char ** node_info.data.service_count = 0; node_info.data.machine_guid = wc->host_guid; + + struct capability node_caps[] = { + { .name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled }, + { .name = "mc", .version = host->system_info->mc_version ? host->system_info->mc_version : 0, .enabled = host->system_info->mc_version ? 1 : 0 }, + { .name = NULL, .version = 0, .enabled = 0 } + }; + node_info.node_capabilities = node_caps; + node_info.data.ml_info.ml_capable = host->system_info->ml_capable; node_info.data.ml_info.ml_enabled = host->system_info->ml_enabled; struct label_index *labels = &host->labels; - netdata_rwlock_wrlock(&labels->labels_rwlock); + netdata_rwlock_rdlock(&labels->labels_rwlock); node_info.data.host_labels_head = labels->head; aclk_update_node_info(&node_info); diff --git a/database/sqlite/sqlite_aclk_node.h b/database/sqlite/sqlite_aclk_node.h index 9cb411586..b8f8c6bbf 100644 --- a/database/sqlite/sqlite_aclk_node.h +++ b/database/sqlite/sqlite_aclk_node.h @@ -4,5 +4,4 @@ #define NETDATA_SQLITE_ACLK_NODE_H void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); #endif //NETDATA_SQLITE_ACLK_NODE_H diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index 1e1d2a741..502633c67 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -5,8 +5,6 @@ #define DB_METADATA_VERSION "1" const char *database_config[] = { - "PRAGMA auto_vacuum=incremental; PRAGMA synchronous=1 ; PRAGMA journal_mode=WAL; PRAGMA temp_store=MEMORY;", - "PRAGMA journal_size_limit=16777216;", "CREATE TABLE IF NOT EXISTS host(host_id blob PRIMARY KEY, hostname text, " "registry_hostname text, update_every int, os text, timezone text, tags text);", "CREATE TABLE IF NOT EXISTS chart(chart_id blob PRIMARY KEY, host_id blob, type text, id text, name text, " @@ -62,6 +60,9 @@ const char *database_cleanup[] = { "delete from chart where chart_id not in (select chart_id from dimension);", "delete from host where host_id not in (select host_id from chart);", "delete from chart_label where chart_id not in (select chart_id from chart);", + "DELETE FROM chart_hash_map WHERE chart_id NOT IN (SELECT chart_id FROM chart);", + "DELETE FROM chart_hash WHERE hash_id NOT IN (SELECT hash_id FROM chart_hash_map);", + "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);", NULL }; @@ -72,10 +73,12 @@ static uv_mutex_t sqlite_transaction_lock; int execute_insert(sqlite3_stmt *res) { int rc; - - while ((rc = sqlite3_step(res)) != SQLITE_DONE && unlikely(netdata_exit)) { - if (likely(rc == SQLITE_BUSY || rc == SQLITE_LOCKED)) + int cnt = 0; + while ((rc = sqlite3_step(res)) != SQLITE_DONE && ++cnt < SQL_MAX_RETRY && likely(!netdata_exit)) { + if (likely(rc == SQLITE_BUSY || rc == SQLITE_LOCKED)) { usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); + error_report("Failed to insert/update, rc = %d -- attempt %d", rc, cnt); + } else { error_report("SQLite error %d", rc); break; @@ -93,8 +96,12 @@ static void add_stmt_to_list(sqlite3_stmt *res) static sqlite3_stmt *statements[MAX_OPEN_STATEMENTS]; if (unlikely(!res)) { - while (idx > 0) - sqlite3_finalize(statements[--idx]); + while (idx > 0) { + int rc; + rc = sqlite3_finalize(statements[--idx]); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement during shutdown, rc = %d", rc); + } return; } @@ -302,7 +309,7 @@ static int attempt_database_fix() error_report("Failed to close database, rc = %d", rc); info("Attempting to fix database"); db_meta = NULL; - return sql_init_database(DB_CHECK_FIX_DB | DB_CHECK_CONT); + return sql_init_database(DB_CHECK_FIX_DB | DB_CHECK_CONT, 0); } static int init_database_batch(int rebuild, int init_type, const char *batch[]) @@ -333,13 +340,17 @@ static int init_database_batch(int rebuild, int init_type, const char *batch[]) * Initialize the SQLite database * Return 0 on success */ -int sql_init_database(db_check_action_type_t rebuild) +int sql_init_database(db_check_action_type_t rebuild, int memory) { char *err_msg = NULL; char sqlite_database[FILENAME_MAX + 1]; int rc; - snprintfz(sqlite_database, FILENAME_MAX, "%s/netdata-meta.db", netdata_configured_cache_dir); + if (likely(!memory)) + snprintfz(sqlite_database, FILENAME_MAX, "%s/netdata-meta.db", netdata_configured_cache_dir); + else + strcpy(sqlite_database, ":memory:"); + rc = sqlite3_open(sqlite_database, &db_meta); if (rc != SQLITE_OK) { error_report("Failed to initialize database at %s, due to \"%s\"", sqlite_database, sqlite3_errstr(rc)); @@ -390,6 +401,40 @@ int sql_init_database(db_check_action_type_t rebuild) info("SQLite database %s initialization", sqlite_database); + char buf[1024 + 1] = ""; + const char *list[2] = { buf, NULL }; + + // https://www.sqlite.org/pragma.html#pragma_auto_vacuum + // PRAGMA schema.auto_vacuum = 0 | NONE | 1 | FULL | 2 | INCREMENTAL; + snprintfz(buf, 1024, "PRAGMA auto_vacuum=%s;", config_get(CONFIG_SECTION_SQLITE, "auto vacuum", "INCREMENTAL")); + if(init_database_batch(rebuild, 0, list)) return 1; + + // https://www.sqlite.org/pragma.html#pragma_synchronous + // PRAGMA schema.synchronous = 0 | OFF | 1 | NORMAL | 2 | FULL | 3 | EXTRA; + snprintfz(buf, 1024, "PRAGMA synchronous=%s;", config_get(CONFIG_SECTION_SQLITE, "synchronous", "NORMAL")); + if(init_database_batch(rebuild, 0, list)) return 1; + + // https://www.sqlite.org/pragma.html#pragma_journal_mode + // PRAGMA schema.journal_mode = DELETE | TRUNCATE | PERSIST | MEMORY | WAL | OFF + snprintfz(buf, 1024, "PRAGMA journal_mode=%s;", config_get(CONFIG_SECTION_SQLITE, "journal mode", "WAL")); + if(init_database_batch(rebuild, 0, list)) return 1; + + // https://www.sqlite.org/pragma.html#pragma_temp_store + // PRAGMA temp_store = 0 | DEFAULT | 1 | FILE | 2 | MEMORY; + snprintfz(buf, 1024, "PRAGMA temp_store=%s;", config_get(CONFIG_SECTION_SQLITE, "temp store", "MEMORY")); + if(init_database_batch(rebuild, 0, list)) return 1; + + // https://www.sqlite.org/pragma.html#pragma_journal_size_limit + // PRAGMA schema.journal_size_limit = N ; + snprintfz(buf, 1024, "PRAGMA journal_size_limit=%lld;", config_get_number(CONFIG_SECTION_SQLITE, "journal size limit", 16777216)); + if(init_database_batch(rebuild, 0, list)) return 1; + + // https://www.sqlite.org/pragma.html#pragma_cache_size + // PRAGMA schema.cache_size = pages; + // PRAGMA schema.cache_size = -kibibytes; + snprintfz(buf, 1024, "PRAGMA cache_size=%lld;", config_get_number(CONFIG_SECTION_SQLITE, "cache size", -2000)); + if(init_database_batch(rebuild, 0, list)) return 1; + if (init_database_batch(rebuild, 0, &database_config[0])) return 1; @@ -1160,8 +1205,24 @@ failed: return; } +void free_temporary_host(RRDHOST *host) +{ + if (host) { + freez(host->hostname); + freez((char *)host->os); + freez((char *)host->tags); + freez((char *)host->timezone); + freez(host->program_name); + freez(host->program_version); + freez(host->registry_hostname); + freez(host->system_info); + freez(host); + } +} + #define SELECT_HOST "select host_id, registry_hostname, update_every, os, timezone, tags from host where hostname = @hostname order by rowid desc;" -#define SELECT_HOST_BY_UUID "select host_id, registry_hostname, update_every, os, timezone, tags from host where host_id = @host_id ;" +#define SELECT_HOST_BY_UUID "select h.host_id, h.registry_hostname, h.update_every, h.os, h.timezone, h.tags from host h, node_instance ni " \ + "where (ni.host_id = @host_id or ni.node_id = @host_id) AND ni.host_id = h.host_id;" RRDHOST *sql_create_host_by_uuid(char *hostname) { @@ -1229,8 +1290,6 @@ failed: return host; } -#define SQL_MAX_RETRY 100 - void db_execute(const char *cmd) { int rc; @@ -1430,13 +1489,13 @@ int find_dimension_first_last_t(char *machine_guid, char *chart_id, char *dim_id } #ifdef ENABLE_DBENGINE -static RRDDIM *create_rrdim_entry(RRDSET *st, char *id, char *name, uuid_t *metric_uuid) +static RRDDIM *create_rrdim_entry(ONEWAYALLOC *owa, RRDSET *st, char *id, char *name, uuid_t *metric_uuid) { - RRDDIM *rd = callocz(1, sizeof(*rd)); + RRDDIM *rd = onewayalloc_callocz(owa, 1, sizeof(*rd)); rd->rrdset = st; rd->last_stored_value = NAN; rrddim_flag_set(rd, RRDDIM_FLAG_NONE); - rd->state = mallocz(sizeof(*rd->state)); + rd->state = onewayalloc_mallocz(owa, sizeof(*rd->state)); rd->rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; rd->state->query_ops.init = rrdeng_load_metric_init; rd->state->query_ops.next_metric = rrdeng_load_metric_next; @@ -1444,11 +1503,11 @@ static RRDDIM *create_rrdim_entry(RRDSET *st, char *id, char *name, uuid_t *metr rd->state->query_ops.finalize = rrdeng_load_metric_finalize; rd->state->query_ops.latest_time = rrdeng_metric_latest_time; rd->state->query_ops.oldest_time = rrdeng_metric_oldest_time; - rd->state->rrdeng_uuid = mallocz(sizeof(uuid_t)); + rd->state->rrdeng_uuid = onewayalloc_mallocz(owa, sizeof(uuid_t)); uuid_copy(*rd->state->rrdeng_uuid, *metric_uuid); uuid_copy(rd->state->metric_uuid, *metric_uuid); - rd->id = strdupz(id); - rd->name = strdupz(name); + rd->id = onewayalloc_strdupz(owa, id); + rd->name = onewayalloc_strdupz(owa, name); return rd; } #endif @@ -1465,7 +1524,7 @@ static RRDDIM *create_rrdim_entry(RRDSET *st, char *id, char *name, uuid_t *metr "where d.chart_id = c.chart_id and c.host_id = h.host_id and c.host_id = @host_id and c.type||'.'||c.id = @chart " \ "order by c.chart_id asc, c.type||'.'||c.id desc;" -void sql_build_context_param_list(struct context_param **param_list, RRDHOST *host, char *context, char *chart) +void sql_build_context_param_list(ONEWAYALLOC *owa, struct context_param **param_list, RRDHOST *host, char *context, char *chart) { #ifdef ENABLE_DBENGINE int rc; @@ -1474,7 +1533,7 @@ void sql_build_context_param_list(struct context_param **param_list, RRDHOST *ho return; if (unlikely(!(*param_list))) { - *param_list = mallocz(sizeof(struct context_param)); + *param_list = onewayalloc_mallocz(owa, sizeof(struct context_param)); (*param_list)->first_entry_t = LONG_MAX; (*param_list)->last_entry_t = 0; (*param_list)->rd = NULL; @@ -1523,21 +1582,21 @@ void sql_build_context_param_list(struct context_param **param_list, RRDHOST *ho if (!st || uuid_compare(*(uuid_t *)sqlite3_column_blob(res, 7), chart_id)) { if (unlikely(st && !st->counter)) { - freez(st->context); - freez((char *) st->name); - freez(st); + onewayalloc_freez(owa, st->context); + onewayalloc_freez(owa, (char *) st->name); + onewayalloc_freez(owa, st); } - st = callocz(1, sizeof(*st)); + st = onewayalloc_callocz(owa, 1, sizeof(*st)); char n[RRD_ID_LENGTH_MAX + 1]; snprintfz( n, RRD_ID_LENGTH_MAX, "%s.%s", (char *)sqlite3_column_text(res, 4), (char *)sqlite3_column_text(res, 3)); - st->name = strdupz(n); + st->name = onewayalloc_strdupz(owa, n); st->update_every = sqlite3_column_int(res, 6); st->counter = 0; if (chart) { - st->context = strdupz((char *)sqlite3_column_text(res, 8)); + st->context = onewayalloc_strdupz(owa, (char *)sqlite3_column_text(res, 8)); strncpyz(st->id, chart, RRD_ID_LENGTH_MAX); } uuid_copy(chart_id, *(uuid_t *)sqlite3_column_blob(res, 7)); @@ -1553,7 +1612,7 @@ void sql_build_context_param_list(struct context_param **param_list, RRDHOST *ho st->counter++; st->last_entry_t = MAX(st->last_entry_t, (*param_list)->last_entry_t); - RRDDIM *rd = create_rrdim_entry(st, (char *)sqlite3_column_text(res, 1), (char *)sqlite3_column_text(res, 2), &rrdeng_uuid); + RRDDIM *rd = create_rrdim_entry(owa, st, (char *)sqlite3_column_text(res, 1), (char *)sqlite3_column_text(res, 2), &rrdeng_uuid); if (sqlite3_column_int(res, 9) == 1) rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN); rd->next = (*param_list)->rd; @@ -1561,13 +1620,13 @@ void sql_build_context_param_list(struct context_param **param_list, RRDHOST *ho } if (st) { if (!st->counter) { - freez(st->context); - freez((char *)st->name); - freez(st); + onewayalloc_freez(owa,st->context); + onewayalloc_freez(owa,(char *)st->name); + onewayalloc_freez(owa,st); } else if (!st->context && context) - st->context = strdupz(context); + st->context = onewayalloc_strdupz(owa,context); } failed: diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h index 30b8dee6c..d24484774 100644 --- a/database/sqlite/sqlite_functions.h +++ b/database/sqlite/sqlite_functions.h @@ -24,6 +24,7 @@ typedef enum db_check_action_type { DB_CHECK_CONT = 0x00008 } db_check_action_type_t; +#define SQL_MAX_RETRY (100) #define SQLITE_INSERT_DELAY (50) // Insert delay in case of lock #define SQL_STORE_HOST "insert or replace into host (host_id,hostname,registry_hostname,update_every,os,timezone,tags) values (?1,?2,?3,?4,?5,?6,?7);" @@ -56,7 +57,7 @@ typedef enum db_check_action_type { return 1; \ } -extern int sql_init_database(db_check_action_type_t rebuild); +extern int sql_init_database(db_check_action_type_t rebuild, int memory); extern void sql_close_database(void); extern int sql_store_host(uuid_t *guid, const char *hostname, const char *registry_hostname, int update_every, const char *os, const char *timezone, const char *tags); @@ -89,7 +90,7 @@ extern void db_unlock(void); extern void db_lock(void); extern void delete_dimension_uuid(uuid_t *dimension_uuid); extern void sql_store_chart_label(uuid_t *chart_uuid, int source_type, char *label, char *value); -extern void sql_build_context_param_list(struct context_param **param_list, RRDHOST *host, char *context, char *chart); +extern void sql_build_context_param_list(ONEWAYALLOC *owa, struct context_param **param_list, RRDHOST *host, char *context, char *chart); extern void store_claim_id(uuid_t *host_id, uuid_t *claim_id); extern int update_node_id(uuid_t *host_id, uuid_t *node_id); extern int get_node_id(uuid_t *host_id, uuid_t *node_id); @@ -100,4 +101,5 @@ extern void sql_load_node_id(RRDHOST *host); extern void compute_chart_hash(RRDSET *st); extern int sql_set_dimension_option(uuid_t *dim_uuid, char *option); char *get_hostname_by_node_id(char *node_id); +void free_temporary_host(RRDHOST *host); #endif //NETDATA_SQLITE_FUNCTIONS_H diff --git a/database/sqlite/sqlite_health.c b/database/sqlite/sqlite_health.c index 8ba95628f..53742a1a6 100644 --- a/database/sqlite/sqlite_health.c +++ b/database/sqlite/sqlite_health.c @@ -433,6 +433,168 @@ void sql_health_alarm_log_count(RRDHOST *host) { info("HEALTH [%s]: Table health_log_%s, contains %lu entries.", host->hostname, uuid_str, host->health_log_entries_written); } +#define SQL_INJECT_REMOVED(guid, guid2) "insert into health_log_%s (hostname, unique_id, alarm_id, alarm_event_id, config_hash_id, updated_by_id, updates_id, when_key, duration, non_clear_duration, flags, exec_run_timestamp, " \ +"delay_up_to_timestamp, name, chart, family, exec, recipient, source, units, info, exec_code, new_status, old_status, delay, new_value, old_value, last_repeat, class, component, type) " \ +"select hostname, ?1, ?2, ?3, config_hash_id, 0, ?4, strftime('%%s'), 0, 0, flags, exec_run_timestamp, " \ +"strftime('%%s'), name, chart, family, exec, recipient, source, units, info, exec_code, -2, new_status, delay, NULL, new_value, 0, class, component, type " \ +"from health_log_%s where unique_id = ?5", guid, guid2 +#define SQL_INJECT_REMOVED_UPDATE(guid) "update health_log_%s set flags = flags | ?1, updated_by_id = ?2 where unique_id = ?3; ", guid +void sql_inject_removed_status(char *uuid_str, uint32_t alarm_id, uint32_t alarm_event_id, uint32_t unique_id, uint32_t max_unique_id) +{ + int rc = 0; + char command[MAX_HEALTH_SQL_SIZE + 1]; + + if (!alarm_id || !alarm_event_id || !unique_id || !max_unique_id) + return; + + sqlite3_stmt *res = NULL; + + snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_INJECT_REMOVED(uuid_str, uuid_str)); + rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to inject removed event"); + return; + } + + rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) max_unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind max_unique_id parameter for SQL_INJECT_REMOVED"); + goto failed; + } + + rc = sqlite3_bind_int64(res, 2, (sqlite3_int64) alarm_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind alarm_id parameter for SQL_INJECT_REMOVED"); + goto failed; + } + + rc = sqlite3_bind_int64(res, 3, (sqlite3_int64) alarm_event_id + 1); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind alarm_event_id parameter for SQL_INJECT_REMOVED"); + goto failed; + } + + rc = sqlite3_bind_int64(res, 4, (sqlite3_int64) unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind unique_id parameter for SQL_INJECT_REMOVED"); + goto failed; + } + + rc = sqlite3_bind_int64(res, 5, (sqlite3_int64) unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind unique_id parameter for SQL_INJECT_REMOVED"); + goto failed; + } + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) { + error_report("HEALTH [N/A]: Failed to execute SQL_INJECT_REMOVED, rc = %d", rc); + goto failed; + } + + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("HEALTH [N/A]: Failed to finalize the prepared statement for injecting removed event."); + + //update the old entry + snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_INJECT_REMOVED_UPDATE(uuid_str)); + rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to update during inject removed event"); + return; + } + + rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) HEALTH_ENTRY_FLAG_UPDATED); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind flags parameter for SQL_INJECT_REMOVED (update)"); + goto failed; + } + + rc = sqlite3_bind_int64(res, 2, (sqlite3_int64) max_unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind max_unique_id parameter for SQL_INJECT_REMOVED (update)"); + goto failed; + } + + rc = sqlite3_bind_int64(res, 3, (sqlite3_int64) unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind unique_id parameter for SQL_INJECT_REMOVED (update)"); + goto failed; + } + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) { + error_report("HEALTH [N/A]: Failed to execute SQL_INJECT_REMOVED_UPDATE, rc = %d", rc); + goto failed; + } + +failed: + if (unlikely(sqlite3_finalize(res) != SQLITE_OK)) + error_report("HEALTH [N/A]: Failed to finalize the prepared statement for injecting removed event."); + return; + +} + +#define SQL_SELECT_MAX_UNIQUE_ID(guid) "SELECT MAX(unique_id) from health_log_%s", guid +uint32_t sql_get_max_unique_id (char *uuid_str) +{ + int rc = 0; + char command[MAX_HEALTH_SQL_SIZE + 1]; + uint32_t max_unique_id = 0; + + sqlite3_stmt *res = NULL; + + snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_SELECT_MAX_UNIQUE_ID(uuid_str)); + rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to get max unique id"); + return 0; + } + + while (sqlite3_step(res) == SQLITE_ROW) { + max_unique_id = (uint32_t) sqlite3_column_int64(res, 0); + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize the statement"); + + return max_unique_id; +} + +#define SQL_SELECT_LAST_STATUSES(guid) "SELECT new_status, unique_id, alarm_id, alarm_event_id from health_log_%s group by alarm_id having max(alarm_event_id)", guid +void sql_check_removed_alerts_state(char *uuid_str) +{ + int rc = 0; + char command[MAX_HEALTH_SQL_SIZE + 1]; + RRDCALC_STATUS status; + uint32_t alarm_id = 0, alarm_event_id = 0, unique_id = 0, max_unique_id = 0; + + sqlite3_stmt *res = NULL; + + snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_SELECT_LAST_STATUSES(uuid_str)); + rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to check removed statuses"); + return; + } + + while (sqlite3_step(res) == SQLITE_ROW) { + status = (RRDCALC_STATUS) sqlite3_column_int(res, 0); + unique_id = (uint32_t) sqlite3_column_int64(res, 1); + alarm_id = (uint32_t) sqlite3_column_int64(res, 2); + alarm_event_id = (uint32_t) sqlite3_column_int64(res, 3); + if (unlikely(status != RRDCALC_STATUS_REMOVED)) { + if (unlikely(!max_unique_id)) + max_unique_id = sql_get_max_unique_id (uuid_str); + sql_inject_removed_status (uuid_str, alarm_id, alarm_event_id, unique_id, ++max_unique_id); + } + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize the statement"); +} + /* Health related SQL queries Load from the health log table */ @@ -454,6 +616,8 @@ void sql_health_alarm_log_load(RRDHOST *host) { char uuid_str[GUID_LEN + 1]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + sql_check_removed_alerts_state(uuid_str); + snprintfz(command, MAX_HEALTH_SQL_SIZE, SQL_LOAD_HEALTH_LOG(uuid_str, host->health_log.max)); rc = sqlite3_prepare_v2(db_meta, command, -1, &res, 0); |