summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r--database/sqlite/sqlite_aclk.c336
1 files changed, 154 insertions, 182 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 7e3a9b2eb..3b0c40522 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -10,10 +10,140 @@ void sanity_check(void) {
BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
}
-const char *aclk_sync_config[] = {
+static int sql_check_aclk_table(void *data, int argc, char **argv, char **column)
+{
+ struct aclk_database_worker_config *wc = data;
+ UNUSED(argc);
+ UNUSED(column);
- NULL,
-};
+ debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]);
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_DELETE_HOST;
+ cmd.data = strdupz((char *) argv[0]);
+ aclk_database_enq_cmd_noblock(wc, &cmd);
+ return 0;
+}
+
+#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
+ "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');"
+
+static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
+{
+ char *err_msg = NULL;
+ debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist");
+ int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg);
+ if (rc != SQLITE_OK) {
+ error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
+ sqlite3_free(err_msg);
+ }
+}
+
+static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+{
+ UNUSED(cmd);
+
+ debug(D_ACLK, "Checking database for %s", wc->host_guid);
+
+ BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
+
+ buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND "
+ "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL);
+ db_execute(buffer_tostring(sql));
+
+ buffer_free(sql);
+}
+
+
+#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
+
+static int is_host_available(uuid_t *host_id)
+{
+ sqlite3_stmt *res = NULL;
+ int rc;
+
+ if (unlikely(!db_meta)) {
+ if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_UUID, -1, &res, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to select node instance information for a node");
+ return 1;
+ }
+
+ rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind host_id parameter to select node instance information");
+ goto failed;
+ }
+ rc = sqlite3_step_monitored(res);
+
+failed:
+ if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
+ error_report("Failed to finalize the prepared statement when checking host existence");
+
+ return (rc == SQLITE_ROW);
+}
+
+// OPCODE: ACLK_DATABASE_DELETE_HOST
+void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+{
+ UNUSED(wc);
+ char uuid_str[GUID_LEN + 1];
+ char host_str[GUID_LEN + 1];
+
+ int rc;
+ uuid_t host_uuid;
+ char *host_guid = (char *)cmd.data;
+
+ if (unlikely(!host_guid))
+ return;
+
+ rc = uuid_parse(host_guid, host_uuid);
+ freez(host_guid);
+ if (rc)
+ return;
+
+ uuid_unparse_lower(host_uuid, host_str);
+ uuid_unparse_lower_fix(&host_uuid, uuid_str);
+
+ debug(D_ACLK_SYNC, "Checking if I should delete aclk tables for node %s", host_str);
+
+ if (is_host_available(&host_uuid)) {
+ debug(D_ACLK_SYNC, "Host %s exists, not deleting aclk sync tables", host_str);
+ return;
+ }
+
+ debug(D_ACLK_SYNC, "Host %s does NOT exist, can delete aclk sync tables", host_str);
+
+ sqlite3_stmt *res = NULL;
+ BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
+
+ buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \
+ "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str);
+
+ rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement to clean up aclk tables");
+ goto fail;
+ }
+ buffer_flush(sql);
+
+ while (sqlite3_step_monitored(res) == SQLITE_ROW)
+ buffer_strcat(sql, (char *) sqlite3_column_text(res, 0));
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc);
+
+ db_execute(buffer_tostring(sql));
+
+fail:
+ buffer_free(sql);
+}
uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
@@ -38,7 +168,6 @@ void aclk_add_worker_thread(struct aclk_database_worker_config *wc)
aclk_thread_head = wc;
}
uv_mutex_unlock(&aclk_async_lock);
- return;
}
void aclk_del_worker_thread(struct aclk_database_worker_config *wc)
@@ -53,7 +182,6 @@ void aclk_del_worker_thread(struct aclk_database_worker_config *wc)
if (*tmp)
*tmp = wc->next;
uv_mutex_unlock(&aclk_async_lock);
- return;
}
int aclk_worker_thread_exists(char *guid)
@@ -199,7 +327,6 @@ void aclk_sync_exit_all()
uv_mutex_unlock(&aclk_async_lock);
}
-#ifdef ENABLE_ACLK
enum {
IDX_HOST_ID,
IDX_HOSTNAME,
@@ -228,6 +355,8 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid);
struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
+ __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
+
system_info->hops = str2i((const char *) argv[IDX_HOPS]);
sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info);
@@ -268,9 +397,9 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
#endif
return 0;
}
-#endif
-int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
+#ifdef ENABLE_ACLK
+static int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
{
char uuid_str[GUID_LEN + 1];
UNUSED(data);
@@ -286,10 +415,9 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]);
return 0;
}
-
+#endif
void sql_aclk_sync_init(void)
{
-#ifdef ENABLE_ACLK
char *err_msg = NULL;
int rc;
@@ -301,21 +429,7 @@ void sql_aclk_sync_init(void)
return;
}
- info("SQLite aclk sync initialization");
-
- for (int i = 0; aclk_sync_config[i]; i++) {
- debug(D_ACLK_SYNC, "Executing %s", aclk_sync_config[i]);
- rc = sqlite3_exec_monitored(db_meta, aclk_sync_config[i], 0, 0, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("SQLite error aclk sync initialization setup, rc = %d (%s)", rc, err_msg);
- error_report("SQLite failed statement %s", aclk_sync_config[i]);
- sqlite3_free(err_msg);
- return;
- }
- }
- info("SQLite aclk sync initialization completed");
- fatal_assert(0 == uv_mutex_init(&aclk_async_lock));
-
+ info("Creating archived hosts");
rc = sqlite3_exec_monitored(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, "
"timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, "
"program_version, entries, health_enabled FROM host WHERE hops >0;",
@@ -325,14 +439,16 @@ void sql_aclk_sync_init(void)
sqlite3_free(err_msg);
}
+#ifdef ENABLE_ACLK
+ fatal_assert(0 == uv_mutex_init(&aclk_async_lock));
rc = sqlite3_exec_monitored(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE "
"h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg);
if (rc != SQLITE_OK) {
error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg);
sqlite3_free(err_msg);
}
+ info("ACLK sync initialization completed");
#endif
- return;
}
static void async_cb(uv_async_t *handle)
@@ -374,10 +490,9 @@ static void timer_cb(uv_timer_t* handle)
#endif
}
-#define MAX_CMD_BATCH_SIZE (256)
-
-void aclk_database_worker(void *arg)
+static void aclk_database_worker(void *arg)
{
+ service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
worker_register("ACLKSYNC");
worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan");
@@ -398,15 +513,12 @@ void aclk_database_worker(void *arg)
enum aclk_database_opcode opcode;
uv_timer_t timer_req;
struct aclk_database_cmd cmd;
- unsigned cmd_batch_size;
-
- //aclk_database_init_cmd_queue(wc);
char threadname[NETDATA_THREAD_NAME_MAX+1];
if (wc->host)
- snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", rrdhost_hostname(wc->host));
+ snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host));
else {
- snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->uuid_str);
+ snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", wc->uuid_str);
threadname[11] = '\0';
}
uv_thread_set_name_np(wc->thread, threadname);
@@ -449,17 +561,13 @@ void aclk_database_worker(void *arg)
uv_run(loop, UV_RUN_DEFAULT);
/* wait for commands */
- cmd_batch_size = 0;
do {
- if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
- break;
cmd = aclk_database_deq_cmd(wc);
if (netdata_exit)
break;
opcode = cmd.opcode;
- ++cmd_batch_size;
if(likely(opcode != ACLK_DATABASE_NOOP))
worker_is_busy(opcode);
@@ -535,7 +643,7 @@ void aclk_database_worker(void *arg)
wc->host = rrdhost_find_by_guid(wc->host_guid);
if (wc->host) {
info("HOST %s (%s) detected as active", rrdhost_hostname(wc->host), wc->host_guid);
- snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", rrdhost_hostname(wc->host));
+ snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host));
uv_thread_set_name_np(wc->thread, threadname);
wc->host->dbsync_worker = wc;
if (unlikely(!wc->hostname))
@@ -584,10 +692,8 @@ void aclk_database_worker(void *arg)
info("Shutting down ACLK sync event loop complete for host %s", wc->host_guid);
/* TODO: don't let the API block by waiting to enqueue commands */
uv_cond_destroy(&wc->cmd_cond);
-/* uv_mutex_destroy(&wc->cmd_mutex); */
- //fatal_assert(0 == uv_loop_close(loop));
- int rc;
+ int rc;
do {
rc = uv_loop_close(loop);
} while (rc != UV_EBUSY);
@@ -628,7 +734,7 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
uuid_unparse_lower(*host_uuid, host_guid);
- BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE);
+ BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str);
db_execute(buffer_tostring(sql));
@@ -648,6 +754,10 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
if (likely(host)) {
host->dbsync_worker = (void *)wc;
wc->hostname = strdupz(rrdhost_hostname(host));
+ if (node_id && !host->node_id) {
+ host->node_id = mallocz(sizeof(*host->node_id));
+ uuid_copy(*host->node_id, *node_id);
+ }
}
else
wc->hostname = get_hostname_by_node_id(wc->node_id);
@@ -663,142 +773,4 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
UNUSED(host_uuid);
UNUSED(node_id);
#endif
- return;
-}
-
-void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
-{
- UNUSED(cmd);
-
- debug(D_ACLK, "Checking database for %s", wc->host_guid);
-
- BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE);
-
- buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND "
- "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL);
- db_execute(buffer_tostring(sql));
-
- buffer_free(sql);
- return;
-}
-
-#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
-
-static int is_host_available(uuid_t *host_id)
-{
- sqlite3_stmt *res = NULL;
- int rc;
-
- if (unlikely(!db_meta)) {
- if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
- error_report("Database has not been initialized");
- return 1;
- }
-
- rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_UUID, -1, &res, 0);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement to select node instance information for a node");
- return 1;
- }
-
- rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to bind host_id parameter to select node instance information");
- goto failed;
- }
- rc = sqlite3_step_monitored(res);
-
- failed:
- if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
- error_report("Failed to finalize the prepared statement when checking host existence");
-
- return (rc == SQLITE_ROW);
-}
-
-// OPCODE: ACLK_DATABASE_DELETE_HOST
-void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
-{
- UNUSED(wc);
- char uuid_str[GUID_LEN + 1];
- char host_str[GUID_LEN + 1];
-
- int rc;
- uuid_t host_uuid;
- char *host_guid = (char *)cmd.data;
-
- if (unlikely(!host_guid))
- return;
-
- rc = uuid_parse(host_guid, host_uuid);
- freez(host_guid);
- if (rc)
- return;
-
- uuid_unparse_lower(host_uuid, host_str);
- uuid_unparse_lower_fix(&host_uuid, uuid_str);
-
- debug(D_ACLK_SYNC, "Checking if I should delete aclk tables for node %s", host_str);
-
- if (is_host_available(&host_uuid)) {
- debug(D_ACLK_SYNC, "Host %s exists, not deleting aclk sync tables", host_str);
- return;
- }
-
- debug(D_ACLK_SYNC, "Host %s does NOT exist, can delete aclk sync tables", host_str);
-
- sqlite3_stmt *res = NULL;
- BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE);
-
- buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str);
-
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
- if (rc != SQLITE_OK) {
- error_report("Failed to prepare statement to clean up aclk tables");
- goto fail;
- }
- buffer_flush(sql);
-
- while (sqlite3_step_monitored(res) == SQLITE_ROW)
- buffer_strcat(sql, (char *) sqlite3_column_text(res, 0));
-
- rc = sqlite3_finalize(res);
- if (unlikely(rc != SQLITE_OK))
- error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc);
-
- db_execute(buffer_tostring(sql));
-
-fail:
- buffer_free(sql);
- return;
-}
-
-static int sql_check_aclk_table(void *data, int argc, char **argv, char **column)
-{
- struct aclk_database_worker_config *wc = data;
- UNUSED(argc);
- UNUSED(column);
-
- debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]);
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_DELETE_HOST;
- cmd.data = strdupz((char *) argv[0]);
- aclk_database_enq_cmd_noblock(wc, &cmd);
- return 0;
-}
-
-#define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
- "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');"
-
-void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
-{
- char *err_msg = NULL;
- debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist");
- int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg);
- if (rc != SQLITE_OK) {
- error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
- sqlite3_free(err_msg);
- }
- return;
-}
+} \ No newline at end of file