diff options
Diffstat (limited to '')
-rw-r--r-- | src/streaming/replication.c (renamed from streaming/replication.c) | 52 |
1 files changed, 24 insertions, 28 deletions
diff --git a/streaming/replication.c b/src/streaming/replication.c index bc34361b3..1f5aeb34c 100644 --- a/streaming/replication.c +++ b/src/streaming/replication.c @@ -162,7 +162,7 @@ static struct replication_query *replication_query_prepare( } } - q->backend = st->rrdhost->db[0].eng->backend; + q->backend = st->rrdhost->db[0].eng->seb; // prepare our array of dimensions size_t count = 0; @@ -184,7 +184,7 @@ static struct replication_query *replication_query_prepare( d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); d->rd = rd; - storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before, + storage_engine_query_init(q->backend, rd->tiers[0].smh, &d->handle, q->query.after, q->query.before, q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW); d->enabled = true; d->skip = false; @@ -1036,7 +1036,7 @@ static struct replication_thread { struct { size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time - netdata_thread_t **threads_ptrs; + ND_THREAD **threads_ptrs; size_t threads; } main_thread; // access is allowed only by the main thread @@ -1426,7 +1426,7 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma static bool sender_is_still_connected_for_this_request(struct replication_request *rq) { return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender); -}; +} static bool replication_execute_request(struct replication_request *rq, bool workers) { bool ret = false; @@ -1445,8 +1445,6 @@ static bool replication_execute_request(struct replication_request *rq, bool wor goto cleanup; } - netdata_thread_disable_cancelability(); - if(!rq->q) { if(likely(workers)) worker_is_busy(WORKER_JOB_PREPARE_QUERY); @@ -1468,7 +1466,6 @@ static bool replication_execute_request(struct replication_request *rq, bool wor rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL)); rq->q = NULL; - netdata_thread_enable_cancelability(); __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED); @@ -1830,18 +1827,18 @@ static int replication_pipeline_execute_next(void) { return REQUEST_OK; } -static void replication_worker_cleanup(void *ptr __maybe_unused) { +static void replication_worker_cleanup(void *pptr) { + if(CLEANUP_FUNCTION_GET_PTR(pptr) != (void *)0x01) return; replication_pipeline_cancel_and_cleanup(); worker_unregister(); } -static void *replication_worker_thread(void *ptr) { +static void *replication_worker_thread(void *ptr __maybe_unused) { + CLEANUP_FUNCTION_REGISTER(replication_worker_cleanup) cleanup_ptr = (void *)0x1; replication_initialize_workers(false); - netdata_thread_cleanup_push(replication_worker_cleanup, ptr); - - while(service_running(SERVICE_REPLICATION)) { - if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { + while (service_running(SERVICE_REPLICATION)) { + if (unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { sender_thread_buffer_free(); worker_is_busy(WORKER_JOB_WAIT); worker_is_idle(); @@ -1849,25 +1846,25 @@ static void *replication_worker_thread(void *ptr) { } } - netdata_thread_cleanup_pop(1); return NULL; } -static void replication_main_cleanup(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; +static void replication_main_cleanup(void *pptr) { + struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr); + if(!static_thread) return; + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; replication_pipeline_cancel_and_cleanup(); int threads = (int)replication_globals.main_thread.threads; for(int i = 0; i < threads ;i++) { - netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL); - freez(replication_globals.main_thread.threads_ptrs[i]); - __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED); + nd_thread_join(replication_globals.main_thread.threads_ptrs[i]); + __atomic_sub_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED); } freez(replication_globals.main_thread.threads_ptrs); replication_globals.main_thread.threads_ptrs = NULL; - __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED); + __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(ND_THREAD *), __ATOMIC_RELAXED); aral_destroy(replication_globals.aral_rse); replication_globals.aral_rse = NULL; @@ -1895,20 +1892,20 @@ void *replication_thread_main(void *ptr __maybe_unused) { if(--threads) { replication_globals.main_thread.threads = threads; - replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *)); - __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED); + replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(ND_THREAD *)); + __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(ND_THREAD *), __ATOMIC_RELAXED); for(int i = 0; i < threads ;i++) { char tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2); - replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t)); - __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED); - netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag, - NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL); + replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(ND_THREAD *)); + __atomic_add_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED); + replication_globals.main_thread.threads_ptrs[i] = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE, + replication_worker_thread, NULL); } } - netdata_thread_cleanup_push(replication_main_cleanup, ptr); + CLEANUP_FUNCTION_REGISTER(replication_main_cleanup) cleanup_ptr = ptr; // start from 100% completed worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); @@ -2031,6 +2028,5 @@ void *replication_thread_main(void *ptr __maybe_unused) { } } - netdata_thread_cleanup_pop(1); return NULL; } |