summaryrefslogtreecommitdiffstats
path: root/src/streaming/replication.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/streaming/replication.c (renamed from streaming/replication.c)23
1 files changed, 11 insertions, 12 deletions
diff --git a/streaming/replication.c b/src/streaming/replication.c
index bc34361b3..6f68fedae 100644
--- a/streaming/replication.c
+++ b/src/streaming/replication.c
@@ -162,7 +162,7 @@ static struct replication_query *replication_query_prepare(
}
}
- q->backend = st->rrdhost->db[0].eng->backend;
+ q->backend = st->rrdhost->db[0].eng->seb;
// prepare our array of dimensions
size_t count = 0;
@@ -184,7 +184,7 @@ static struct replication_query *replication_query_prepare(
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
- storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
+ storage_engine_query_init(q->backend, rd->tiers[0].smh, &d->handle, q->query.after, q->query.before,
q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
d->enabled = true;
d->skip = false;
@@ -1426,7 +1426,7 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma
static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
-};
+}
static bool replication_execute_request(struct replication_request *rq, bool workers) {
bool ret = false;
@@ -1838,17 +1838,16 @@ static void replication_worker_cleanup(void *ptr __maybe_unused) {
static void *replication_worker_thread(void *ptr) {
replication_initialize_workers(false);
- netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
-
- while(service_running(SERVICE_REPLICATION)) {
- if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
- sender_thread_buffer_free();
- worker_is_busy(WORKER_JOB_WAIT);
- worker_is_idle();
- sleep_usec(1 * USEC_PER_SEC);
+ netdata_thread_cleanup_push(replication_worker_cleanup, ptr) {
+ while (service_running(SERVICE_REPLICATION)) {
+ if (unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
+ sender_thread_buffer_free();
+ worker_is_busy(WORKER_JOB_WAIT);
+ worker_is_idle();
+ sleep_usec(1 * USEC_PER_SEC);
+ }
}
}
-
netdata_thread_cleanup_pop(1);
return NULL;
}