summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /streaming/replication.c
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c115
1 files changed, 72 insertions, 43 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index ffb6b3def..bc34361b3 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare(
size_t count = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd)))
+ if (unlikely(!rd || !rd_dfe.item || !rrddim_check_upstream_exposed(rd)))
continue;
if (unlikely(rd_dfe.counter >= q->dimensions)) {
@@ -213,31 +213,38 @@ static struct replication_query *replication_query_prepare(
}
static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
- NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ bool with_slots = (capabilities & STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
RRDDIM *rd;
rrddim_foreach_read(rd, st){
- if (!rrddim_check_exposed(rd)) continue;
+ if (!rrddim_check_upstream_exposed(rd)) continue;
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
- sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
- (usec_t) rd->collector.last_collected_time.tv_usec);
+ buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
+ (usec_t) rd->collector.last_collected_time.tv_usec);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value);
+ buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_calculated_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value);
+ buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_stored_value);
buffer_fast_strcat(wb, "\n", 1);
}
rrddim_foreach_done(rd);
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
+ buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
+ buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
buffer_fast_strcat(wb, "\n", 1);
}
@@ -313,7 +320,8 @@ static void replication_query_align_to_optimal_before(struct replication_query *
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
replication_query_align_to_optimal_before(q);
- NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
time_t after = q->query.after;
time_t before = q->query.before;
size_t dimensions = q->dimensions;
@@ -344,8 +352,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
if(max_skip <= 0) {
d->skip = true;
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
"STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
"beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
@@ -394,14 +402,15 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
fix_min_start_time = min_end_time - min_update_every;
#ifdef NETDATA_INTERNAL_CHECKS
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
- "misaligned dimensions, "
- "update every (min: %ld, max: %ld), "
- "start time (min: %ld, max: %ld), "
- "end time (min %ld, max %ld), "
- "now %ld, last end time sent %ld, "
- "min start time is fixed to %ld",
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "REPLAY WARNING: 'host:%s/chart:%s' "
+ "misaligned dimensions, "
+ "update every (min: %ld, max: %ld), "
+ "start time (min: %ld, max: %ld), "
+ "end time (min %ld, max %ld), "
+ "now %ld, last end time sent %ld, "
+ "min start time is fixed to %ld",
rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
min_update_every, max_update_every,
min_start_time, max_start_time,
@@ -444,12 +453,19 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
}
last_end_time_in_buffer = min_end_time;
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4);
- buffer_print_uint64_encoded(wb, encoding, min_start_time);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '' ", 4);
+ buffer_print_uint64_encoded(wb, integer_encoding, min_start_time);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, min_end_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, min_end_time);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
// output the replay values for this time
@@ -462,10 +478,17 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
!storage_point_is_unset(d->sp) &&
!storage_point_is_gap(d->sp))) {
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET, sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, d->rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " \"", 2);
buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
buffer_fast_strcat(wb, "\" ", 2);
- buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum);
+ buffer_print_netdata_double_encoded(wb, integer_encoding, d->sp.sum);
buffer_fast_strcat(wb, " ", 1);
buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
buffer_fast_strcat(wb, "\n", 1);
@@ -595,7 +618,8 @@ void replication_response_cancel_and_finalize(struct replication_query *q) {
static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
- NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
struct replication_request *rq = q->rq;
RRDSET *st = q->st;
RRDHOST *host = st->rrdhost;
@@ -605,12 +629,17 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// holding the host's buffer lock for too long
BUFFER *wb = sender_start(host->sender);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "'\n", 2);
-// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
-
bool locked_data_collection = q->query.locked_data_collection;
q->query.locked_data_collection = false;
@@ -634,19 +663,19 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// last end time of the data we sent
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
- buffer_print_int64_encoded(wb, encoding, st->update_every);
+ buffer_print_int64_encoded(wb, integer_encoding, st->update_every);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, db_first_entry);
+ buffer_print_uint64_encoded(wb, integer_encoding, db_first_entry);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, db_last_entry);
+ buffer_print_uint64_encoded(wb, integer_encoding, db_last_entry);
buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7);
- buffer_print_uint64_encoded(wb, encoding, after);
+ buffer_print_uint64_encoded(wb, integer_encoding, after);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, before);
+ buffer_print_uint64_encoded(wb, integer_encoding, before);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
@@ -664,7 +693,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
if(!finished_with_gap)
- st->upstream_resync_time_s = 0;
+ st->rrdpush.sender.resync_time_s = 0;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
@@ -729,8 +758,8 @@ static void replicate_log_request(struct replication_request_details *r, const c
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(true,
#else
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"REPLAY ERROR: 'host:%s/chart:%s' child sent: "
"db from %ld to %ld%s, wall clock time %ld, "
@@ -793,7 +822,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
#endif // NETDATA_LOG_REPLICATION_REQUESTS
char buffer[2048 + 1];
- snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
+ snprintfz(buffer, sizeof(buffer) - 1, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
(unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);