summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk.h
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk.h')
-rw-r--r--database/sqlite/sqlite_aclk.h98
1 files changed, 15 insertions, 83 deletions
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index b73f422e1..06d5d0270 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -5,8 +5,6 @@
#include "sqlite3.h"
-// TODO: To be added
-#include "../../aclk/schema-wrappers/chart_stream.h"
#ifndef ACLK_MAX_CHART_BATCH
#define ACLK_MAX_CHART_BATCH (200)
@@ -15,15 +13,9 @@
#define ACLK_MAX_CHART_BATCH_COUNT (10)
#endif
#define ACLK_MAX_ALERT_UPDATES (5)
-#define ACLK_DATABASE_CLEANUP_FIRST (60)
-#define ACLK_DATABASE_ROTATION_DELAY (180)
-#define ACLK_DATABASE_RETENTION_RETRY (60)
+#define ACLK_DATABASE_CLEANUP_FIRST (1200)
#define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
-#define ACLK_DATABASE_ROTATION_INTERVAL (3600)
-#define ACLK_DELETE_ACK_INTERNAL (600)
#define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400)
-#define ACLK_AUTO_MARK_SUBMIT_INTERVAL (3600)
-#define ACLK_AUTO_MARK_UPDATED_INTERVAL (1800)
#define ACLK_SYNC_QUERY_SIZE 512
struct aclk_completion {
@@ -74,57 +66,14 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
out[23] = '_';
}
-static inline char *get_str_from_uuid(uuid_t *uuid)
-{
- char uuid_str[GUID_LEN + 1];
- if (unlikely(!uuid)) {
- uuid_t zero_uuid;
- uuid_clear(zero_uuid);
- uuid_unparse_lower(zero_uuid, uuid_str);
- }
- else
- uuid_unparse_lower(*uuid, uuid_str);
- return strdupz(uuid_str);
-}
-
-#define TABLE_ACLK_CHART "CREATE TABLE IF NOT EXISTS aclk_chart_%s (sequence_id INTEGER PRIMARY KEY, " \
- "date_created, date_updated, date_submitted, status, uuid, type, unique_id, " \
- "update_count default 1, unique(uuid, status));"
-
-#define TABLE_ACLK_CHART_PAYLOAD "CREATE TABLE IF NOT EXISTS aclk_chart_payload_%s (unique_id BLOB PRIMARY KEY, " \
- "uuid, claim_id, type, date_created, payload);"
-
-#define TABLE_ACLK_CHART_LATEST "CREATE TABLE IF NOT EXISTS aclk_chart_latest_%s (uuid BLOB PRIMARY KEY, " \
- "unique_id, date_submitted);"
-
-#define TRIGGER_ACLK_CHART_PAYLOAD "CREATE TRIGGER IF NOT EXISTS aclk_tr_chart_payload_%s " \
- "after insert on aclk_chart_payload_%s " \
- "begin insert into aclk_chart_%s (uuid, unique_id, type, status, date_created) values " \
- " (new.uuid, new.unique_id, new.type, 'pending', unixepoch()) on conflict(uuid, status) " \
- " do update set unique_id = new.unique_id, update_count = update_count + 1; " \
- "end;"
-
#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
- "alert_unique_id, date_created, date_submitted, date_cloud_ack, " \
+ "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
"unique(alert_unique_id));"
-#define INDEX_ACLK_CHART "CREATE INDEX IF NOT EXISTS aclk_chart_index_%s ON aclk_chart_%s (unique_id);"
-
-#define INDEX_ACLK_CHART_LATEST "CREATE INDEX IF NOT EXISTS aclk_chart_latest_index_%s ON aclk_chart_latest_%s (unique_id);"
-
#define INDEX_ACLK_ALERT "CREATE INDEX IF NOT EXISTS aclk_alert_index_%s ON aclk_alert_%s (alert_unique_id);"
-
enum aclk_database_opcode {
ACLK_DATABASE_NOOP = 0,
- ACLK_DATABASE_ADD_CHART,
- ACLK_DATABASE_ADD_DIMENSION,
- ACLK_DATABASE_PUSH_CHART,
- ACLK_DATABASE_PUSH_CHART_CONFIG,
- ACLK_DATABASE_RESET_CHART,
- ACLK_DATABASE_CHART_ACK,
- ACLK_DATABASE_UPD_RETENTION,
- ACLK_DATABASE_DIM_DELETION,
ACLK_DATABASE_ORPHAN_HOST,
ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_CLEANUP,
@@ -142,20 +91,11 @@ enum aclk_database_opcode {
ACLK_MAX_ENUMERATIONS_DEFINED
};
-struct aclk_chart_payload_t {
- long sequence_id;
- long last_sequence_id;
- char *payload;
- struct aclk_chart_payload_t *next;
-};
-
-
struct aclk_database_cmd {
enum aclk_database_opcode opcode;
void *data;
void *data_param;
int count;
- uint64_t param1;
struct aclk_completion *completion;
};
@@ -172,12 +112,8 @@ struct aclk_database_worker_config {
char node_id[GUID_LEN + 1];
char host_guid[GUID_LEN + 1];
char *hostname; // hostname to avoid constant lookups
- uint64_t chart_sequence_id; // last chart_sequence_id
- time_t chart_timestamp; // last chart timestamp
time_t cleanup_after; // Start a cleanup after this timestamp
time_t startup_time; // When the sync thread started
- time_t rotation_after;
- uint64_t batch_id; // batch id to use
uint64_t alerts_batch_id; // batch id for alerts to use
uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
uint64_t alert_sequence_id; // last alert sequence_id
@@ -193,15 +129,9 @@ struct aclk_database_worker_config {
uv_cond_t cmd_cond;
volatile unsigned queue_size;
struct aclk_database_cmdqueue cmd_queue;
- uint32_t retry_count;
- int chart_updates;
int alert_updates;
- time_t batch_created;
int node_info_send;
time_t node_collectors_send;
- int chart_pending;
- int chart_reset_count;
- int retention_running;
volatile unsigned is_shutting_down;
volatile unsigned is_orphan;
struct aclk_database_worker_config *next;
@@ -216,23 +146,25 @@ static inline RRDHOST *find_host_by_node_id(char *node_id)
if (uuid_parse(node_id, node_uuid))
return NULL;
- RRDHOST *host = localhost;
- while(host) {
- if (host->node_id && !(uuid_compare(*host->node_id, node_uuid)))
- return host;
- host = host->next;
+ rrd_rdlock();
+ RRDHOST *host, *ret = NULL;
+ rrdhost_foreach_read(host) {
+ if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
+ ret = host;
+ break;
+ }
}
- return NULL;
+ rrd_unlock();
+
+ return ret;
}
extern sqlite3 *db_meta;
-extern int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
-extern void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
-extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
-int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd);
-void aclk_data_rotated(void);
+int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
+void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
+void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
void sql_aclk_sync_init(void);
void sql_check_aclk_table_list(struct aclk_database_worker_config *wc);
void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);