diff options
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 11 |
1 files changed, 10 insertions, 1 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index a50913a1a..c6fafc357 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -274,6 +274,12 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, replication_queries.queries_finished += queries; replication_queries.points_read += q->points_read; replication_queries.points_generated += q->points_generated; + + if(q->st && q->st->rrdhost->sender) { + struct sender_state *s = q->st->rrdhost->sender; + s->replication.latest_completed_before_t = q->query.before; + } + netdata_spinlock_unlock(&replication_queries.spinlock); } @@ -644,7 +650,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size buffer_fast_strcat(wb, "\n", 1); worker_is_busy(WORKER_JOB_BUFFER_COMMIT); - sender_commit(host->sender, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION); worker_is_busy(WORKER_JOB_CLEANUP); if(enable_streaming) { @@ -1466,6 +1472,9 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .not_indexed_preprocessing = false, }; + if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t) + sender->replication.oldest_request_after_t = rq.after; + if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) replication_execute_request(&rq, false); |