summaryrefslogtreecommitdiffstats
path: root/src/database/sqlite/sqlite_aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/database/sqlite/sqlite_aclk.c')
-rw-r--r--src/database/sqlite/sqlite_aclk.c228
1 files changed, 20 insertions, 208 deletions
diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c
index 8dc2231b4..027ee8f93 100644
--- a/src/database/sqlite/sqlite_aclk.c
+++ b/src/database/sqlite/sqlite_aclk.c
@@ -9,7 +9,6 @@ struct aclk_sync_config_s {
uv_thread_t thread;
uv_loop_t loop;
uv_timer_t timer_req;
- time_t cleanup_after; // Start a cleanup after this timestamp
uv_async_t async;
bool initialized;
SPINLOCK cmd_queue_lock;
@@ -24,7 +23,7 @@ void sanity_check(void) {
#ifdef ENABLE_ACLK
static struct aclk_database_cmd aclk_database_deq_cmd(void)
{
- struct aclk_database_cmd ret;
+ struct aclk_database_cmd ret = { 0 };
spinlock_lock(&aclk_sync_config.cmd_queue_lock);
if(aclk_sync_config.cmd_base) {
@@ -35,7 +34,6 @@ static struct aclk_database_cmd aclk_database_deq_cmd(void)
}
else {
ret.opcode = ACLK_DATABASE_NOOP;
- ret.completion = NULL;
}
spinlock_unlock(&aclk_sync_config.cmd_queue_lock);
@@ -176,70 +174,24 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
#ifdef ENABLE_ACLK
-#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id"
-static int is_host_available(nd_uuid_t *host_id)
-{
- sqlite3_stmt *res = NULL;
- int rc = 0;
-
- if (!REQUIRE_DB(db_meta))
- return 1;
-
- if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_HOST_BY_UUID, &res))
- return 1;
-
- int param = 0;
- SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_id, sizeof(*host_id), SQLITE_STATIC));
-
- param = 0;
- rc = sqlite3_step_monitored(res);
-
-done:
- REPORT_BIND_FAIL(res, param);
- SQLITE_FINALIZE(res);
- return (rc == SQLITE_ROW);
-}
+#define SQL_SELECT_ACLK_ALERT_TABLES \
+ "SELECT 'DROP '||type||' IF EXISTS '||name||';' FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table', 'trigger', 'index')"
-// OPCODE: ACLK_DATABASE_DELETE_HOST
-static void sql_delete_aclk_table_list(char *host_guid)
+static void sql_delete_aclk_table_list(void)
{
- char uuid_str[UUID_STR_LEN];
- char host_str[UUID_STR_LEN];
-
- int rc;
- nd_uuid_t host_uuid;
-
- if (unlikely(!host_guid))
- return;
-
- rc = uuid_parse(host_guid, host_uuid);
- freez(host_guid);
- if (rc)
- return;
-
- uuid_unparse_lower(host_uuid, host_str);
- uuid_unparse_lower_fix(&host_uuid, uuid_str);
-
- if (is_host_available(&host_uuid))
- return;
-
sqlite3_stmt *res = NULL;
- BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
- buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index')", uuid_str);
+ BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, NULL);
- if (!PREPARE_STATEMENT(db_meta, buffer_tostring(sql), &res))
+ if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_ACLK_ALERT_TABLES, &res))
goto fail;
- buffer_flush(sql);
-
while (sqlite3_step_monitored(res) == SQLITE_ROW)
buffer_strcat(sql, (char *) sqlite3_column_text(res, 0));
SQLITE_FINALIZE(res);
- rc = db_execute(db_meta, buffer_tostring(sql));
+ int rc = db_execute(db_meta, buffer_tostring(sql));
if (unlikely(rc))
netdata_log_error("Failed to drop unused ACLK tables");
@@ -285,53 +237,6 @@ skip:
freez(machine_guid);
}
-
-static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unused, char **argv __maybe_unused, char **column __maybe_unused)
-{
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_DELETE_HOST;
- cmd.param[0] = strdupz((char *) argv[0]);
- aclk_database_enq_cmd(&cmd);
- return 0;
-}
-
-#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table')"
-
-static void sql_check_aclk_table_list(void)
-{
- char *err_msg = NULL;
- int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, NULL, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
- sqlite3_free(err_msg);
- }
-}
-
-#define SQL_ALERT_CLEANUP "DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND CAST(date_cloud_ack AS INT) < unixepoch()-%d"
-
-static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
-{
- char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql,sizeof(sql) - 1, SQL_ALERT_CLEANUP, (char *) argv[0], ACLK_DELETE_ACK_ALERTS_INTERNAL);
- if (unlikely(db_execute(db_meta, sql)))
- error_report("Failed to clean stale ACLK alert entries");
- return 0;
-}
-
-#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table')"
-
-static void sql_maint_aclk_sync_database_all(void)
-{
- char *err_msg = NULL;
- int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ALERT_LIST, sql_maint_aclk_sync_database, NULL, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
- sqlite3_free(err_msg);
- }
-}
-
static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
char uuid_str[UUID_STR_LEN];
@@ -339,7 +244,7 @@ static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_un
RRDHOST *host = rrdhost_find_by_guid(uuid_str);
if (host != localhost)
- sql_create_aclk_table(host, (nd_uuid_t *) argv[0], (nd_uuid_t *) argv[1]);
+ create_aclk_config(host, (nd_uuid_t *)argv[0], (nd_uuid_t *)argv[1]);
return 0;
}
@@ -356,16 +261,7 @@ static void timer_cb(uv_timer_t *handle)
uv_stop(handle->loop);
uv_update_time(handle->loop);
- struct aclk_sync_config_s *config = handle->data;
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
-
- if (config->cleanup_after < now_realtime_sec()) {
- cmd.opcode = ACLK_DATABASE_CLEANUP;
- aclk_database_enq_cmd(&cmd);
- config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
- }
-
+ struct aclk_database_cmd cmd = { 0 };
if (aclk_connected) {
cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
aclk_database_enq_cmd(&cmd);
@@ -373,7 +269,7 @@ static void timer_cb(uv_timer_t *handle)
}
}
-static void aclk_synchronization(void *arg __maybe_unused)
+static void aclk_synchronization(void *arg)
{
struct aclk_sync_config_s *config = arg;
uv_thread_set_name_np("ACLKSYNC");
@@ -381,14 +277,9 @@ static void aclk_synchronization(void *arg __maybe_unused)
service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
- worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup");
- worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete");
worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
- worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint");
- worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot");
- worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check");
worker_register_job_name(ACLK_DATABASE_TIMER, "timer");
uv_loop_t *loop = &config->loop;
@@ -401,9 +292,10 @@ static void aclk_synchronization(void *arg __maybe_unused)
netdata_log_info("Starting ACLK synchronization thread");
- config->cleanup_after = now_realtime_sec() + ACLK_DATABASE_CLEANUP_FIRST;
config->initialized = true;
+ sql_delete_aclk_table_list();
+
while (likely(service_running(SERVICE_ACLKSYNC))) {
enum aclk_database_opcode opcode;
worker_is_idle();
@@ -422,26 +314,17 @@ static void aclk_synchronization(void *arg __maybe_unused)
worker_is_busy(opcode);
switch (opcode) {
+ default:
case ACLK_DATABASE_NOOP:
/* the command queue was empty, do nothing */
break;
-// MAINTENANCE
- case ACLK_DATABASE_CLEANUP:
- // Scan all aclk_alert_ tables and cleanup as needed
- sql_maint_aclk_sync_database_all();
- sql_check_aclk_table_list();
- break;
-
- case ACLK_DATABASE_DELETE_HOST:
- sql_delete_aclk_table_list(cmd.param[0]);
- break;
// NODE STATE
case ACLK_DATABASE_NODE_STATE:;
RRDHOST *host = cmd.param[0];
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
struct aclk_sync_cfg_t *ahc = host->aclk_config;
if (unlikely(!ahc))
- sql_create_aclk_table(host, &host->host_uuid, host->node_id);
+ create_aclk_config(host, &host->host_uuid, host->node_id);
aclk_host_state_update(host, live, 1);
break;
case ACLK_DATABASE_NODE_UNREGISTER:
@@ -455,17 +338,7 @@ static void aclk_synchronization(void *arg __maybe_unused)
case ACLK_DATABASE_PUSH_ALERT:
aclk_push_alert_events_for_all_hosts();
break;
- case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:;
- aclk_push_alert_snapshot_event(cmd.param[0]);
- break;
- case ACLK_DATABASE_QUEUE_REMOVED_ALERTS:
- sql_process_queue_removed_alerts_to_aclk(cmd.param[0]);
- break;
- default:
- break;
}
- if (cmd.completion)
- completion_mark_complete(cmd.completion);
} while (opcode != ACLK_DATABASE_NOOP);
}
@@ -489,39 +362,11 @@ static void aclk_synchronization_init(void)
// -------------------------------------------------------------
-void sql_create_aclk_table(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused)
+void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __maybe_unused, nd_uuid_t *node_id __maybe_unused)
{
#ifdef ENABLE_ACLK
- char uuid_str[UUID_STR_LEN];
- char host_guid[UUID_STR_LEN];
- int rc;
-
- uuid_unparse_lower_fix(host_uuid, uuid_str);
- uuid_unparse_lower(*host_uuid, host_guid);
-
- char sql[ACLK_SYNC_QUERY_SIZE];
-
- snprintfz(sql, sizeof(sql) - 1, TABLE_ACLK_ALERT, uuid_str);
- rc = db_execute(db_meta, sql);
- if (unlikely(rc))
- error_report("Failed to create ACLK alert table for host %s", host ? rrdhost_hostname(host) : host_guid);
- else {
- snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT1, uuid_str, uuid_str);
- rc = db_execute(db_meta, sql);
- if (unlikely(rc))
- error_report(
- "Failed to create ACLK alert table index 1 for host %s", host ? string2str(host->hostname) : host_guid);
-
- snprintfz(sql, sizeof(sql) - 1, INDEX_ACLK_ALERT2, uuid_str, uuid_str);
- rc = db_execute(db_meta, sql);
- if (unlikely(rc))
- error_report(
- "Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid);
- }
- if (likely(host) && unlikely(host->aclk_config))
- return;
- if (unlikely(!host))
+ if (!host || host->aclk_config)
return;
struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t));
@@ -535,8 +380,7 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __
}
wc->host = host;
- strcpy(wc->uuid_str, uuid_str);
- wc->alert_updates = 0;
+ wc->stream_alerts = false;
time_t now = now_realtime_sec();
wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now;
#endif
@@ -579,7 +423,7 @@ void sql_aclk_sync_init(void)
if (!number_of_children)
aclk_queue_node_info(localhost, true);
- rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES,aclk_config_parameters, NULL,&err_msg);
+ rc = sqlite3_exec_monitored(db_meta, SQL_FETCH_ALL_INSTANCES, aclk_config_parameters, NULL, &err_msg);
if (rc != SQLITE_OK) {
error_report("SQLite error when configuring host ACLK synchonization parameters, rc = %d (%s)", rc, err_msg);
@@ -591,15 +435,12 @@ void sql_aclk_sync_init(void)
#endif
}
-// Public
-
static inline void queue_aclk_sync_cmd(enum aclk_database_opcode opcode, const void *param0, const void *param1)
{
struct aclk_database_cmd cmd;
cmd.opcode = opcode;
cmd.param[0] = (void *) param0;
cmd.param[1] = (void *) param1;
- cmd.completion = NULL;
aclk_database_enq_cmd(&cmd);
}
@@ -612,35 +453,12 @@ void aclk_push_alert_config(const char *node_id, const char *config_hash)
queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_CONFIG, strdupz(node_id), strdupz(config_hash));
}
-void aclk_push_node_alert_snapshot(const char *node_id)
-{
- if (unlikely(!aclk_sync_config.initialized))
- return;
-
- queue_aclk_sync_cmd(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, strdupz(node_id), NULL);
-}
-
-
-void aclk_push_node_removed_alerts(const char *node_id)
-{
- if (unlikely(!aclk_sync_config.initialized))
- return;
-
- queue_aclk_sync_cmd(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, strdupz(node_id), NULL);
-}
-
void schedule_node_info_update(RRDHOST *host __maybe_unused)
{
#ifdef ENABLE_ACLK
if (unlikely(!host))
return;
-
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_NODE_STATE;
- cmd.param[0] = host;
- cmd.completion = NULL;
- aclk_database_enq_cmd(&cmd);
+ queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, NULL);
#endif
}
@@ -649,12 +467,6 @@ void unregister_node(const char *machine_guid)
{
if (unlikely(!machine_guid))
return;
-
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_NODE_UNREGISTER;
- cmd.param[0] = strdupz(machine_guid);
- cmd.completion = NULL;
- aclk_database_enq_cmd(&cmd);
+ queue_aclk_sync_cmd(ACLK_DATABASE_NODE_UNREGISTER, strdupz(machine_guid), NULL);
}
#endif