summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c11
1 files changed, 10 insertions, 1 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index a50913a1a..c6fafc357 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -274,6 +274,12 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
replication_queries.queries_finished += queries;
replication_queries.points_read += q->points_read;
replication_queries.points_generated += q->points_generated;
+
+ if(q->st && q->st->rrdhost->sender) {
+ struct sender_state *s = q->st->rrdhost->sender;
+ s->replication.latest_completed_before_t = q->query.before;
+ }
+
netdata_spinlock_unlock(&replication_queries.spinlock);
}
@@ -644,7 +650,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
buffer_fast_strcat(wb, "\n", 1);
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION);
worker_is_busy(WORKER_JOB_CLEANUP);
if(enable_streaming) {
@@ -1466,6 +1472,9 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.not_indexed_preprocessing = false,
};
+ if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t)
+ sender->replication.oldest_request_after_t = rq.after;
+
if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
replication_execute_request(&rq, false);