diff options
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 340 |
1 files changed, 211 insertions, 129 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 7c1f16b4c..a50913a1a 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -88,6 +88,7 @@ struct replication_query { bool locked_data_collection; bool execute; bool interrupted; + STREAM_CAPABILITIES capabilities; } query; time_t wall_clock_time; @@ -95,7 +96,7 @@ struct replication_query { size_t points_read; size_t points_generated; - struct storage_engine_query_ops *ops; + STORAGE_ENGINE_BACKEND backend; struct replication_request *rq; size_t dimensions; @@ -112,7 +113,8 @@ static struct replication_query *replication_query_prepare( time_t query_after, time_t query_before, bool query_enable_streaming, - time_t wall_clock_time + time_t wall_clock_time, + STREAM_CAPABILITIES capabilities ) { size_t dimensions = rrdset_number_of_dimensions(st); struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension)); @@ -131,6 +133,7 @@ static struct replication_query *replication_query_prepare( q->query.after = query_after; q->query.before = query_before; q->query.enable_streaming = query_enable_streaming; + q->query.capabilities = capabilities; q->wall_clock_time = wall_clock_time; @@ -159,7 +162,7 @@ static struct replication_query *replication_query_prepare( } } - q->ops = &st->rrdhost->db[0].eng->api.query_ops; + q->backend = st->rrdhost->db[0].eng->backend; // prepare our array of dimensions size_t count = 0; @@ -181,7 +184,7 @@ static struct replication_query *replication_query_prepare( d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); d->rd = rd; - q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before, + storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before, q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW); d->enabled = true; d->skip = false; @@ -209,32 +212,40 @@ static struct replication_query *replication_query_prepare( return q; } -static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) { +static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) { + NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if(!rd->exposed) continue; - - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n", - rrddim_id(rd), - (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, - rd->last_collected_value, - rd->last_calculated_value, - rd->last_stored_value - ); - } + rrddim_foreach_read(rd, st){ + if (!rd->exposed) continue; + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '", + sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC + + (usec_t) rd->last_collected_time.tv_usec); + buffer_fast_strcat(wb, " ", 1); + buffer_print_int64_encoded(wb, encoding, rd->last_collected_value); + buffer_fast_strcat(wb, " ", 1); + buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value); + buffer_fast_strcat(wb, " ", 1); + buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value); + buffer_fast_strcat(wb, "\n", 1); + } rrddim_foreach_done(rd); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n", - (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, - (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec - ); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1); + buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec); + buffer_fast_strcat(wb, "\n", 1); } static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) { size_t dimensions = q->dimensions; if(wb && q->query.enable_streaming) - replication_send_chart_collection_state(wb, q->st); + replication_send_chart_collection_state(wb, q->st, q->query.capabilities); if(q->query.locked_data_collection) { netdata_spinlock_unlock(&q->st->data_collection_lock); @@ -249,7 +260,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, struct replication_dimension *d = &q->data[i]; if (unlikely(!d->enabled)) continue; - q->ops->finalize(&d->handle); + storage_engine_query_finalize(&d->handle); dictionary_acquired_item_release(d->dict, d->rda); @@ -281,7 +292,7 @@ static void replication_query_align_to_optimal_before(struct replication_query * struct replication_dimension *d = &q->data[i]; if(unlikely(!d->enabled)) continue; - time_t new_before = q->ops->align_to_optimal_before(&d->handle); + time_t new_before = storage_engine_align_to_optimal_before(&d->handle); if (!expanded_before || new_before < expanded_before) expanded_before = new_before; } @@ -296,13 +307,14 @@ static void replication_query_align_to_optimal_before(struct replication_query * static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { replication_query_align_to_optimal_before(q); + NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; time_t after = q->query.after; time_t before = q->query.before; size_t dimensions = q->dimensions; - struct storage_engine_query_ops *ops = q->ops; time_t wall_clock_time = q->wall_clock_time; - size_t points_read = q->points_read, points_generated = q->points_generated; + bool finished_with_gap = false; + size_t points_read = 0, points_generated = 0; #ifdef NETDATA_LOG_REPLICATION_REQUESTS time_t actual_after = 0, actual_before = 0; @@ -318,8 +330,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s // fetch the first valid point for the dimension int max_skip = 1000; - while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) { - d->sp = ops->next_metric(&d->handle); + while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) { + d->sp = storage_engine_query_next_metric(&d->handle); points_read++; } @@ -328,9 +340,10 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s error_limit_static_global_var(erl, 1, 0); error_limit(&erl, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", - rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), - (unsigned long long) now); + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query " + "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), + (unsigned long long) now); continue; } @@ -374,9 +387,10 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s else fix_min_start_time = min_end_time - min_update_every; +#ifdef NETDATA_INTERNAL_CHECKS error_limit_static_global_var(erl, 1, 0); error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' " - "misaligned dimensions " + "misaligned dimensions, " "update every (min: %ld, max: %ld), " "start time (min: %ld, max: %ld), " "end time (min %ld, max %ld), " @@ -389,6 +403,7 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s now, last_end_time_in_buffer, fix_min_start_time ); +#endif min_start_time = fix_min_start_time; } @@ -410,7 +425,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s q->query.before = last_end_time_in_buffer; q->query.enable_streaming = false; - internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. " + internal_error(true, "REPLICATION: current buffer size %zu is more than the " + "max message size %zu for chart '%s' of host '%s'. " "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.", buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), q->request.after, q->request.before, q->request.enable_streaming?"true":"false", @@ -422,11 +438,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s } last_end_time_in_buffer = min_end_time; - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n", - (unsigned long long) min_start_time, - (unsigned long long) min_end_time, - (unsigned long long) wall_clock_time - ); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4); + buffer_print_uint64_encoded(wb, encoding, min_start_time); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, encoding, min_end_time); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + buffer_fast_strcat(wb, "\n", 1); // output the replay values for this time for (size_t i = 0; i < dimensions; i++) { @@ -438,8 +456,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s !storage_point_is_unset(d->sp) && !storage_point_is_gap(d->sp))) { - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n", - rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : ""); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2); + buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id)); + buffer_fast_strcat(wb, "\" ", 2); + buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum); + buffer_fast_strcat(wb, " ", 1); + buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED); + buffer_fast_strcat(wb, "\n", 1); points_generated++; } @@ -450,9 +473,16 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s else if(unlikely(min_end_time < now)) // the query does not progress break; - else + else { // we have gap - all points are in the future now = min_start_time; + + if(min_start_time > before && !points_generated) { + before = q->query.before = min_start_time - 1; + finished_with_gap = true; + break; + } + } } #ifdef NETDATA_LOG_REPLICATION_REQUESTS @@ -462,28 +492,33 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", - rrdhost_hostname(st->rrdhost), rrdset_id(st), + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); } else internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", - rrdhost_hostname(st->rrdhost), rrdset_id(st), + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), (unsigned long long)after, (unsigned long long)before); #endif // NETDATA_LOG_REPLICATION_REQUESTS - q->points_read = points_read; - q->points_generated = points_generated; + q->points_read += points_read; + q->points_generated += points_generated; - bool finished_with_gap = false; if(last_end_time_in_buffer < before - q->st->update_every) finished_with_gap = true; return finished_with_gap; } -static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) { +static struct replication_query *replication_response_prepare( + RRDSET *st, + bool requested_enable_streaming, + time_t requested_after, + time_t requested_before, + STREAM_CAPABILITIES capabilities + ) { time_t wall_clock_time = now_realtime_sec(); if(requested_after > requested_before) { @@ -509,7 +544,8 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r bool query_enable_streaming = requested_enable_streaming; time_t db_first_entry = 0, db_last_entry = 0; - rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); + rrdset_get_retention_of_tier_for_collected_chart( + st, &db_first_entry, &db_last_entry, wall_clock_time, 0); if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) { // no data requested - just enable streaming @@ -543,7 +579,7 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r db_first_entry, db_last_entry, requested_after, requested_before, requested_enable_streaming, query_after, query_before, query_enable_streaming, - wall_clock_time); + wall_clock_time, capabilities); } void replication_response_cancel_and_finalize(struct replication_query *q) { @@ -553,6 +589,7 @@ void replication_response_cancel_and_finalize(struct replication_query *q) { static bool sender_is_still_connected_for_this_request(struct replication_request *rq); bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) { + NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; struct replication_request *rq = q->rq; RRDSET *st = q->st; RRDHOST *host = st->rrdhost; @@ -562,7 +599,11 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size // holding the host's buffer lock for too long BUFFER *wb = sender_start(host->sender); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2); + buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); + buffer_fast_strcat(wb, "'\n", 2); + +// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); bool locked_data_collection = q->query.locked_data_collection; q->query.locked_data_collection = false; @@ -585,23 +626,22 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size // end with first/last entries we have, and the first start time and // last end time of the data we sent - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n", - // current chart update every - (int)st->update_every + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1); + buffer_print_int64_encoded(wb, encoding, st->update_every); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, encoding, db_first_entry); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, encoding, db_last_entry); - // child first db time, child end db time - , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry + buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7); - // start streaming boolean - , enable_streaming ? "true" : "false" - - // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true) - , (unsigned long long)after, (unsigned long long)before - - // child world clock time - , (unsigned long long)wall_clock_time - ); + buffer_print_uint64_encoded(wb, encoding, after); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, encoding, before); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + buffer_fast_strcat(wb, "\n", 1); worker_is_busy(WORKER_JOB_BUFFER_COMMIT); sender_commit(host->sender, wb); @@ -733,9 +773,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c , msg , r->last_request.after, r->last_request.before , r->child_db.first_entry_t, r->child_db.last_entry_t - , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD" + , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD" , r->local_db.first_entry_t, r->local_db.last_entry_t - , r->local_db.now + , r->local_db.wall_clock_time , r->gap.from, r->gap.to , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" @@ -928,11 +968,12 @@ struct replication_sort_entry { // the global variables for the replication thread static struct replication_thread { + ARAL *aral_rse; + SPINLOCK spinlock; struct { size_t pending; // number of requests pending in the queue - Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1) // statistics size_t added; // number of requests added to the queue @@ -951,6 +992,7 @@ static struct replication_thread { } unsafe; // protected from replication_recursive_lock() struct { + Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1) size_t executed; // the number of replication requests executed size_t latest_first_time; // the 'after' timestamp of the last request we executed size_t memory; // the total memory allocated by replication @@ -964,10 +1006,10 @@ static struct replication_thread { } main_thread; // access is allowed only by the main thread } replication_globals = { + .aral_rse = NULL, .spinlock = NETDATA_SPINLOCK_INITIALIZER, .unsafe = { .pending = 0, - .unique_id = 0, .added = 0, .removed = 0, @@ -984,6 +1026,7 @@ static struct replication_thread { }, }, .atomic = { + .unique_id = 0, .executed = 0, .latest_first_time = 0, .memory = 0, @@ -1047,17 +1090,15 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) { // ---------------------------------------------------------------------------- // replication sort entry management -static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) { - fatal_when_replication_is_not_locked_for_me(); - - struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry)); +static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { + struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse); __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); rrdpush_sender_pending_replication_requests_plus_one(rq->sender); // copy the request rse->rq = rq; - rse->unique_id = ++replication_globals.unsafe.unique_id; + rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST); // save the unique id into the request, to be able to delete it later rq->unique_id = rse->unique_id; @@ -1068,26 +1109,30 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc } static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { - freez(rse); + aral_freez(replication_globals.aral_rse, rse); __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); } static void replication_sort_entry_add(struct replication_request *rq) { - replication_recursive_lock(); - if(rrdpush_sender_replication_buffer_full_get(rq->sender)) { rq->indexed_in_judy = false; rq->not_indexed_buffer_full = true; rq->not_indexed_preprocessing = false; + replication_recursive_lock(); replication_globals.unsafe.pending_no_room++; replication_recursive_unlock(); return; } - if(rq->not_indexed_buffer_full) - replication_globals.unsafe.pending_no_room--; + // cache this, because it will be changed + bool decrement_no_room = rq->not_indexed_buffer_full; + + struct replication_sort_entry *rse = replication_sort_entry_create(rq); + + replication_recursive_lock(); - struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq); + if(decrement_no_room) + replication_globals.unsafe.pending_no_room--; // if(rq->after < (time_t)replication_globals.protected.queue.after && // rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && @@ -1371,7 +1416,12 @@ static bool replication_execute_request(struct replication_request *rq, bool wor if(likely(workers)) worker_is_busy(WORKER_JOB_PREPARE_QUERY); - rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before); + rq->q = replication_response_prepare( + rq->st, + rq->start_streaming, + rq->after, + rq->before, + rq->sender->capabilities); } if(likely(workers)) @@ -1580,67 +1630,85 @@ static void replication_initialize_workers(bool master) { #define REQUEST_QUEUE_EMPTY (-1) #define REQUEST_CHART_NOT_FOUND (-2) -static int replication_execute_next_pending_request(bool cancel) { - static __thread int max_requests_ahead = 0; - static __thread struct replication_request *rqs = NULL; - static __thread int rqs_last_executed = 0, rqs_last_prepared = 0; - static __thread size_t queue_rounds = 0; (void)queue_rounds; +static __thread struct replication_thread_pipeline { + int max_requests_ahead; + struct replication_request *rqs; + int rqs_last_executed, rqs_last_prepared; + size_t queue_rounds; +} rtp = { + .max_requests_ahead = 0, + .rqs = NULL, + .rqs_last_executed = 0, + .rqs_last_prepared = 0, + .queue_rounds = 0, +}; + +static void replication_pipeline_cancel_and_cleanup(void) { + if(!rtp.rqs) + return; + struct replication_request *rq; + size_t cancelled = 0; + + do { + if (++rtp.rqs_last_executed >= rtp.max_requests_ahead) + rtp.rqs_last_executed = 0; - if(unlikely(cancel)) { - if(rqs) { - size_t cancelled = 0; - do { - if (++rqs_last_executed >= max_requests_ahead) - rqs_last_executed = 0; + rq = &rtp.rqs[rtp.rqs_last_executed]; - rq = &rqs[rqs_last_executed]; + if (rq->q) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq"); - if (rq->q) { - internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); - internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq"); + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + cancelled++; + } - replication_response_cancel_and_finalize(rq->q); - rq->q = NULL; - cancelled++; - } + rq->executed = true; + rq->found = false; - rq->executed = true; - rq->found = false; + } while (rtp.rqs_last_executed != rtp.rqs_last_prepared); - } while (rqs_last_executed != rqs_last_prepared); + internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled); - internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled); - } - return REQUEST_QUEUE_EMPTY; - } + freez(rtp.rqs); + rtp.rqs = NULL; + rtp.max_requests_ahead = 0; + rtp.rqs_last_executed = 0; + rtp.rqs_last_prepared = 0; + rtp.queue_rounds = 0; +} + +static int replication_pipeline_execute_next(void) { + struct replication_request *rq; - if(unlikely(!rqs)) { - max_requests_ahead = get_netdata_cpus() / 2; + if(unlikely(!rtp.rqs)) { + rtp.max_requests_ahead = (int)get_netdata_cpus() / 2; - if(max_requests_ahead > libuv_worker_threads * 2) - max_requests_ahead = libuv_worker_threads * 2; + if(rtp.max_requests_ahead > libuv_worker_threads * 2) + rtp.max_requests_ahead = libuv_worker_threads * 2; - if(max_requests_ahead < 2) - max_requests_ahead = 2; + if(rtp.max_requests_ahead < 2) + rtp.max_requests_ahead = 2; - rqs = callocz(max_requests_ahead, sizeof(struct replication_request)); - __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED); + rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request)); + __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED); } // fill the queue do { - if(++rqs_last_prepared >= max_requests_ahead) { - rqs_last_prepared = 0; - queue_rounds++; + if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) { + rtp.rqs_last_prepared = 0; + rtp.queue_rounds++; } - internal_fatal(rqs[rqs_last_prepared].q, + internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q, "REPLAY FATAL: slot is used by query that has not been executed!"); worker_is_busy(WORKER_JOB_FIND_NEXT); - rqs[rqs_last_prepared] = replication_request_get_first_available(); - rq = &rqs[rqs_last_prepared]; + rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available(); + rq = &rtp.rqs[rtp.rqs_last_prepared]; if(rq->found) { if (!rq->st) { @@ -1650,20 +1718,25 @@ static int replication_execute_next_pending_request(bool cancel) { if (rq->st && !rq->q) { worker_is_busy(WORKER_JOB_PREPARE_QUERY); - rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before); + rq->q = replication_response_prepare( + rq->st, + rq->start_streaming, + rq->after, + rq->before, + rq->sender->capabilities); } rq->executed = false; } - } while(rq->found && rqs_last_prepared != rqs_last_executed); + } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed); // pick the first usable do { - if (++rqs_last_executed >= max_requests_ahead) - rqs_last_executed = 0; + if (++rtp.rqs_last_executed >= rtp.max_requests_ahead) + rtp.rqs_last_executed = 0; - rq = &rqs[rqs_last_executed]; + rq = &rtp.rqs[rtp.rqs_last_executed]; if(rq->found) { internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); @@ -1696,7 +1769,7 @@ static int replication_execute_next_pending_request(bool cancel) { else internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!"); - } while(!rq->found && rqs_last_executed != rqs_last_prepared); + } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared); if(unlikely(!rq->found)) { worker_is_idle(); @@ -1720,7 +1793,7 @@ static int replication_execute_next_pending_request(bool cancel) { } static void replication_worker_cleanup(void *ptr __maybe_unused) { - replication_execute_next_pending_request(true); + replication_pipeline_cancel_and_cleanup(); worker_unregister(); } @@ -1730,7 +1803,7 @@ static void *replication_worker_thread(void *ptr) { netdata_thread_cleanup_push(replication_worker_cleanup, ptr); while(service_running(SERVICE_REPLICATION)) { - if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) { + if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { sender_thread_buffer_free(); worker_is_busy(WORKER_JOB_WAIT); worker_is_idle(); @@ -1746,7 +1819,7 @@ static void replication_main_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - replication_execute_next_pending_request(true); + replication_pipeline_cancel_and_cleanup(); int threads = (int)replication_globals.main_thread.threads; for(int i = 0; i < threads ;i++) { @@ -1758,12 +1831,21 @@ static void replication_main_cleanup(void *ptr) { replication_globals.main_thread.threads_ptrs = NULL; __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED); + aral_destroy(replication_globals.aral_rse); + replication_globals.aral_rse = NULL; + // custom code worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } +void replication_initialize(void) { + replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry), + 0, 65536, aral_by_size_statistics(), + NULL, NULL, false, false); +} + void *replication_thread_main(void *ptr __maybe_unused) { replication_initialize_workers(true); @@ -1863,7 +1945,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_is_idle(); } - if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) { + if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { worker_is_busy(WORKER_JOB_WAIT); replication_recursive_lock(); |