diff options
Diffstat (limited to '')
-rw-r--r-- | src/streaming/replication.c (renamed from streaming/replication.c) | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/streaming/replication.c b/src/streaming/replication.c index bc34361b3..6f68fedae 100644 --- a/streaming/replication.c +++ b/src/streaming/replication.c @@ -162,7 +162,7 @@ static struct replication_query *replication_query_prepare( } } - q->backend = st->rrdhost->db[0].eng->backend; + q->backend = st->rrdhost->db[0].eng->seb; // prepare our array of dimensions size_t count = 0; @@ -184,7 +184,7 @@ static struct replication_query *replication_query_prepare( d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); d->rd = rd; - storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before, + storage_engine_query_init(q->backend, rd->tiers[0].smh, &d->handle, q->query.after, q->query.before, q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW); d->enabled = true; d->skip = false; @@ -1426,7 +1426,7 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma static bool sender_is_still_connected_for_this_request(struct replication_request *rq) { return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender); -}; +} static bool replication_execute_request(struct replication_request *rq, bool workers) { bool ret = false; @@ -1838,17 +1838,16 @@ static void replication_worker_cleanup(void *ptr __maybe_unused) { static void *replication_worker_thread(void *ptr) { replication_initialize_workers(false); - netdata_thread_cleanup_push(replication_worker_cleanup, ptr); - - while(service_running(SERVICE_REPLICATION)) { - if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { - sender_thread_buffer_free(); - worker_is_busy(WORKER_JOB_WAIT); - worker_is_idle(); - sleep_usec(1 * USEC_PER_SEC); + netdata_thread_cleanup_push(replication_worker_cleanup, ptr) { + while (service_running(SERVICE_REPLICATION)) { + if (unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { + sender_thread_buffer_free(); + worker_is_busy(WORKER_JOB_WAIT); + worker_is_idle(); + sleep_usec(1 * USEC_PER_SEC); + } } } - netdata_thread_cleanup_pop(1); return NULL; } |