diff options
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 164 |
1 files changed, 137 insertions, 27 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 950856d9a..43b341097 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -6,7 +6,7 @@ #include "sqlite_aclk_chart.h" #include "sqlite_aclk_node.h" -#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#ifdef ENABLE_ACLK #include "../../aclk/aclk.h" #endif @@ -23,11 +23,11 @@ const char *aclk_sync_config[] = { "CREATE TRIGGER IF NOT EXISTS tr_dim_del AFTER DELETE ON dimension BEGIN INSERT INTO dimension_delete " "(dimension_id, dimension_name, chart_type_id, dim_id, chart_id, host_id, date_created)" - " select old.id, old.name, c.type||\".\"||c.id, old.dim_id, old.chart_id, c.host_id, strftime('%s') FROM" + " select old.id, old.name, c.type||\".\"||c.id, old.dim_id, old.chart_id, c.host_id, unixepoch() FROM" " chart c WHERE c.chart_id = old.chart_id; END;", "DELETE FROM dimension_delete WHERE host_id NOT IN" - " (SELECT host_id FROM host) OR strftime('%s') - date_created > 604800;", + " (SELECT host_id FROM host) OR unixepoch() - date_created > 604800;", NULL, }; @@ -36,7 +36,7 @@ uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; int retention_running = 0; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#ifdef ENABLE_ACLK static void stop_retention_run() { uv_mutex_lock(&aclk_async_lock); @@ -258,6 +258,74 @@ void aclk_sync_exit_all() uv_mutex_unlock(&aclk_async_lock); } +#ifdef ENABLE_ACLK +enum { + IDX_HOST_ID, + IDX_HOSTNAME, + IDX_REGISTRY, + IDX_UPDATE_EVERY, + IDX_OS, + IDX_TIMEZONE, + IDX_TAGS, + IDX_HOPS, + IDX_MEMORY_MODE, + IDX_ABBREV_TIMEZONE, + IDX_UTC_OFFSET, + IDX_PROGRAM_NAME, + IDX_PROGRAM_VERSION, + IDX_ENTRIES, + IDX_HEALTH_ENABLED, +}; + +static int create_host_callback(void *data, int argc, char **argv, char **column) +{ + UNUSED(data); + UNUSED(argc); + UNUSED(column); + + char guid[UUID_STR_LEN]; + uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid); + + struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); + system_info->hops = str2i((const char *) argv[IDX_HOPS]); + + sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info); + + RRDHOST *host = rrdhost_find_or_create( + (const char *) argv[IDX_HOSTNAME] + , (const char *) argv[IDX_REGISTRY] + , guid + , (const char *) argv[IDX_OS] + , (const char *) argv[IDX_TIMEZONE] + , (const char *) argv[IDX_ABBREV_TIMEZONE] + , argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET]) : 0 + , (const char *) argv[IDX_TAGS] + , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown") + , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown") + , argv[3] ? str2i(argv[IDX_UPDATE_EVERY]) : 1 + , argv[13] ? str2i(argv[IDX_ENTRIES]) : 0 + , RRD_MEMORY_MODE_DBENGINE + , 0 // health + , 0 // rrdpush enabled + , NULL //destination + , NULL // api key + , NULL // send charts matching + , system_info + , 1 + ); + if (likely(host)) + host->host_labels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]); + +#ifdef NETDATA_INTERNAL_CHECKS + char node_str[UUID_STR_LEN] = "<none>"; + if (likely(host->node_id)) + uuid_unparse_lower(*host->node_id, node_str); + internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", host->hostname, host->machine_guid, node_str); +#endif + return 0; +} +#endif + int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) { char uuid_str[GUID_LEN + 1]; @@ -267,16 +335,17 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str); - if (rrdhost_find_by_guid(uuid_str, 0) == localhost) + RRDHOST *host = rrdhost_find_by_guid(uuid_str, 0); + if (host == localhost) return 0; - sql_create_aclk_table(NULL, (uuid_t *) argv[0], (uuid_t *) argv[1]); + sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]); return 0; } void sql_aclk_sync_init(void) { -#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#ifdef ENABLE_ACLK char *err_msg = NULL; int rc; @@ -303,8 +372,23 @@ void sql_aclk_sync_init(void) info("SQLite aclk sync initialization completed"); fatal_assert(0 == uv_mutex_init(&aclk_async_lock)); + if (likely(rrdcontext_enabled == CONFIG_BOOLEAN_YES)) { + rc = sqlite3_exec(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, " + "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " + "program_version, entries, health_enabled FROM host WHERE hops >0;", + create_host_callback, NULL, &err_msg); + if (rc != SQLITE_OK) { + error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); + } + } + rc = sqlite3_exec(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE " - "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, NULL); + "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg); + if (rc != SQLITE_OK) { + error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); + } #endif return; } @@ -323,7 +407,7 @@ static void timer_cb(uv_timer_t* handle) uv_stop(handle->loop); uv_update_time(handle->loop); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#ifdef ENABLE_ACLK struct aclk_database_worker_config *wc = handle->data; struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -338,7 +422,7 @@ static void timer_cb(uv_timer_t* handle) wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; } - if (aclk_use_new_cloud_arch && aclk_connected) { + if (aclk_connected) { if (wc->rotation_after && wc->rotation_after < now) { cmd.opcode = ACLK_DATABASE_UPD_RETENTION; if (!aclk_database_enq_cmd_noblock(wc, &cmd)) @@ -373,7 +457,7 @@ static void timer_cb(uv_timer_t* handle) } -#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#ifdef ENABLE_ACLK void after_send_retention(uv_work_t *req, int status) { struct aclk_database_worker_config *wc = req->data; @@ -410,7 +494,6 @@ void aclk_database_worker(void *arg) { worker_register("ACLKSYNC"); worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL worker_register_job_name(ACLK_DATABASE_ADD_CHART, "chart add"); worker_register_job_name(ACLK_DATABASE_ADD_DIMENSION, "dimension add"); worker_register_job_name(ACLK_DATABASE_PUSH_CHART, "chart push"); @@ -420,11 +503,11 @@ void aclk_database_worker(void *arg) worker_register_job_name(ACLK_DATABASE_UPD_RETENTION, "retention check"); worker_register_job_name(ACLK_DATABASE_DIM_DELETION, "dimension delete"); worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan"); -#endif worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log"); worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info"); + worker_register_job_name(ACLK_DATABASE_NODE_COLLECTORS, "node collectors"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); @@ -474,12 +557,12 @@ void aclk_database_worker(void *arg) fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); // wc->retry_count = 0; - wc->node_info_send = (wc->host && !localhost); + wc->node_info_send = 1; // aclk_add_worker_thread(wc); info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc)); memset(&cmd, 0, sizeof(cmd)); -#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#ifdef ENABLE_ACLK uv_work_t retention_work; sql_get_last_chart_sequence(wc); wc->chart_payload_count = sql_get_pending_count(wc); @@ -532,7 +615,7 @@ void aclk_database_worker(void *arg) break; // CHART / DIMENSION OPERATIONS -#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#ifdef ENABLE_ACLK case ACLK_DATABASE_ADD_CHART: debug(D_ACLK_SYNC, "Adding chart event for %s", wc->host_guid); aclk_add_chart_event(wc, cmd); @@ -585,7 +668,11 @@ void aclk_database_worker(void *arg) debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str); sql_build_node_info(wc, cmd); break; -#ifdef ENABLE_NEW_CLOUD_PROTOCOL + case ACLK_DATABASE_NODE_COLLECTORS: + debug(D_ACLK_SYNC,"Sending node collectors info for %s", wc->uuid_str); + sql_build_node_collectors(wc); + break; +#ifdef ENABLE_ACLK case ACLK_DATABASE_DIM_DELETION: debug(D_ACLK_SYNC,"Sending dimension deletion information %s", wc->uuid_str); aclk_process_dimension_deletion(wc, cmd); @@ -624,16 +711,23 @@ void aclk_database_worker(void *arg) snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->host->hostname); uv_thread_set_name_np(wc->thread, threadname); wc->host->dbsync_worker = wc; + if (unlikely(!wc->hostname)) + wc->hostname = strdupz(wc->host->hostname); aclk_del_worker_thread(wc); wc->node_info_send = 1; } } } - if (wc->node_info_send && wc->host && localhost && claimed() && aclk_connected) { + if (wc->node_info_send && localhost && claimed() && aclk_connected) { cmd.opcode = ACLK_DATABASE_NODE_INFO; cmd.completion = NULL; wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd); } + if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) { + cmd.opcode = ACLK_DATABASE_NODE_COLLECTORS; + cmd.completion = NULL; + wc->node_collectors_send = aclk_database_enq_cmd_noblock(wc, &cmd); + } if (localhost == wc->host) (void) sqlite3_wal_checkpoint(db_meta, NULL); break; @@ -676,6 +770,7 @@ void aclk_database_worker(void *arg) rrd_rdlock(); if (likely(wc->host)) wc->host->dbsync_worker = NULL; + freez(wc->hostname); freez(wc); rrd_unlock(); @@ -745,13 +840,17 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) return; struct aclk_database_worker_config *wc = callocz(1, sizeof(struct aclk_database_worker_config)); - if (likely(host)) - host->dbsync_worker = (void *) wc; + if (node_id && !uuid_is_null(*node_id)) + uuid_unparse_lower(*node_id, wc->node_id); + if (likely(host)) { + host->dbsync_worker = (void *)wc; + wc->hostname = strdupz(host->hostname); + } + else + wc->hostname = get_hostname_by_node_id(wc->node_id); wc->host = host; strcpy(wc->uuid_str, uuid_str); strcpy(wc->host_guid, host_guid); - if (node_id && !uuid_is_null(*node_id)) - uuid_unparse_lower(*node_id, wc->node_id); wc->chart_updates = 0; wc->alert_updates = 0; wc->retry_count = 0; @@ -775,7 +874,7 @@ void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE); buffer_sprintf(sql,"DELETE FROM aclk_chart_%s WHERE date_submitted IS NOT NULL AND " - "date_updated < strftime('%%s','now','-%d seconds');", wc->uuid_str, ACLK_DELETE_ACK_INTERNAL); + "CAST(date_updated AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_INTERNAL); db_execute(buffer_tostring(sql)); buffer_flush(sql); @@ -786,7 +885,18 @@ void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct buffer_flush(sql); buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND " - "date_cloud_ack < strftime('%%s','now','-%d seconds');", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); + "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL); + db_execute(buffer_tostring(sql)); + buffer_flush(sql); + + buffer_sprintf(sql,"UPDATE aclk_chart_%s SET status = NULL, date_submitted=unixepoch() WHERE " + "date_submitted IS NULL AND CAST(date_created AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_AUTO_MARK_SUBMIT_INTERVAL); + db_execute(buffer_tostring(sql)); + buffer_flush(sql); + + buffer_sprintf(sql,"UPDATE aclk_chart_%s SET date_updated = unixepoch() WHERE date_updated IS NULL" + " AND date_submitted IS NOT NULL AND CAST(date_submitted AS INT) < unixepoch()-%d;", + wc->uuid_str, ACLK_AUTO_MARK_UPDATED_INTERVAL); db_execute(buffer_tostring(sql)); buffer_free(sql); @@ -912,15 +1022,15 @@ void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) sqlite3_free(err_msg); } db_execute("DELETE FROM dimension_delete WHERE host_id NOT IN (SELECT host_id FROM host) " - " OR strftime('%s') - date_created > 604800;"); + " OR unixepoch() - date_created > 604800;"); return; } void aclk_data_rotated(void) { -#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#ifdef ENABLE_ACLK - if (!aclk_use_new_cloud_arch || !aclk_connected) + if (!aclk_connected) return; time_t next_rotation_time = now_realtime_sec()+ACLK_DATABASE_ROTATION_DELAY; |