diff options
Diffstat (limited to 'database/sqlite/sqlite_aclk_chart.c')
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 315 |
1 files changed, 203 insertions, 112 deletions
diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index 7afa1d451..a9db5282a 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -22,20 +22,20 @@ sql_queue_chart_payload(struct aclk_database_worker_config *wc, void *data, enum return rc; } -static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size) +static time_t payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size) { static __thread sqlite3_stmt *res = NULL; int rc; - int send_status = 0; + time_t send_status = 0; if (unlikely(!res)) { char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT 1 FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp " - "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;", - uuid_str, uuid_str); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT acl.date_submitted FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp " + "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;", + uuid_str, uuid_str); rc = prepare_statement(db_meta, sql, &res); if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to check payload data"); + error_report("Failed to prepare statement to check payload data on %s", sql); return 0; } } @@ -49,7 +49,7 @@ static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payl goto bind_fail; while (sqlite3_step(res) == SQLITE_ROW) { - send_status = sqlite3_column_int(res, 0); + send_status = (time_t) sqlite3_column_int64(res, 0); } bind_fail: @@ -58,23 +58,36 @@ bind_fail: return send_status; } -static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id, - ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, int *send_status) +static int aclk_add_chart_payload( + struct aclk_database_worker_config *wc, + uuid_t *uuid, + char *claim_id, + ACLK_PAYLOAD_TYPE payload_type, + void *payload, + size_t payload_size, + time_t *send_status, + int check_sent) { static __thread sqlite3_stmt *res_chart = NULL; int rc; + time_t date_submitted; - rc = payload_sent(wc->uuid_str, uuid, payload, payload_size); - if (send_status) - *send_status = rc; - if (rc == 1) + if (unlikely(!payload)) return 0; + if (check_sent) { + date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size); + if (send_status) + *send_status = date_submitted; + if (date_submitted) + return 0; + } + if (unlikely(!res_chart)) { char sql[ACLK_SYNC_QUERY_SIZE]; snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, - "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ - "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str); + "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ + "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str); rc = prepare_statement(db_meta, sql, &res_chart); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to store chart payload data"); @@ -146,7 +159,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat chart_payload.id = strdupz(st->id); struct label_index *labels = &st->state->labels; - netdata_rwlock_wrlock(&labels->labels_rwlock); + netdata_rwlock_rdlock(&labels->labels_rwlock); struct label *label_list = labels->head; struct label *chart_label = NULL; while (label_list) { @@ -159,7 +172,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat size_t size; char *payload = generate_chart_instance_updated(&size, &chart_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL); + rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL, 1); freez(payload); chart_instance_updated_destroy(&chart_payload); } @@ -168,7 +181,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid, const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time, - int *send_status) + time_t *send_status) { int rc = 0; size_t size; @@ -197,7 +210,7 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w dim_payload.last_timestamp.tv_sec = last_time; char *payload = generate_chart_dimension_updated(&size, &dim_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status); + rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status, 1); freez(payload); return rc; } @@ -271,39 +284,22 @@ bind_fail: int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) { - int rc = 0; + int rc = 1; CHECK_SQLITE_CONNECTION(db_meta); - char *claim_id = is_agent_claimed(); - - RRDDIM *rd = cmd.data; - - if (likely(claim_id)) { - int send_status = 0; - time_t now = now_realtime_sec(); - - time_t first_t = rd->state->query_ops.oldest_time(rd); - time_t last_t = rd->state->query_ops.latest_time(rd); - - int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every)); + struct aclk_chart_dimension_data *aclk_cd_data = cmd.data; - rc = aclk_upd_dimension_event( - wc, - claim_id, - &rd->state->metric_uuid, - rd->id, - rd->name, - rd->rrdset->id, - first_t, - live ? 0 : last_t, - &send_status); + char *claim_id = is_agent_claimed(); + if (!claim_id) + goto cleanup; - if (!send_status) - rd->state->aclk_live_status = live; + rc = aclk_add_chart_payload(wc, &aclk_cd_data->uuid, claim_id, ACLK_PAYLOAD_DIMENSION, + (void *) aclk_cd_data->payload, aclk_cd_data->payload_size, NULL, aclk_cd_data->check_payload); - freez(claim_id); - } - rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); + freez(claim_id); +cleanup: + freez(aclk_cd_data->payload); + freez(aclk_cd_data); return rc; } @@ -337,6 +333,12 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d char sql[ACLK_SYNC_QUERY_SIZE]; static __thread sqlite3_stmt *res = NULL; + char *hostname = NULL; + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(wc->node_id); + if (unlikely(!res)) { snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \ "FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \ @@ -346,6 +348,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to send a chart update via ACLK"); freez(claim_id); + freez(hostname); return; } } @@ -419,7 +422,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK RES [%s (%s)]: CHARTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", first_sequence, last_sequence, wc->batch_id); @@ -440,7 +443,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK STA [%s (%s)]: Sync of charts and dimensions done in %ld seconds.", wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", now_realtime_sec() - wc->startup_time); } @@ -459,6 +462,7 @@ bind_fail: error_report("Failed to reset statement when pushing chart events, rc = %d", rc); freez(claim_id); + freez(hostname); return; } @@ -562,7 +566,7 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_ error_report("Failed to ACK sequence id, rc = %d", rc); else log_access( - "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED in the database upto %" PRIu64, + "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED IN THE DATABASE UP TO %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1); @@ -583,8 +587,13 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl cmd.param1); db_execute(buffer_tostring(sql)); if (cmd.param1 == 1) { + char *hostname = NULL; + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(wc->node_id); buffer_flush(sql); - log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, hostname? hostname : "N/A"); buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \ "DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str); db_lock(); @@ -609,6 +618,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl RRDDIM *rd; rrddim_foreach_read(rd, st) { + rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); rd->state->aclk_live_status = (rd->state->aclk_live_status == 0); } rrdset_unlock(st); @@ -616,9 +626,10 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl rrdhost_unlock(host); } else error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid); + freez(hostname); } else { log_access( - "ACLK STA [%s (%s)]: Restarting chart sync from sequence %" PRIu64, + "ACLK STA [%s (%s)]: RESTARTING CHART SYNC FROM SEQUENCE %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1); @@ -705,25 +716,28 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at if (unlikely(!node_id)) return; - // log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM from %"PRIu64" t=%ld batch=%"PRIu64, node_id, - // sequence_id, created_at, batch_id); - uuid_t node_uuid; if (uuid_parse(node_id, node_uuid)) { log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM ignored, invalid node id", node_id); return; } - struct aclk_database_worker_config *wc = NULL; + struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id); rrd_rdlock(); RRDHOST *host = localhost; while(host) { - if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) { + if (wc || (host->node_id && !(uuid_compare(*host->node_id, node_uuid)))) { rrd_unlock(); - wc = (struct aclk_database_worker_config *)host->dbsync_worker ? - (struct aclk_database_worker_config *)host->dbsync_worker : - (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); + if (!wc) + wc = (struct aclk_database_worker_config *)host->dbsync_worker ? + (struct aclk_database_worker_config *)host->dbsync_worker : + (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); + char *hostname = NULL; if (likely(wc)) { + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(node_id); wc->chart_reset_count++; __sync_synchronize(); wc->chart_updates = 0; @@ -731,9 +745,10 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at __sync_synchronize(); wc->batch_created = now_realtime_sec(); log_access( - "ACLK REQ [%s (%s)]: CHARTS STREAM from %" PRIu64 " t=%ld resets=%d", + "ACLK REQ [%s (%s)]: CHARTS STREAM from %"PRIu64" (LOCAL %"PRIu64") t=%ld resets=%d" , wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", + sequence_id + 1, wc->chart_sequence_id, wc->chart_timestamp, wc->chart_reset_count); @@ -742,7 +757,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at "ACLK RES [%s (%s)]: CHARTS FULL RESYNC REQUEST " "remote_seq=%" PRIu64 " local_seq=%" PRIu64 " resets=%d ", wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", sequence_id, wc->chart_sequence_id, wc->chart_reset_count); @@ -756,7 +771,6 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at freez(chart_reset.claim_id); wc->chart_reset_count = -1; } - return; } else { struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -766,8 +780,8 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at log_access( "ACLK REQ [%s (%s)]: CHART RESET from %" PRIu64 " t=%ld batch=%" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", - wc->chart_sequence_id, + hostname ? hostname : "N/A", + sequence_id + 1, wc->chart_timestamp, wc->batch_id); cmd.opcode = ACLK_DATABASE_RESET_CHART; @@ -775,20 +789,15 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at cmd.completion = NULL; aclk_database_enq_cmd(wc, &cmd); } else { -// log_access( -// "ACLK RES [%s (%s)]: CHARTS STREAM from %" PRIu64 -// " t=%ld resets=%d", -// wc->node_id, -// wc->host ? wc->host->hostname : "N/A", -// wc->chart_sequence_id, -// wc->chart_timestamp, -// wc->chart_reset_count); wc->chart_reset_count = 0; wc->chart_updates = 1; } } - } else - log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); + } else { + hostname = get_hostname_by_node_id(node_id); + log_access("ACLK STA [%s (%s)]: ACLK synchronization thread is not active.", node_id, hostname ? hostname : "N/A"); + } + freez(hostname); return; } host = host->next; @@ -838,9 +847,8 @@ failed: "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \ "WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;" -void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +void aclk_update_retention(struct aclk_database_worker_config *wc) { - UNUSED(cmd); int rc; if (!aclk_use_new_cloud_arch || !aclk_connected) @@ -887,7 +895,10 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d time_t last_entry_t; uint32_t update_every = 0; uint32_t dimension_update_count = 0; - int send_status; + uint32_t total_checked = 0; + uint32_t total_deleted= 0; + uint32_t total_stopped= 0; + time_t send_status; struct retention_updated rotate_data; @@ -904,7 +915,9 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.node_id = strdupz(wc->node_id); time_t now = now_realtime_sec(); - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step(res) == SQLITE_ROW && dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP) { + if (unlikely(netdata_exit)) + break; if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) { if (update_every) { debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); @@ -942,23 +955,40 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d if (likely(!rc && first_entry_t)) start_time = MIN(start_time, first_entry_t); - if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates) { + if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) { int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every)); - if ((!live || !first_entry_t) && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) { - (void)aclk_upd_dimension_event( - wc, - claim_id, - (uuid_t *)sqlite3_column_blob(res, 0), - (const char *)(const char *)sqlite3_column_text(res, 3), - (const char *)(const char *)sqlite3_column_text(res, 4), - (const char *)(const char *)sqlite3_column_text(res, 2), - first_entry_t, - live ? 0 : last_entry_t, - &send_status); - if (!send_status) + if (rc) { + first_entry_t = 0; + last_entry_t = 0; + live = 0; + } + if (!wc->host || !first_entry_t) { + if (!first_entry_t) { + delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0)); + total_deleted++; dimension_update_count++; + } + else { + (void)aclk_upd_dimension_event( + wc, + claim_id, + (uuid_t *)sqlite3_column_blob(res, 0), + (const char *)(const char *)sqlite3_column_text(res, 3), + (const char *)(const char *)sqlite3_column_text(res, 4), + (const char *)(const char *)sqlite3_column_text(res, 2), + first_entry_t, + live ? 0 : last_entry_t, + &send_status); + + if (!send_status) { + if (last_entry_t) + total_stopped++; + dimension_update_count++; + } + } } } + total_checked++; } if (update_every) { debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); @@ -970,7 +1000,20 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.interval_duration_count++; } + char *hostname = NULL; + if (!wc->host) + hostname = get_hostname_by_node_id(wc->node_id); + + if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) + log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", + wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); + else + log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE NOT SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", + wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); + freez(hostname); + #ifdef NETDATA_INTERNAL_CHECKS + info("Retention update for %s (chart updates = %d)", wc->host_guid, wc->chart_updates); for (int i = 0; i < rotate_data.interval_duration_count; ++i) info( "Update for host %s (node %s) for %u Retention = %u", @@ -979,7 +1022,8 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.interval_durations[i].update_every, rotate_data.interval_durations[i].retention); #endif - aclk_retention_updated(&rotate_data); + if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) + aclk_retention_updated(&rotate_data); freez(rotate_data.node_id); freez(rotate_data.interval_durations); @@ -1048,11 +1092,64 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc) return; } -int queue_dimension_to_aclk(RRDDIM *rd) +void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated) { - int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker, - rd, ACLK_DATABASE_ADD_DIMENSION); - return rc; + int live = !last_updated; + + if (likely(rd->state->aclk_live_status == live)) + return; + + time_t created_at = rd->state->query_ops.oldest_time(rd); + + if (unlikely(!created_at && rd->updated)) + created_at = rd->last_collected_time.tv_sec; + + rd->state->aclk_live_status = live; + + struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker; + if (unlikely(!wc)) + return; + + char *claim_id = is_agent_claimed(); + if (unlikely(!claim_id)) + return; + + struct chart_dimension_updated dim_payload; + memset(&dim_payload, 0, sizeof(dim_payload)); + dim_payload.node_id = wc->node_id; + dim_payload.claim_id = claim_id; + dim_payload.name = rd->name; + dim_payload.id = rd->id; + dim_payload.chart_id = rd->rrdset->id; + dim_payload.created_at.tv_sec = created_at; + dim_payload.last_timestamp.tv_sec = last_updated; + + size_t size = 0; + char *payload = generate_chart_dimension_updated(&size, &dim_payload); + + freez(claim_id); + if (unlikely(!payload)) + return; + + struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data)); + uuid_copy(aclk_cd_data->uuid, rd->state->metric_uuid); + aclk_cd_data->payload = payload; + aclk_cd_data->payload_size = size; + aclk_cd_data->check_payload = 1; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + + cmd.opcode = ACLK_DATABASE_ADD_DIMENSION; + cmd.data = aclk_cd_data; + int rc = aclk_database_enq_cmd_noblock(wc, &cmd); + + if (unlikely(rc)) { + freez(aclk_cd_data->payload); + freez(aclk_cd_data); + rd->state->aclk_live_status = !live; + } + return; } void aclk_send_dimension_update(RRDDIM *rd) @@ -1203,6 +1300,12 @@ void sql_check_chart_liveness(RRDSET *st) { return; rrdset_rdlock(st); + + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + rrdset_unlock(st); + return; + } + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) { debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name); @@ -1215,20 +1318,8 @@ void sql_check_chart_liveness(RRDSET *st) { debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name); rrddim_foreach_read(rd, st) { - if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { - int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every; - if (unlikely(live != rd->state->aclk_live_status)) { - if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - if (likely(!queue_dimension_to_aclk(rd))) { - debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live); - rd->state->aclk_live_status = live; - rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); - } - } - } - else - debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name); - } + if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) + queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); } rrdset_unlock(st); } |