summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk_chart.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/sqlite/sqlite_aclk_chart.c')
-rw-r--r--database/sqlite/sqlite_aclk_chart.c315
1 files changed, 203 insertions, 112 deletions
diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c
index 7afa1d451..a9db5282a 100644
--- a/database/sqlite/sqlite_aclk_chart.c
+++ b/database/sqlite/sqlite_aclk_chart.c
@@ -22,20 +22,20 @@ sql_queue_chart_payload(struct aclk_database_worker_config *wc, void *data, enum
return rc;
}
-static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size)
+static time_t payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size)
{
static __thread sqlite3_stmt *res = NULL;
int rc;
- int send_status = 0;
+ time_t send_status = 0;
if (unlikely(!res)) {
char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT 1 FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp "
- "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;",
- uuid_str, uuid_str);
+ snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT acl.date_submitted FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp "
+ "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;",
+ uuid_str, uuid_str);
rc = prepare_statement(db_meta, sql, &res);
if (rc != SQLITE_OK) {
- error_report("Failed to prepare statement to check payload data");
+ error_report("Failed to prepare statement to check payload data on %s", sql);
return 0;
}
}
@@ -49,7 +49,7 @@ static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payl
goto bind_fail;
while (sqlite3_step(res) == SQLITE_ROW) {
- send_status = sqlite3_column_int(res, 0);
+ send_status = (time_t) sqlite3_column_int64(res, 0);
}
bind_fail:
@@ -58,23 +58,36 @@ bind_fail:
return send_status;
}
-static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id,
- ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, int *send_status)
+static int aclk_add_chart_payload(
+ struct aclk_database_worker_config *wc,
+ uuid_t *uuid,
+ char *claim_id,
+ ACLK_PAYLOAD_TYPE payload_type,
+ void *payload,
+ size_t payload_size,
+ time_t *send_status,
+ int check_sent)
{
static __thread sqlite3_stmt *res_chart = NULL;
int rc;
+ time_t date_submitted;
- rc = payload_sent(wc->uuid_str, uuid, payload, payload_size);
- if (send_status)
- *send_status = rc;
- if (rc == 1)
+ if (unlikely(!payload))
return 0;
+ if (check_sent) {
+ date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size);
+ if (send_status)
+ *send_status = date_submitted;
+ if (date_submitted)
+ return 0;
+ }
+
if (unlikely(!res_chart)) {
char sql[ACLK_SYNC_QUERY_SIZE];
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,
- "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \
- "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str);
+ "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \
+ "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str);
rc = prepare_statement(db_meta, sql, &res_chart);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to store chart payload data");
@@ -146,7 +159,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
chart_payload.id = strdupz(st->id);
struct label_index *labels = &st->state->labels;
- netdata_rwlock_wrlock(&labels->labels_rwlock);
+ netdata_rwlock_rdlock(&labels->labels_rwlock);
struct label *label_list = labels->head;
struct label *chart_label = NULL;
while (label_list) {
@@ -159,7 +172,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
size_t size;
char *payload = generate_chart_instance_updated(&size, &chart_payload);
if (likely(payload))
- rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL);
+ rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL, 1);
freez(payload);
chart_instance_updated_destroy(&chart_payload);
}
@@ -168,7 +181,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid,
const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time,
- int *send_status)
+ time_t *send_status)
{
int rc = 0;
size_t size;
@@ -197,7 +210,7 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w
dim_payload.last_timestamp.tv_sec = last_time;
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
if (likely(payload))
- rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status);
+ rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status, 1);
freez(payload);
return rc;
}
@@ -271,39 +284,22 @@ bind_fail:
int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
- int rc = 0;
+ int rc = 1;
CHECK_SQLITE_CONNECTION(db_meta);
- char *claim_id = is_agent_claimed();
-
- RRDDIM *rd = cmd.data;
-
- if (likely(claim_id)) {
- int send_status = 0;
- time_t now = now_realtime_sec();
-
- time_t first_t = rd->state->query_ops.oldest_time(rd);
- time_t last_t = rd->state->query_ops.latest_time(rd);
-
- int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every));
+ struct aclk_chart_dimension_data *aclk_cd_data = cmd.data;
- rc = aclk_upd_dimension_event(
- wc,
- claim_id,
- &rd->state->metric_uuid,
- rd->id,
- rd->name,
- rd->rrdset->id,
- first_t,
- live ? 0 : last_t,
- &send_status);
+ char *claim_id = is_agent_claimed();
+ if (!claim_id)
+ goto cleanup;
- if (!send_status)
- rd->state->aclk_live_status = live;
+ rc = aclk_add_chart_payload(wc, &aclk_cd_data->uuid, claim_id, ACLK_PAYLOAD_DIMENSION,
+ (void *) aclk_cd_data->payload, aclk_cd_data->payload_size, NULL, aclk_cd_data->check_payload);
- freez(claim_id);
- }
- rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
+ freez(claim_id);
+cleanup:
+ freez(aclk_cd_data->payload);
+ freez(aclk_cd_data);
return rc;
}
@@ -337,6 +333,12 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
char sql[ACLK_SYNC_QUERY_SIZE];
static __thread sqlite3_stmt *res = NULL;
+ char *hostname = NULL;
+ if (wc->host)
+ hostname = strdupz(wc->host->hostname);
+ else
+ hostname = get_hostname_by_node_id(wc->node_id);
+
if (unlikely(!res)) {
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \
"FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \
@@ -346,6 +348,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to send a chart update via ACLK");
freez(claim_id);
+ freez(hostname);
return;
}
}
@@ -419,7 +422,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
log_access(
"ACLK RES [%s (%s)]: CHARTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
wc->node_id,
- wc->host ? wc->host->hostname : "N/A",
+ hostname ? hostname : "N/A",
first_sequence,
last_sequence,
wc->batch_id);
@@ -440,7 +443,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
log_access(
"ACLK STA [%s (%s)]: Sync of charts and dimensions done in %ld seconds.",
wc->node_id,
- wc->host ? wc->host->hostname : "N/A",
+ hostname ? hostname : "N/A",
now_realtime_sec() - wc->startup_time);
}
@@ -459,6 +462,7 @@ bind_fail:
error_report("Failed to reset statement when pushing chart events, rc = %d", rc);
freez(claim_id);
+ freez(hostname);
return;
}
@@ -562,7 +566,7 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_
error_report("Failed to ACK sequence id, rc = %d", rc);
else
log_access(
- "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED in the database upto %" PRIu64,
+ "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED IN THE DATABASE UP TO %" PRIu64,
wc->node_id,
wc->host ? wc->host->hostname : "N/A",
cmd.param1);
@@ -583,8 +587,13 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
cmd.param1);
db_execute(buffer_tostring(sql));
if (cmd.param1 == 1) {
+ char *hostname = NULL;
+ if (wc->host)
+ hostname = strdupz(wc->host->hostname);
+ else
+ hostname = get_hostname_by_node_id(wc->node_id);
buffer_flush(sql);
- log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, hostname? hostname : "N/A");
buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \
"DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
db_lock();
@@ -609,6 +618,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
RRDDIM *rd;
rrddim_foreach_read(rd, st)
{
+ rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
rd->state->aclk_live_status = (rd->state->aclk_live_status == 0);
}
rrdset_unlock(st);
@@ -616,9 +626,10 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
rrdhost_unlock(host);
} else
error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid);
+ freez(hostname);
} else {
log_access(
- "ACLK STA [%s (%s)]: Restarting chart sync from sequence %" PRIu64,
+ "ACLK STA [%s (%s)]: RESTARTING CHART SYNC FROM SEQUENCE %" PRIu64,
wc->node_id,
wc->host ? wc->host->hostname : "N/A",
cmd.param1);
@@ -705,25 +716,28 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
if (unlikely(!node_id))
return;
- // log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM from %"PRIu64" t=%ld batch=%"PRIu64, node_id,
- // sequence_id, created_at, batch_id);
-
uuid_t node_uuid;
if (uuid_parse(node_id, node_uuid)) {
log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM ignored, invalid node id", node_id);
return;
}
- struct aclk_database_worker_config *wc = NULL;
+ struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id);
rrd_rdlock();
RRDHOST *host = localhost;
while(host) {
- if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
+ if (wc || (host->node_id && !(uuid_compare(*host->node_id, node_uuid)))) {
rrd_unlock();
- wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
- (struct aclk_database_worker_config *)host->dbsync_worker :
- (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
+ if (!wc)
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
+ (struct aclk_database_worker_config *)host->dbsync_worker :
+ (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
+ char *hostname = NULL;
if (likely(wc)) {
+ if (wc->host)
+ hostname = strdupz(wc->host->hostname);
+ else
+ hostname = get_hostname_by_node_id(node_id);
wc->chart_reset_count++;
__sync_synchronize();
wc->chart_updates = 0;
@@ -731,9 +745,10 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
__sync_synchronize();
wc->batch_created = now_realtime_sec();
log_access(
- "ACLK REQ [%s (%s)]: CHARTS STREAM from %" PRIu64 " t=%ld resets=%d",
+ "ACLK REQ [%s (%s)]: CHARTS STREAM from %"PRIu64" (LOCAL %"PRIu64") t=%ld resets=%d" ,
wc->node_id,
- wc->host ? wc->host->hostname : "N/A",
+ hostname ? hostname : "N/A",
+ sequence_id + 1,
wc->chart_sequence_id,
wc->chart_timestamp,
wc->chart_reset_count);
@@ -742,7 +757,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
"ACLK RES [%s (%s)]: CHARTS FULL RESYNC REQUEST "
"remote_seq=%" PRIu64 " local_seq=%" PRIu64 " resets=%d ",
wc->node_id,
- wc->host ? wc->host->hostname : "N/A",
+ hostname ? hostname : "N/A",
sequence_id,
wc->chart_sequence_id,
wc->chart_reset_count);
@@ -756,7 +771,6 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
freez(chart_reset.claim_id);
wc->chart_reset_count = -1;
}
- return;
} else {
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@@ -766,8 +780,8 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
log_access(
"ACLK REQ [%s (%s)]: CHART RESET from %" PRIu64 " t=%ld batch=%" PRIu64,
wc->node_id,
- wc->host ? wc->host->hostname : "N/A",
- wc->chart_sequence_id,
+ hostname ? hostname : "N/A",
+ sequence_id + 1,
wc->chart_timestamp,
wc->batch_id);
cmd.opcode = ACLK_DATABASE_RESET_CHART;
@@ -775,20 +789,15 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
cmd.completion = NULL;
aclk_database_enq_cmd(wc, &cmd);
} else {
-// log_access(
-// "ACLK RES [%s (%s)]: CHARTS STREAM from %" PRIu64
-// " t=%ld resets=%d",
-// wc->node_id,
-// wc->host ? wc->host->hostname : "N/A",
-// wc->chart_sequence_id,
-// wc->chart_timestamp,
-// wc->chart_reset_count);
wc->chart_reset_count = 0;
wc->chart_updates = 1;
}
}
- } else
- log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
+ } else {
+ hostname = get_hostname_by_node_id(node_id);
+ log_access("ACLK STA [%s (%s)]: ACLK synchronization thread is not active.", node_id, hostname ? hostname : "N/A");
+ }
+ freez(hostname);
return;
}
host = host->next;
@@ -838,9 +847,8 @@ failed:
"SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \
"WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
-void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+void aclk_update_retention(struct aclk_database_worker_config *wc)
{
- UNUSED(cmd);
int rc;
if (!aclk_use_new_cloud_arch || !aclk_connected)
@@ -887,7 +895,10 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
time_t last_entry_t;
uint32_t update_every = 0;
uint32_t dimension_update_count = 0;
- int send_status;
+ uint32_t total_checked = 0;
+ uint32_t total_deleted= 0;
+ uint32_t total_stopped= 0;
+ time_t send_status;
struct retention_updated rotate_data;
@@ -904,7 +915,9 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.node_id = strdupz(wc->node_id);
time_t now = now_realtime_sec();
- while (sqlite3_step(res) == SQLITE_ROW) {
+ while (sqlite3_step(res) == SQLITE_ROW && dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP) {
+ if (unlikely(netdata_exit))
+ break;
if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) {
if (update_every) {
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
@@ -942,23 +955,40 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
if (likely(!rc && first_entry_t))
start_time = MIN(start_time, first_entry_t);
- if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates) {
+ if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) {
int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every));
- if ((!live || !first_entry_t) && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) {
- (void)aclk_upd_dimension_event(
- wc,
- claim_id,
- (uuid_t *)sqlite3_column_blob(res, 0),
- (const char *)(const char *)sqlite3_column_text(res, 3),
- (const char *)(const char *)sqlite3_column_text(res, 4),
- (const char *)(const char *)sqlite3_column_text(res, 2),
- first_entry_t,
- live ? 0 : last_entry_t,
- &send_status);
- if (!send_status)
+ if (rc) {
+ first_entry_t = 0;
+ last_entry_t = 0;
+ live = 0;
+ }
+ if (!wc->host || !first_entry_t) {
+ if (!first_entry_t) {
+ delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0));
+ total_deleted++;
dimension_update_count++;
+ }
+ else {
+ (void)aclk_upd_dimension_event(
+ wc,
+ claim_id,
+ (uuid_t *)sqlite3_column_blob(res, 0),
+ (const char *)(const char *)sqlite3_column_text(res, 3),
+ (const char *)(const char *)sqlite3_column_text(res, 4),
+ (const char *)(const char *)sqlite3_column_text(res, 2),
+ first_entry_t,
+ live ? 0 : last_entry_t,
+ &send_status);
+
+ if (!send_status) {
+ if (last_entry_t)
+ total_stopped++;
+ dimension_update_count++;
+ }
+ }
}
}
+ total_checked++;
}
if (update_every) {
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
@@ -970,7 +1000,20 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.interval_duration_count++;
}
+ char *hostname = NULL;
+ if (!wc->host)
+ hostname = get_hostname_by_node_id(wc->node_id);
+
+ if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit)
+ log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
+ wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
+ else
+ log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE NOT SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
+ wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
+ freez(hostname);
+
#ifdef NETDATA_INTERNAL_CHECKS
+ info("Retention update for %s (chart updates = %d)", wc->host_guid, wc->chart_updates);
for (int i = 0; i < rotate_data.interval_duration_count; ++i)
info(
"Update for host %s (node %s) for %u Retention = %u",
@@ -979,7 +1022,8 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.interval_durations[i].update_every,
rotate_data.interval_durations[i].retention);
#endif
- aclk_retention_updated(&rotate_data);
+ if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit)
+ aclk_retention_updated(&rotate_data);
freez(rotate_data.node_id);
freez(rotate_data.interval_durations);
@@ -1048,11 +1092,64 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc)
return;
}
-int queue_dimension_to_aclk(RRDDIM *rd)
+void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated)
{
- int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker,
- rd, ACLK_DATABASE_ADD_DIMENSION);
- return rc;
+ int live = !last_updated;
+
+ if (likely(rd->state->aclk_live_status == live))
+ return;
+
+ time_t created_at = rd->state->query_ops.oldest_time(rd);
+
+ if (unlikely(!created_at && rd->updated))
+ created_at = rd->last_collected_time.tv_sec;
+
+ rd->state->aclk_live_status = live;
+
+ struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker;
+ if (unlikely(!wc))
+ return;
+
+ char *claim_id = is_agent_claimed();
+ if (unlikely(!claim_id))
+ return;
+
+ struct chart_dimension_updated dim_payload;
+ memset(&dim_payload, 0, sizeof(dim_payload));
+ dim_payload.node_id = wc->node_id;
+ dim_payload.claim_id = claim_id;
+ dim_payload.name = rd->name;
+ dim_payload.id = rd->id;
+ dim_payload.chart_id = rd->rrdset->id;
+ dim_payload.created_at.tv_sec = created_at;
+ dim_payload.last_timestamp.tv_sec = last_updated;
+
+ size_t size = 0;
+ char *payload = generate_chart_dimension_updated(&size, &dim_payload);
+
+ freez(claim_id);
+ if (unlikely(!payload))
+ return;
+
+ struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data));
+ uuid_copy(aclk_cd_data->uuid, rd->state->metric_uuid);
+ aclk_cd_data->payload = payload;
+ aclk_cd_data->payload_size = size;
+ aclk_cd_data->check_payload = 1;
+
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+
+ cmd.opcode = ACLK_DATABASE_ADD_DIMENSION;
+ cmd.data = aclk_cd_data;
+ int rc = aclk_database_enq_cmd_noblock(wc, &cmd);
+
+ if (unlikely(rc)) {
+ freez(aclk_cd_data->payload);
+ freez(aclk_cd_data);
+ rd->state->aclk_live_status = !live;
+ }
+ return;
}
void aclk_send_dimension_update(RRDDIM *rd)
@@ -1203,6 +1300,12 @@ void sql_check_chart_liveness(RRDSET *st) {
return;
rrdset_rdlock(st);
+
+ if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ rrdset_unlock(st);
+ return;
+ }
+
if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) {
debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name);
@@ -1215,20 +1318,8 @@ void sql_check_chart_liveness(RRDSET *st) {
debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name);
rrddim_foreach_read(rd, st) {
- if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
- int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every;
- if (unlikely(live != rd->state->aclk_live_status)) {
- if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
- if (likely(!queue_dimension_to_aclk(rd))) {
- debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live);
- rd->state->aclk_live_status = live;
- rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
- }
- }
- }
- else
- debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name);
- }
+ if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN))
+ queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
}
rrdset_unlock(st);
}