diff options
Diffstat (limited to 'database/sqlite/sqlite_aclk_chart.c')
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 138 |
1 files changed, 51 insertions, 87 deletions
diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index a9db5282a..c1db60c49 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -3,7 +3,7 @@ #include "sqlite_functions.h" #include "sqlite_aclk_chart.h" -#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) +#ifdef ENABLE_ACLK #include "../../aclk/aclk_charts_api.h" #include "../../aclk/aclk.h" @@ -87,7 +87,7 @@ static int aclk_add_chart_payload( char sql[ACLK_SYNC_QUERY_SIZE]; snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ - "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str); + "VALUES (@unique_id, @uuid, @claim_id, unixepoch(), @type, @payload);", wc->uuid_str); rc = prepare_statement(db_meta, sql, &res_chart); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to store chart payload data"); @@ -143,7 +143,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat int rc = 0; CHECK_SQLITE_CONNECTION(db_meta); - char *claim_id = is_agent_claimed(); + char *claim_id = get_agent_claimid(); RRDSET *st = cmd.data; @@ -158,16 +158,8 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat chart_payload.claim_id = claim_id; chart_payload.id = strdupz(st->id); - struct label_index *labels = &st->state->labels; - netdata_rwlock_rdlock(&labels->labels_rwlock); - struct label *label_list = labels->head; - struct label *chart_label = NULL; - while (label_list) { - chart_label = add_label_to_list(chart_label, label_list->key, label_list->value, label_list->label_source); - label_list = label_list->next; - } - netdata_rwlock_unlock(&labels->labels_rwlock); - chart_payload.label_head = chart_label; + chart_payload.chart_labels = rrdlabels_create(); + rrdlabels_copy(chart_payload.chart_labels, st->state->chart_labels); size_t size; char *payload = generate_chart_instance_updated(&size, &chart_payload); @@ -220,7 +212,7 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str int rc = 0; sqlite3_stmt *res = NULL; - if (!aclk_use_new_cloud_arch || !aclk_connected) + if (!aclk_connected) return; if (unlikely(!db_meta)) @@ -230,7 +222,7 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str if (uuid_parse(wc->host_guid, host_id)) return; - char *claim_id = is_agent_claimed(); + char *claim_id = get_agent_claimid(); if (!claim_id) return; @@ -289,7 +281,7 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk struct aclk_chart_dimension_data *aclk_cd_data = cmd.data; - char *claim_id = is_agent_claimed(); + char *claim_id = get_agent_claimid(); if (!claim_id) goto cleanup; @@ -316,7 +308,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d return; } - char *claim_id = is_agent_claimed(); + char *claim_id = get_agent_claimid(); if (unlikely(!claim_id)) return; @@ -333,12 +325,6 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d char sql[ACLK_SYNC_QUERY_SIZE]; static __thread sqlite3_stmt *res = NULL; - char *hostname = NULL; - if (wc->host) - hostname = strdupz(wc->host->hostname); - else - hostname = get_hostname_by_node_id(wc->node_id); - if (unlikely(!res)) { snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \ "FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \ @@ -348,7 +334,6 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to send a chart update via ACLK"); freez(claim_id); - freez(hostname); return; } } @@ -406,7 +391,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d if (likely(first_sequence)) { db_lock(); - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_chart_%s SET status = NULL, date_submitted=strftime('%%s','now') " + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_chart_%s SET status = NULL, date_submitted=unixepoch() " "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", wc->uuid_str, first_sequence, last_sequence); db_execute(sql); @@ -422,7 +407,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK RES [%s (%s)]: CHARTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, wc->node_id, - hostname ? hostname : "N/A", + wc->hostname ? wc->hostname : "N/A", first_sequence, last_sequence, wc->batch_id); @@ -443,7 +428,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK STA [%s (%s)]: Sync of charts and dimensions done in %ld seconds.", wc->node_id, - hostname ? hostname : "N/A", + wc->hostname ? wc->hostname : "N/A", now_realtime_sec() - wc->startup_time); } @@ -462,7 +447,6 @@ bind_fail: error_report("Failed to reset statement when pushing chart events, rc = %d", rc); freez(claim_id); - freez(hostname); return; } @@ -548,7 +532,7 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_ char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"UPDATE aclk_chart_%s SET date_updated=strftime('%%s','now') WHERE sequence_id <= @sequence_id " + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"UPDATE aclk_chart_%s SET date_updated=unixepoch() WHERE sequence_id <= @sequence_id " "AND date_submitted IS NOT NULL AND date_updated IS NULL;", wc->uuid_str); rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); @@ -587,13 +571,8 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl cmd.param1); db_execute(buffer_tostring(sql)); if (cmd.param1 == 1) { - char *hostname = NULL; - if (wc->host) - hostname = strdupz(wc->host->hostname); - else - hostname = get_hostname_by_node_id(wc->node_id); buffer_flush(sql); - log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, hostname? hostname : "N/A"); + log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, wc->hostname ? wc->hostname: "N/A"); buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \ "DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str); db_lock(); @@ -619,19 +598,18 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl rrddim_foreach_read(rd, st) { rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); - rd->state->aclk_live_status = (rd->state->aclk_live_status == 0); + rd->aclk_live_status = (rd->aclk_live_status == 0); } rrdset_unlock(st); } rrdhost_unlock(host); } else error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid); - freez(hostname); } else { log_access( "ACLK STA [%s (%s)]: RESTARTING CHART SYNC FROM SEQUENCE %" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + wc->hostname ? wc->hostname : "N/A", cmd.param1); wc->chart_payload_count = sql_get_pending_count(wc); sql_get_last_chart_sequence(wc); @@ -732,12 +710,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at wc = (struct aclk_database_worker_config *)host->dbsync_worker ? (struct aclk_database_worker_config *)host->dbsync_worker : (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); - char *hostname = NULL; if (likely(wc)) { - if (wc->host) - hostname = strdupz(wc->host->hostname); - else - hostname = get_hostname_by_node_id(node_id); wc->chart_reset_count++; __sync_synchronize(); wc->chart_updates = 0; @@ -747,7 +720,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at log_access( "ACLK REQ [%s (%s)]: CHARTS STREAM from %"PRIu64" (LOCAL %"PRIu64") t=%ld resets=%d" , wc->node_id, - hostname ? hostname : "N/A", + wc->hostname ? wc->hostname : "N/A", sequence_id + 1, wc->chart_sequence_id, wc->chart_timestamp, @@ -757,13 +730,13 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at "ACLK RES [%s (%s)]: CHARTS FULL RESYNC REQUEST " "remote_seq=%" PRIu64 " local_seq=%" PRIu64 " resets=%d ", wc->node_id, - hostname ? hostname : "N/A", + wc->hostname ? wc->hostname : "N/A", sequence_id, wc->chart_sequence_id, wc->chart_reset_count); chart_reset_t chart_reset; - chart_reset.claim_id = is_agent_claimed(); + chart_reset.claim_id = get_agent_claimid(); if (chart_reset.claim_id) { chart_reset.node_id = node_id; chart_reset.reason = SEQ_ID_NOT_EXISTS; @@ -780,7 +753,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at log_access( "ACLK REQ [%s (%s)]: CHART RESET from %" PRIu64 " t=%ld batch=%" PRIu64, wc->node_id, - hostname ? hostname : "N/A", + wc->hostname ? wc->hostname : "N/A", sequence_id + 1, wc->chart_timestamp, wc->batch_id); @@ -794,10 +767,8 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at } } } else { - hostname = get_hostname_by_node_id(node_id); - log_access("ACLK STA [%s (%s)]: ACLK synchronization thread is not active.", node_id, hostname ? hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: ACLK synchronization thread is not active.", node_id, wc->hostname ? wc->hostname : "N/A"); } - freez(hostname); return; } host = host->next; @@ -851,10 +822,15 @@ void aclk_update_retention(struct aclk_database_worker_config *wc) { int rc; - if (!aclk_use_new_cloud_arch || !aclk_connected) + if (!aclk_connected) + return; + + if (wc->host && rrdhost_flag_check(wc->host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS)) { + internal_error(true, "Skipping aclk_update_retention for host %s because context streaming is enabled", wc->host->hostname); return; + } - char *claim_id = is_agent_claimed(); + char *claim_id = get_agent_claimid(); if (unlikely(!claim_id)) return; @@ -935,7 +911,7 @@ void aclk_update_retention(struct aclk_database_worker_config *wc) #ifdef ENABLE_DBENGINE if (memory_mode == RRD_MEMORY_MODE_DBENGINE) rc = - rrdeng_metric_latest_time_by_uuid((uuid_t *)sqlite3_column_blob(res, 0), &first_entry_t, &last_entry_t); + rrdeng_metric_latest_time_by_uuid((uuid_t *)sqlite3_column_blob(res, 0), &first_entry_t, &last_entry_t, 0); else #endif { @@ -1000,17 +976,12 @@ void aclk_update_retention(struct aclk_database_worker_config *wc) rotate_data.interval_duration_count++; } - char *hostname = NULL; - if (!wc->host) - hostname = get_hostname_by_node_id(wc->node_id); - if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", - wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); + wc->node_id, wc->hostname ? wc->hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); else log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE NOT SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", - wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); - freez(hostname); + wc->node_id, wc->hostname ? wc->hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); #ifdef NETDATA_INTERNAL_CHECKS info("Retention update for %s (chart updates = %d)", wc->host_guid, wc->chart_updates); @@ -1094,23 +1065,27 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc) void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated) { + RRDHOST *host = rd->rrdset->rrdhost; + if (likely(rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS))) + return; + int live = !last_updated; - if (likely(rd->state->aclk_live_status == live)) + if (likely(rd->aclk_live_status == live)) return; - time_t created_at = rd->state->query_ops.oldest_time(rd); + time_t created_at = rd->tiers[0]->query_ops.oldest_time(rd->tiers[0]->db_metric_handle); if (unlikely(!created_at && rd->updated)) created_at = rd->last_collected_time.tv_sec; - rd->state->aclk_live_status = live; + rd->aclk_live_status = live; struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker; if (unlikely(!wc)) return; - char *claim_id = is_agent_claimed(); + char *claim_id = get_agent_claimid(); if (unlikely(!claim_id)) return; @@ -1132,7 +1107,7 @@ void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated) return; struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data)); - uuid_copy(aclk_cd_data->uuid, rd->state->metric_uuid); + uuid_copy(aclk_cd_data->uuid, rd->metric_uuid); aclk_cd_data->payload = payload; aclk_cd_data->payload_size = size; aclk_cd_data->check_payload = 1; @@ -1147,17 +1122,14 @@ void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated) if (unlikely(rc)) { freez(aclk_cd_data->payload); freez(aclk_cd_data); - rd->state->aclk_live_status = !live; + rd->aclk_live_status = !live; } return; } void aclk_send_dimension_update(RRDDIM *rd) { - if (!aclk_use_new_cloud_arch) - return; - - char *claim_id = is_agent_claimed(); + char *claim_id = get_agent_claimid(); if (unlikely(!claim_id)) return; @@ -1167,11 +1139,11 @@ void aclk_send_dimension_update(RRDDIM *rd) time_t now = now_realtime_sec(); int live = ((now - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every)); - if (!live || rd->state->aclk_live_status != live || !first_entry_t) { + if (!live || rd->aclk_live_status != live || !first_entry_t) { (void)aclk_upd_dimension_event( rd->rrdset->rrdhost->dbsync_worker, claim_id, - &rd->state->metric_uuid, + &rd->metric_uuid, rd->id, rd->name, rd->rrdset->id, @@ -1200,7 +1172,7 @@ void aclk_send_dimension_update(RRDDIM *rd) first_entry_t, last_entry_t, now - last_entry_t); - rd->state->aclk_live_status = live; + rd->aclk_live_status = live; } freez(claim_id); @@ -1324,24 +1296,16 @@ void sql_check_chart_liveness(RRDSET *st) { rrdset_unlock(st); } -#endif //ENABLE_NEW_CLOUD_PROTOCOL - // ST is read locked int queue_chart_to_aclk(RRDSET *st) { -#ifndef ENABLE_NEW_CLOUD_PROTOCOL -#ifdef ENABLE_ACLK - aclk_update_chart(st->rrdhost, st->id, 1); -#else - UNUSED(st); -#endif - return 0; -#else - if (!aclk_use_new_cloud_arch && aclk_connected) { - aclk_update_chart(st->rrdhost, st->id, 1); + RRDHOST *host = st->rrdhost; + + if (likely(rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS))) return 0; - } + return sql_queue_chart_payload((struct aclk_database_worker_config *) st->rrdhost->dbsync_worker, st, ACLK_DATABASE_ADD_CHART); -#endif } + +#endif //ENABLE_ACLK |