From c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:22 +0100 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- streaming/replication.c | 115 ++++++++++++++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 43 deletions(-) (limited to 'streaming/replication.c') diff --git a/streaming/replication.c b/streaming/replication.c index ffb6b3def..bc34361b3 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare( size_t count = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) { - if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd))) + if (unlikely(!rd || !rd_dfe.item || !rrddim_check_upstream_exposed(rd))) continue; if (unlikely(rd_dfe.counter >= q->dimensions)) { @@ -213,31 +213,38 @@ static struct replication_query *replication_query_prepare( } static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) { - NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + bool with_slots = (capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; RRDDIM *rd; rrddim_foreach_read(rd, st){ - if (!rrddim_check_exposed(rd)) continue; + if (!rrddim_check_upstream_exposed(rd)) continue; - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '", - sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); - buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + - (usec_t) rd->collector.last_collected_time.tv_usec); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + + (usec_t) rd->collector.last_collected_time.tv_usec); buffer_fast_strcat(wb, " ", 1); - buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value); + buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); buffer_fast_strcat(wb, " ", 1); - buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value); + buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_calculated_value); buffer_fast_strcat(wb, " ", 1); - buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value); + buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_stored_value); buffer_fast_strcat(wb, "\n", 1); } rrddim_foreach_done(rd); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1); - buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec); buffer_fast_strcat(wb, "\n", 1); } @@ -313,7 +320,8 @@ static void replication_query_align_to_optimal_before(struct replication_query * static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { replication_query_align_to_optimal_before(q); - NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; time_t after = q->query.after; time_t before = q->query.before; size_t dimensions = q->dimensions; @@ -344,8 +352,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s if(max_skip <= 0) { d->skip = true; - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query " "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), @@ -394,14 +402,15 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s fix_min_start_time = min_end_time - min_update_every; #ifdef NETDATA_INTERNAL_CHECKS - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' " - "misaligned dimensions, " - "update every (min: %ld, max: %ld), " - "start time (min: %ld, max: %ld), " - "end time (min %ld, max %ld), " - "now %ld, last end time sent %ld, " - "min start time is fixed to %ld", + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "REPLAY WARNING: 'host:%s/chart:%s' " + "misaligned dimensions, " + "update every (min: %ld, max: %ld), " + "start time (min: %ld, max: %ld), " + "end time (min %ld, max %ld), " + "now %ld, last end time sent %ld, " + "min start time is fixed to %ld", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), min_update_every, max_update_every, min_start_time, max_start_time, @@ -444,12 +453,19 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s } last_end_time_in_buffer = min_end_time; - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4); - buffer_print_uint64_encoded(wb, encoding, min_start_time); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '' ", 4); + buffer_print_uint64_encoded(wb, integer_encoding, min_start_time); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, min_end_time); + buffer_print_uint64_encoded(wb, integer_encoding, min_end_time); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); buffer_fast_strcat(wb, "\n", 1); // output the replay values for this time @@ -462,10 +478,17 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s !storage_point_is_unset(d->sp) && !storage_point_is_gap(d->sp))) { - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET, sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, d->rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " \"", 2); buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id)); buffer_fast_strcat(wb, "\" ", 2); - buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum); + buffer_print_netdata_double_encoded(wb, integer_encoding, d->sp.sum); buffer_fast_strcat(wb, " ", 1); buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED); buffer_fast_strcat(wb, "\n", 1); @@ -595,7 +618,8 @@ void replication_response_cancel_and_finalize(struct replication_query *q) { static bool sender_is_still_connected_for_this_request(struct replication_request *rq); bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) { - NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; struct replication_request *rq = q->rq; RRDSET *st = q->st; RRDHOST *host = st->rrdhost; @@ -605,12 +629,17 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size // holding the host's buffer lock for too long BUFFER *wb = sender_start(host->sender); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "'\n", 2); -// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); - bool locked_data_collection = q->query.locked_data_collection; q->query.locked_data_collection = false; @@ -634,19 +663,19 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size // last end time of the data we sent buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1); - buffer_print_int64_encoded(wb, encoding, st->update_every); + buffer_print_int64_encoded(wb, integer_encoding, st->update_every); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, db_first_entry); + buffer_print_uint64_encoded(wb, integer_encoding, db_first_entry); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, db_last_entry); + buffer_print_uint64_encoded(wb, integer_encoding, db_last_entry); buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7); - buffer_print_uint64_encoded(wb, encoding, after); + buffer_print_uint64_encoded(wb, integer_encoding, after); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, before); + buffer_print_uint64_encoded(wb, integer_encoding, before); buffer_fast_strcat(wb, " ", 1); - buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); buffer_fast_strcat(wb, "\n", 1); worker_is_busy(WORKER_JOB_BUFFER_COMMIT); @@ -664,7 +693,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size rrdhost_sender_replicating_charts_minus_one(st->rrdhost); if(!finished_with_gap) - st->upstream_resync_time_s = 0; + st->rrdpush.sender.resync_time_s = 0; #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", @@ -729,8 +758,8 @@ static void replicate_log_request(struct replication_request_details *r, const c #ifdef NETDATA_INTERNAL_CHECKS internal_error(true, #else - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "REPLAY ERROR: 'host:%s/chart:%s' child sent: " "db from %ld to %ld%s, wall clock time %ld, " @@ -793,7 +822,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c #endif // NETDATA_LOG_REPLICATION_REQUESTS char buffer[2048 + 1]; - snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", + snprintfz(buffer, sizeof(buffer) - 1, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", rrdset_id(st), r->wanted.start_streaming ? "true" : "false", (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before); -- cgit v1.2.3