diff options
Diffstat (limited to 'database/sqlite/sqlite_aclk.h')
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 125 |
1 files changed, 24 insertions, 101 deletions
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 208177e45..d555a0cef 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -18,45 +18,6 @@ #define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400) #define ACLK_SYNC_QUERY_SIZE 512 -struct aclk_completion { - uv_mutex_t mutex; - uv_cond_t cond; - volatile unsigned completed; -}; - -static inline void init_aclk_completion(struct aclk_completion *p) -{ - p->completed = 0; - fatal_assert(0 == uv_cond_init(&p->cond)); - fatal_assert(0 == uv_mutex_init(&p->mutex)); -} - -static inline void destroy_aclk_completion(struct aclk_completion *p) -{ - uv_cond_destroy(&p->cond); - uv_mutex_destroy(&p->mutex); -} - -static inline void wait_for_aclk_completion(struct aclk_completion *p) -{ - uv_mutex_lock(&p->mutex); - while (0 == p->completed) { - uv_cond_wait(&p->cond, &p->mutex); - } - fatal_assert(1 == p->completed); - uv_mutex_unlock(&p->mutex); -} - -static inline void aclk_complete(struct aclk_completion *p) -{ - uv_mutex_lock(&p->mutex); - p->completed = 1; - uv_mutex_unlock(&p->mutex); - uv_cond_broadcast(&p->cond); -} - -extern uv_mutex_t aclk_async_lock; - static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out) { uuid_unparse_lower(*uuid, out); @@ -66,6 +27,12 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out) out[23] = '_'; } +static inline int claimed() +{ + return localhost->aclk_state.claimed_id != NULL; +} + + #define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \ "unique(alert_unique_id));" @@ -74,16 +41,14 @@ static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out) enum aclk_database_opcode { ACLK_DATABASE_NOOP = 0, - ACLK_DATABASE_ORPHAN_HOST, - ACLK_DATABASE_ALARM_HEALTH_LOG, ACLK_DATABASE_CLEANUP, ACLK_DATABASE_DELETE_HOST, - ACLK_DATABASE_NODE_INFO, + ACLK_DATABASE_NODE_STATE, ACLK_DATABASE_PUSH_ALERT, ACLK_DATABASE_PUSH_ALERT_CONFIG, ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, + ACLK_DATABASE_PUSH_ALERT_CHECKPOINT, ACLK_DATABASE_QUEUE_REMOVED_ALERTS, - ACLK_DATABASE_NODE_COLLECTORS, ACLK_DATABASE_TIMER, // leave this last @@ -93,10 +58,8 @@ enum aclk_database_opcode { struct aclk_database_cmd { enum aclk_database_opcode opcode; - void *data; - void *data_param; - int count; - struct aclk_completion *completion; + void *param[2]; + struct completion *completion; }; #define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024) @@ -106,67 +69,27 @@ struct aclk_database_cmdqueue { struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE]; }; -struct aclk_database_worker_config { - uv_thread_t thread; - char uuid_str[GUID_LEN + 1]; - char node_id[GUID_LEN + 1]; - char host_guid[GUID_LEN + 1]; - char *hostname; // hostname to avoid constant lookups - time_t cleanup_after; // Start a cleanup after this timestamp - time_t startup_time; // When the sync thread started - uint64_t alerts_batch_id; // batch id for alerts to use - uint64_t alerts_start_seq_id; // cloud has asked to start streaming from - uint64_t alert_sequence_id; // last alert sequence_id - int pause_alert_updates; - uint32_t chart_payload_count; - uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested - uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message - uv_loop_t *loop; +struct aclk_sync_host_config { RRDHOST *host; - uv_async_t async; - /* FIFO command queue */ - uv_mutex_t cmd_mutex; - uv_cond_t cmd_cond; - volatile unsigned queue_size; - struct aclk_database_cmdqueue cmd_queue; int alert_updates; - int node_info_send; + int alert_checkpoint_req; + int alert_queue_removed; + time_t node_info_send_time; time_t node_collectors_send; - volatile unsigned is_shutting_down; - volatile unsigned is_orphan; - struct aclk_database_worker_config *next; + char uuid_str[UUID_STR_LEN]; + char node_id[UUID_STR_LEN]; + char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested }; -static inline RRDHOST *find_host_by_node_id(char *node_id) -{ - uuid_t node_uuid; - if (unlikely(!node_id)) - return NULL; - - if (uuid_parse(node_id, node_uuid)) - return NULL; - - rrd_rdlock(); - RRDHOST *host, *ret = NULL; - rrdhost_foreach_read(host) { - if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) { - ret = host; - break; - } - } - rrd_unlock(); - - return ret; -} - - extern sqlite3 *db_meta; -int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); -void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); +int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd); void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); void sql_aclk_sync_init(void); -int claimed(); -void aclk_sync_exit_all(); -struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id); +void aclk_push_alert_config(const char *node_id, const char *config_hash); +void aclk_push_node_alert_snapshot(const char *node_id); +void aclk_push_node_health_log(const char *node_id); +void aclk_push_node_removed_alerts(const char *node_id); +void schedule_node_info_update(RRDHOST *host); + #endif //NETDATA_SQLITE_ACLK_H |