summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r--database/sqlite/sqlite_aclk.c164
1 files changed, 137 insertions, 27 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 950856d9..43b34109 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -6,7 +6,7 @@
#include "sqlite_aclk_chart.h"
#include "sqlite_aclk_node.h"
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
#include "../../aclk/aclk.h"
#endif
@@ -23,11 +23,11 @@ const char *aclk_sync_config[] = {
"CREATE TRIGGER IF NOT EXISTS tr_dim_del AFTER DELETE ON dimension BEGIN INSERT INTO dimension_delete "
"(dimension_id, dimension_name, chart_type_id, dim_id, chart_id, host_id, date_created)"
- " select old.id, old.name, c.type||\".\"||c.id, old.dim_id, old.chart_id, c.host_id, strftime('%s') FROM"
+ " select old.id, old.name, c.type||\".\"||c.id, old.dim_id, old.chart_id, c.host_id, unixepoch() FROM"
" chart c WHERE c.chart_id = old.chart_id; END;",
"DELETE FROM dimension_delete WHERE host_id NOT IN"
- " (SELECT host_id FROM host) OR strftime('%s') - date_created > 604800;",
+ " (SELECT host_id FROM host) OR unixepoch() - date_created > 604800;",
NULL,
};
@@ -36,7 +36,7 @@ uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
int retention_running = 0;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
static void stop_retention_run()
{
uv_mutex_lock(&aclk_async_lock);
@@ -258,6 +258,74 @@ void aclk_sync_exit_all()
uv_mutex_unlock(&aclk_async_lock);
}
+#ifdef ENABLE_ACLK
+enum {
+ IDX_HOST_ID,
+ IDX_HOSTNAME,
+ IDX_REGISTRY,
+ IDX_UPDATE_EVERY,
+ IDX_OS,
+ IDX_TIMEZONE,
+ IDX_TAGS,
+ IDX_HOPS,
+ IDX_MEMORY_MODE,
+ IDX_ABBREV_TIMEZONE,
+ IDX_UTC_OFFSET,
+ IDX_PROGRAM_NAME,
+ IDX_PROGRAM_VERSION,
+ IDX_ENTRIES,
+ IDX_HEALTH_ENABLED,
+};
+
+static int create_host_callback(void *data, int argc, char **argv, char **column)
+{
+ UNUSED(data);
+ UNUSED(argc);
+ UNUSED(column);
+
+ char guid[UUID_STR_LEN];
+ uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid);
+
+ struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
+ system_info->hops = str2i((const char *) argv[IDX_HOPS]);
+
+ sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info);
+
+ RRDHOST *host = rrdhost_find_or_create(
+ (const char *) argv[IDX_HOSTNAME]
+ , (const char *) argv[IDX_REGISTRY]
+ , guid
+ , (const char *) argv[IDX_OS]
+ , (const char *) argv[IDX_TIMEZONE]
+ , (const char *) argv[IDX_ABBREV_TIMEZONE]
+ , argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET]) : 0
+ , (const char *) argv[IDX_TAGS]
+ , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown")
+ , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown")
+ , argv[3] ? str2i(argv[IDX_UPDATE_EVERY]) : 1
+ , argv[13] ? str2i(argv[IDX_ENTRIES]) : 0
+ , RRD_MEMORY_MODE_DBENGINE
+ , 0 // health
+ , 0 // rrdpush enabled
+ , NULL //destination
+ , NULL // api key
+ , NULL // send charts matching
+ , system_info
+ , 1
+ );
+ if (likely(host))
+ host->host_labels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ char node_str[UUID_STR_LEN] = "<none>";
+ if (likely(host->node_id))
+ uuid_unparse_lower(*host->node_id, node_str);
+ internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", host->hostname, host->machine_guid, node_str);
+#endif
+ return 0;
+}
+#endif
+
int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
{
char uuid_str[GUID_LEN + 1];
@@ -267,16 +335,17 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str);
- if (rrdhost_find_by_guid(uuid_str, 0) == localhost)
+ RRDHOST *host = rrdhost_find_by_guid(uuid_str, 0);
+ if (host == localhost)
return 0;
- sql_create_aclk_table(NULL, (uuid_t *) argv[0], (uuid_t *) argv[1]);
+ sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]);
return 0;
}
void sql_aclk_sync_init(void)
{
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
char *err_msg = NULL;
int rc;
@@ -303,8 +372,23 @@ void sql_aclk_sync_init(void)
info("SQLite aclk sync initialization completed");
fatal_assert(0 == uv_mutex_init(&aclk_async_lock));
+ if (likely(rrdcontext_enabled == CONFIG_BOOLEAN_YES)) {
+ rc = sqlite3_exec(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, "
+ "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, "
+ "program_version, entries, health_enabled FROM host WHERE hops >0;",
+ create_host_callback, NULL, &err_msg);
+ if (rc != SQLITE_OK) {
+ error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg);
+ sqlite3_free(err_msg);
+ }
+ }
+
rc = sqlite3_exec(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE "
- "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, NULL);
+ "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg);
+ if (rc != SQLITE_OK) {
+ error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg);
+ sqlite3_free(err_msg);
+ }
#endif
return;
}
@@ -323,7 +407,7 @@ static void timer_cb(uv_timer_t* handle)
uv_stop(handle->loop);
uv_update_time(handle->loop);
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
struct aclk_database_worker_config *wc = handle->data;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -338,7 +422,7 @@ static void timer_cb(uv_timer_t* handle)
wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
}
- if (aclk_use_new_cloud_arch && aclk_connected) {
+ if (aclk_connected) {
if (wc->rotation_after && wc->rotation_after < now) {
cmd.opcode = ACLK_DATABASE_UPD_RETENTION;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
@@ -373,7 +457,7 @@ static void timer_cb(uv_timer_t* handle)
}
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
void after_send_retention(uv_work_t *req, int status)
{
struct aclk_database_worker_config *wc = req->data;
@@ -410,7 +494,6 @@ void aclk_database_worker(void *arg)
{
worker_register("ACLKSYNC");
worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
worker_register_job_name(ACLK_DATABASE_ADD_CHART, "chart add");
worker_register_job_name(ACLK_DATABASE_ADD_DIMENSION, "dimension add");
worker_register_job_name(ACLK_DATABASE_PUSH_CHART, "chart push");
@@ -420,11 +503,11 @@ void aclk_database_worker(void *arg)
worker_register_job_name(ACLK_DATABASE_UPD_RETENTION, "retention check");
worker_register_job_name(ACLK_DATABASE_DIM_DELETION, "dimension delete");
worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan");
-#endif
worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log");
worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup");
worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete");
worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info");
+ worker_register_job_name(ACLK_DATABASE_NODE_COLLECTORS, "node collectors");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot");
@@ -474,12 +557,12 @@ void aclk_database_worker(void *arg)
fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
// wc->retry_count = 0;
- wc->node_info_send = (wc->host && !localhost);
+ wc->node_info_send = 1;
// aclk_add_worker_thread(wc);
info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc));
memset(&cmd, 0, sizeof(cmd));
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
uv_work_t retention_work;
sql_get_last_chart_sequence(wc);
wc->chart_payload_count = sql_get_pending_count(wc);
@@ -532,7 +615,7 @@ void aclk_database_worker(void *arg)
break;
// CHART / DIMENSION OPERATIONS
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
case ACLK_DATABASE_ADD_CHART:
debug(D_ACLK_SYNC, "Adding chart event for %s", wc->host_guid);
aclk_add_chart_event(wc, cmd);
@@ -585,7 +668,11 @@ void aclk_database_worker(void *arg)
debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str);
sql_build_node_info(wc, cmd);
break;
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ case ACLK_DATABASE_NODE_COLLECTORS:
+ debug(D_ACLK_SYNC,"Sending node collectors info for %s", wc->uuid_str);
+ sql_build_node_collectors(wc);
+ break;
+#ifdef ENABLE_ACLK
case ACLK_DATABASE_DIM_DELETION:
debug(D_ACLK_SYNC,"Sending dimension deletion information %s", wc->uuid_str);
aclk_process_dimension_deletion(wc, cmd);
@@ -624,16 +711,23 @@ void aclk_database_worker(void *arg)
snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->host->hostname);
uv_thread_set_name_np(wc->thread, threadname);
wc->host->dbsync_worker = wc;
+ if (unlikely(!wc->hostname))
+ wc->hostname = strdupz(wc->host->hostname);
aclk_del_worker_thread(wc);
wc->node_info_send = 1;
}
}
}
- if (wc->node_info_send && wc->host && localhost && claimed() && aclk_connected) {
+ if (wc->node_info_send && localhost && claimed() && aclk_connected) {
cmd.opcode = ACLK_DATABASE_NODE_INFO;
cmd.completion = NULL;
wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd);
}
+ if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) {
+ cmd.opcode = ACLK_DATABASE_NODE_COLLECTORS;
+ cmd.completion = NULL;
+ wc->node_collectors_send = aclk_database_enq_cmd_noblock(wc, &cmd);
+ }
if (localhost == wc->host)
(void) sqlite3_wal_checkpoint(db_meta, NULL);
break;
@@ -676,6 +770,7 @@ void aclk_database_worker(void *arg)
rrd_rdlock();
if (likely(wc->host))
wc->host->dbsync_worker = NULL;
+ freez(wc->hostname);
freez(wc);
rrd_unlock();
@@ -745,13 +840,17 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
return;
struct aclk_database_worker_config *wc = callocz(1, sizeof(struct aclk_database_worker_config));
- if (likely(host))
- host->dbsync_worker = (void *) wc;
+ if (node_id && !uuid_is_null(*node_id))
+ uuid_unparse_lower(*node_id, wc->node_id);
+ if (likely(host)) {
+ host->dbsync_worker = (void *)wc;
+ wc->hostname = strdupz(host->hostname);
+ }
+ else
+ wc->hostname = get_hostname_by_node_id(wc->node_id);
wc->host = host;
strcpy(wc->uuid_str, uuid_str);
strcpy(wc->host_guid, host_guid);
- if (node_id && !uuid_is_null(*node_id))
- uuid_unparse_lower(*node_id, wc->node_id);
wc->chart_updates = 0;
wc->alert_updates = 0;
wc->retry_count = 0;
@@ -775,7 +874,7 @@ void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct
BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE);
buffer_sprintf(sql,"DELETE FROM aclk_chart_%s WHERE date_submitted IS NOT NULL AND "
- "date_updated < strftime('%%s','now','-%d seconds');", wc->uuid_str, ACLK_DELETE_ACK_INTERNAL);
+ "CAST(date_updated AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_INTERNAL);
db_execute(buffer_tostring(sql));
buffer_flush(sql);
@@ -786,7 +885,18 @@ void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct
buffer_flush(sql);
buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND "
- "date_cloud_ack < strftime('%%s','now','-%d seconds');", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL);
+ "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL);
+ db_execute(buffer_tostring(sql));
+ buffer_flush(sql);
+
+ buffer_sprintf(sql,"UPDATE aclk_chart_%s SET status = NULL, date_submitted=unixepoch() WHERE "
+ "date_submitted IS NULL AND CAST(date_created AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_AUTO_MARK_SUBMIT_INTERVAL);
+ db_execute(buffer_tostring(sql));
+ buffer_flush(sql);
+
+ buffer_sprintf(sql,"UPDATE aclk_chart_%s SET date_updated = unixepoch() WHERE date_updated IS NULL"
+ " AND date_submitted IS NOT NULL AND CAST(date_submitted AS INT) < unixepoch()-%d;",
+ wc->uuid_str, ACLK_AUTO_MARK_UPDATED_INTERVAL);
db_execute(buffer_tostring(sql));
buffer_free(sql);
@@ -912,15 +1022,15 @@ void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
sqlite3_free(err_msg);
}
db_execute("DELETE FROM dimension_delete WHERE host_id NOT IN (SELECT host_id FROM host) "
- " OR strftime('%s') - date_created > 604800;");
+ " OR unixepoch() - date_created > 604800;");
return;
}
void aclk_data_rotated(void)
{
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#ifdef ENABLE_ACLK
- if (!aclk_use_new_cloud_arch || !aclk_connected)
+ if (!aclk_connected)
return;
time_t next_rotation_time = now_realtime_sec()+ACLK_DATABASE_ROTATION_DELAY;