summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk.h
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk.h')
-rw-r--r--database/sqlite/sqlite_aclk.h125
1 files changed, 24 insertions, 101 deletions
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 208177e4..d555a0ce 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -18,45 +18,6 @@
#define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400)
#define ACLK_SYNC_QUERY_SIZE 512
-struct aclk_completion {
- uv_mutex_t mutex;
- uv_cond_t cond;
- volatile unsigned completed;
-};
-
-static inline void init_aclk_completion(struct aclk_completion *p)
-{
- p->completed = 0;
- fatal_assert(0 == uv_cond_init(&p->cond));
- fatal_assert(0 == uv_mutex_init(&p->mutex));
-}
-
-static inline void destroy_aclk_completion(struct aclk_completion *p)
-{
- uv_cond_destroy(&p->cond);
- uv_mutex_destroy(&p->mutex);
-}
-
-static inline void wait_for_aclk_completion(struct aclk_completion *p)
-{
- uv_mutex_lock(&p->mutex);
- while (0 == p->completed) {
- uv_cond_wait(&p->cond, &p->mutex);
- }
- fatal_assert(1 == p->completed);
- uv_mutex_unlock(&p->mutex);
-}
-
-static inline void aclk_complete(struct aclk_completion *p)
-{
- uv_mutex_lock(&p->mutex);
- p->completed = 1;
- uv_mutex_unlock(&p->mutex);
- uv_cond_broadcast(&p->cond);
-}
-
-extern uv_mutex_t aclk_async_lock;
-
static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
{
uuid_unparse_lower(*uuid, out);
@@ -66,6 +27,12 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
out[23] = '_';
}
+static inline int claimed()
+{
+ return localhost->aclk_state.claimed_id != NULL;
+}
+
+
#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
"alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
"unique(alert_unique_id));"
@@ -74,16 +41,14 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
enum aclk_database_opcode {
ACLK_DATABASE_NOOP = 0,
- ACLK_DATABASE_ORPHAN_HOST,
- ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_CLEANUP,
ACLK_DATABASE_DELETE_HOST,
- ACLK_DATABASE_NODE_INFO,
+ ACLK_DATABASE_NODE_STATE,
ACLK_DATABASE_PUSH_ALERT,
ACLK_DATABASE_PUSH_ALERT_CONFIG,
ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
+ ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,
ACLK_DATABASE_QUEUE_REMOVED_ALERTS,
- ACLK_DATABASE_NODE_COLLECTORS,
ACLK_DATABASE_TIMER,
// leave this last
@@ -93,10 +58,8 @@ enum aclk_database_opcode {
struct aclk_database_cmd {
enum aclk_database_opcode opcode;
- void *data;
- void *data_param;
- int count;
- struct aclk_completion *completion;
+ void *param[2];
+ struct completion *completion;
};
#define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024)
@@ -106,67 +69,27 @@ struct aclk_database_cmdqueue {
struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE];
};
-struct aclk_database_worker_config {
- uv_thread_t thread;
- char uuid_str[GUID_LEN + 1];
- char node_id[GUID_LEN + 1];
- char host_guid[GUID_LEN + 1];
- char *hostname; // hostname to avoid constant lookups
- time_t cleanup_after; // Start a cleanup after this timestamp
- time_t startup_time; // When the sync thread started
- uint64_t alerts_batch_id; // batch id for alerts to use
- uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
- uint64_t alert_sequence_id; // last alert sequence_id
- int pause_alert_updates;
- uint32_t chart_payload_count;
- uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested
- uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message
- uv_loop_t *loop;
+struct aclk_sync_host_config {
RRDHOST *host;
- uv_async_t async;
- /* FIFO command queue */
- uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
- volatile unsigned queue_size;
- struct aclk_database_cmdqueue cmd_queue;
int alert_updates;
- int node_info_send;
+ int alert_checkpoint_req;
+ int alert_queue_removed;
+ time_t node_info_send_time;
time_t node_collectors_send;
- volatile unsigned is_shutting_down;
- volatile unsigned is_orphan;
- struct aclk_database_worker_config *next;
+ char uuid_str[UUID_STR_LEN];
+ char node_id[UUID_STR_LEN];
+ char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested
};
-static inline RRDHOST *find_host_by_node_id(char *node_id)
-{
- uuid_t node_uuid;
- if (unlikely(!node_id))
- return NULL;
-
- if (uuid_parse(node_id, node_uuid))
- return NULL;
-
- rrd_rdlock();
- RRDHOST *host, *ret = NULL;
- rrdhost_foreach_read(host) {
- if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
- ret = host;
- break;
- }
- }
- rrd_unlock();
-
- return ret;
-}
-
-
extern sqlite3 *db_meta;
-int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
-void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
+int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd);
void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
void sql_aclk_sync_init(void);
-int claimed();
-void aclk_sync_exit_all();
-struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id);
+void aclk_push_alert_config(const char *node_id, const char *config_hash);
+void aclk_push_node_alert_snapshot(const char *node_id);
+void aclk_push_node_health_log(const char *node_id);
+void aclk_push_node_removed_alerts(const char *node_id);
+void schedule_node_info_update(RRDHOST *host);
+
#endif //NETDATA_SQLITE_ACLK_H