Diffstat
-rw-r--r--  src/streaming/replication.c (renamed from streaming/replication.c)  52
1 file changed, 24 insertions(+), 28 deletions(-)
diff --git a/streaming/replication.c b/src/streaming/replication.c
index bc34361b3..1f5aeb34c 100644
--- a/streaming/replication.c
+++ b/src/streaming/replication.c
@@ -162,7 +162,7 @@ static struct replication_query *replication_query_prepare(
}
}
- q->backend = st->rrdhost->db[0].eng->backend;
+ q->backend = st->rrdhost->db[0].eng->seb;
// prepare our array of dimensions
size_t count = 0;
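
The hunk above renames the cached storage-engine field from `backend` to `seb`. A hedged reading, since the field's type is not shown in this diff: `seb` presumably abbreviates STORAGE_ENGINE_BACKEND, a discriminator cached on the query so later code can branch on the tier-0 engine without dereferencing the host again. An illustrative sketch; the enum value is assumed from Netdata's storage layer, not from this diff:

    STORAGE_ENGINE_BACKEND seb = st->rrdhost->db[0].eng->seb;  /* tier 0 */
    if(seb == STORAGE_ENGINE_BACKEND_DBENGINE) {
        /* dbengine-specific query path */
    }
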
@@ -184,7 +184,7 @@ static struct replication_query *replication_query_prepare(
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
- storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
+ storage_engine_query_init(q->backend, rd->tiers[0].smh, &d->handle, q->query.after, q->query.before,
q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
d->enabled = true;
d->skip = false;
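
Same rename wave here: the per-tier `db_metric_handle` becomes `smh` (storage metric handle). Below is a sketch of the per-dimension query lifecycle around the init call above; the is_finished/next_metric/finalize calls are assumed from Netdata's storage-engine API and do not appear in this diff:

    struct storage_engine_query_handle handle;
    storage_engine_query_init(q->backend, rd->tiers[0].smh, &handle,
                              q->query.after, q->query.before, STORAGE_PRIORITY_LOW);
    while(!storage_engine_query_is_finished(&handle)) {
        STORAGE_POINT sp = storage_engine_query_next_metric(&handle);
        /* ... emit sp into the replication payload ... */
    }
    storage_engine_query_finalize(&handle);
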
@@ -1036,7 +1036,7 @@ static struct replication_thread {
struct {
size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time
- netdata_thread_t **threads_ptrs;
+ ND_THREAD **threads_ptrs;
size_t threads;
} main_thread; // access is allowed only by the main thread
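
The container stays an array of pointers; what changes is the element type and its ownership. Reconstructed from the removed and added lines further down in this diff (tag and fn are placeholders):

    /* legacy: the caller allocates the thread object, the library fills it in */
    netdata_thread_t *t = mallocz(sizeof(netdata_thread_t));
    netdata_thread_create(t, tag, NETDATA_THREAD_OPTION_JOINABLE, fn, NULL);

    /* new: the library owns the object, the caller stores an opaque handle */
    ND_THREAD *nt = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE, fn, NULL);
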
@@ -1426,7 +1426,7 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma
static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
-};
+}
static bool replication_execute_request(struct replication_request *rq, bool workers) {
bool ret = false;
@@ -1445,8 +1445,6 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
goto cleanup;
}
- netdata_thread_disable_cancelability();
-
if(!rq->q) {
if(likely(workers))
worker_is_busy(WORKER_JOB_PREPARE_QUERY);
@@ -1468,7 +1466,6 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));
rq->q = NULL;
- netdata_thread_enable_cancelability();
__atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);
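
These two hunks drop the disable/enable cancelability pair that bracketed query preparation and execution. A hedged reading: the old guard presumably wrapped pthread_setcancelstate() so that a pthread_cancel() could not abandon a prepared query mid-flight; the ND_THREAD workers introduced below exit cooperatively via service_running() instead, leaving nothing for the guard to defend against. An assumed shape of the removed helper, as a minimal pthread sketch rather than Netdata's actual definition:

    #include <pthread.h>

    static void netdata_thread_disable_cancelability(void) {
        int old_state;
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
    }
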
@@ -1830,18 +1827,18 @@ static int replication_pipeline_execute_next(void) {
return REQUEST_OK;
}
-static void replication_worker_cleanup(void *ptr __maybe_unused) {
+static void replication_worker_cleanup(void *pptr) {
+ if(CLEANUP_FUNCTION_GET_PTR(pptr) != (void *)0x01) return;
replication_pipeline_cancel_and_cleanup();
worker_unregister();
}
-static void *replication_worker_thread(void *ptr) {
+static void *replication_worker_thread(void *ptr __maybe_unused) {
+ CLEANUP_FUNCTION_REGISTER(replication_worker_cleanup) cleanup_ptr = (void *)0x1;
replication_initialize_workers(false);
- netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
-
- while(service_running(SERVICE_REPLICATION)) {
- if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
+ while (service_running(SERVICE_REPLICATION)) {
+ if (unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
sender_thread_buffer_free();
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
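
The cleanup refactor above swaps the pthread_cleanup_push()-style pair (netdata_thread_cleanup_push()/netdata_thread_cleanup_pop(), which must be matched within a single scope) for CLEANUP_FUNCTION_REGISTER, which presumably rides on the compiler's cleanup attribute: the handler fires on every exit from the scope, and it receives the address of the registered variable, which is why replication_worker_cleanup() unwraps its argument with CLEANUP_FUNCTION_GET_PTR() and why the worker arms it with the sentinel (void *)0x1. A minimal self-contained sketch of the mechanism, with illustrative MY_* names in place of Netdata's macros (GCC/Clang only):

    #include <stdio.h>

    static void my_cleanup(void *pptr) {
        void *armed = *(void **)pptr;    /* the compiler passes &cleanup_ptr */
        if(armed != (void *)0x1) return; /* not armed: nothing to do */
        printf("cleanup ran\n");
    }

    #define MY_CLEANUP_REGISTER(fn) __attribute__((cleanup(fn))) void *

    static void worker(void) {
        MY_CLEANUP_REGISTER(my_cleanup) cleanup_ptr = (void *)0x1;
        /* any return from this scope now triggers my_cleanup() */
    }

    int main(void) { worker(); return 0; }
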
@@ -1849,25 +1846,25 @@ static void *replication_worker_thread(void *ptr) {
}
}
- netdata_thread_cleanup_pop(1);
return NULL;
}
-static void replication_main_cleanup(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+static void replication_main_cleanup(void *pptr) {
+ struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr);
+ if(!static_thread) return;
+
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
replication_pipeline_cancel_and_cleanup();
int threads = (int)replication_globals.main_thread.threads;
for(int i = 0; i < threads ;i++) {
- netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
- freez(replication_globals.main_thread.threads_ptrs[i]);
- __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
+ nd_thread_join(replication_globals.main_thread.threads_ptrs[i]);
+ __atomic_sub_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED);
}
freez(replication_globals.main_thread.threads_ptrs);
replication_globals.main_thread.threads_ptrs = NULL;
- __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(ND_THREAD *), __ATOMIC_RELAXED);
aral_destroy(replication_globals.aral_rse);
replication_globals.aral_rse = NULL;
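
The join loop above follows the ownership flip: nd_thread_join() takes the stored handle directly and presumably releases it, so the per-thread freez() disappears and the allocation accounting now tracks the stored pointer (sizeof(ND_THREAD *)) rather than a netdata_thread_t. The paired lifecycle as it reads after this change, with signatures as they appear in this diff:

    ND_THREAD *t = nd_thread_create("REPLAY[2]", NETDATA_THREAD_OPTION_JOINABLE,
                                    replication_worker_thread, NULL);
    /* ... thread runs ... */
    nd_thread_join(t);   /* join; no freez(t) afterwards */
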
@@ -1895,20 +1892,20 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(--threads) {
replication_globals.main_thread.threads = threads;
- replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
- __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED);
+ replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(ND_THREAD *));
+ __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(ND_THREAD *), __ATOMIC_RELAXED);
for(int i = 0; i < threads ;i++) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
- replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
- __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
- netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
- NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
+ replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(ND_THREAD *));
+ __atomic_add_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED);
+ replication_globals.main_thread.threads_ptrs[i] = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE,
+ replication_worker_thread, NULL);
}
}
- netdata_thread_cleanup_push(replication_main_cleanup, ptr);
+ CLEANUP_FUNCTION_REGISTER(replication_main_cleanup) cleanup_ptr = ptr;
// start from 100% completed
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
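
One detail worth noting in the creation loop above: each slot is first assigned a mallocz(sizeof(ND_THREAD *)) block and then immediately overwritten by the handle nd_thread_create() returns, so the interim allocation is never used. A tighter equivalent of the same loop, as a sketch that drops it:

    for(int i = 0; i < threads; i++) {
        char tag[NETDATA_THREAD_TAG_MAX + 1];
        snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
        replication_globals.main_thread.threads_ptrs[i] =
            nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE,
                             replication_worker_thread, NULL);
    }
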
@@ -2031,6 +2028,5 @@ void *replication_thread_main(void *ptr __maybe_unused) {
}
}
- netdata_thread_cleanup_pop(1);
return NULL;
}