diff options
Diffstat (limited to 'src/streaming/replication.c')
-rw-r--r-- | src/streaming/replication.c | 2032 |
1 files changed, 2032 insertions, 0 deletions
diff --git a/src/streaming/replication.c b/src/streaming/replication.c new file mode 100644 index 000000000..1f5aeb34c --- /dev/null +++ b/src/streaming/replication.c @@ -0,0 +1,2032 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "replication.h" +#include "Judy.h" + +#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL +#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL +#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL + +#define WORKER_JOB_FIND_NEXT 1 +#define WORKER_JOB_QUERYING 2 +#define WORKER_JOB_DELETE_ENTRY 3 +#define WORKER_JOB_FIND_CHART 4 +#define WORKER_JOB_PREPARE_QUERY 5 +#define WORKER_JOB_CHECK_CONSISTENCY 6 +#define WORKER_JOB_BUFFER_COMMIT 7 +#define WORKER_JOB_CLEANUP 8 +#define WORKER_JOB_WAIT 9 + +// master thread worker jobs +#define WORKER_JOB_STATISTICS 10 +#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12 +#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13 +#define WORKER_JOB_CUSTOM_METRIC_ADDED 14 +#define WORKER_JOB_CUSTOM_METRIC_DONE 15 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17 + +#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 +#define SECONDS_TO_RESET_POINT_IN_TIME 10 + +static struct replication_query_statistics replication_queries = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .queries_started = 0, + .queries_finished = 0, + .points_read = 0, + .points_generated = 0, +}; + +struct replication_query_statistics replication_get_query_statistics(void) { + spinlock_lock(&replication_queries.spinlock); + struct replication_query_statistics ret = replication_queries; + spinlock_unlock(&replication_queries.spinlock); + return ret; +} + +size_t replication_buffers_allocated = 0; + +size_t replication_allocated_buffers(void) { + return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED); +} + +// ---------------------------------------------------------------------------- +// sending replication replies + +struct replication_dimension { + STORAGE_POINT sp; + struct storage_engine_query_handle handle; + bool enabled; + bool skip; + + DICTIONARY *dict; + const DICTIONARY_ITEM *rda; + RRDDIM *rd; +}; + +struct replication_query { + RRDSET *st; + + struct { + time_t first_entry_t; + time_t last_entry_t; + } db; + + struct { // what the parent requested + time_t after; + time_t before; + bool enable_streaming; + } request; + + struct { // what the child will do + time_t after; + time_t before; + bool enable_streaming; + + bool locked_data_collection; + bool execute; + bool interrupted; + STREAM_CAPABILITIES capabilities; + } query; + + time_t wall_clock_time; + + size_t points_read; + size_t points_generated; + + STORAGE_ENGINE_BACKEND backend; + struct replication_request *rq; + + size_t dimensions; + struct replication_dimension data[]; +}; + +static struct replication_query *replication_query_prepare( + RRDSET *st, + time_t db_first_entry, + time_t db_last_entry, + time_t requested_after, + time_t requested_before, + bool requested_enable_streaming, + time_t query_after, + time_t query_before, + bool query_enable_streaming, + time_t wall_clock_time, + STREAM_CAPABILITIES capabilities +) { + size_t dimensions = rrdset_number_of_dimensions(st); + struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension)); + __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); + + q->dimensions = dimensions; + q->st = st; + + q->db.first_entry_t = db_first_entry; + q->db.last_entry_t = db_last_entry; + + q->request.after = requested_after, + q->request.before = requested_before, + q->request.enable_streaming = requested_enable_streaming, + + q->query.after = query_after; + q->query.before = query_before; + q->query.enable_streaming = query_enable_streaming; + q->query.capabilities = capabilities; + + q->wall_clock_time = wall_clock_time; + + if (!q->dimensions || !q->query.after || !q->query.before) { + q->query.execute = false; + q->dimensions = 0; + return q; + } + + if(q->query.enable_streaming) { + spinlock_lock(&st->data_collection_lock); + q->query.locked_data_collection = true; + + if (st->last_updated.tv_sec > q->query.before) { +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, + "STREAM_SENDER REPLAY: 'host:%s/chart:%s' " + "has start_streaming = true, " + "adjusting replication before timestamp from %llu to %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long) q->query.before, + (unsigned long long) st->last_updated.tv_sec + ); +#endif + q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time); + } + } + + q->backend = st->rrdhost->db[0].eng->seb; + + // prepare our array of dimensions + size_t count = 0; + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if (unlikely(!rd || !rd_dfe.item || !rrddim_check_upstream_exposed(rd))) + continue; + + if (unlikely(rd_dfe.counter >= q->dimensions)) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + break; + } + + struct replication_dimension *d = &q->data[rd_dfe.counter]; + + d->dict = rd_dfe.dict; + d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); + d->rd = rd; + + storage_engine_query_init(q->backend, rd->tiers[0].smh, &d->handle, q->query.after, q->query.before, + q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW); + d->enabled = true; + d->skip = false; + count++; + } + rrddim_foreach_done(rd); + + if(!count) { + // no data for this chart + + q->query.execute = false; + + if(q->query.locked_data_collection) { + spinlock_unlock(&st->data_collection_lock); + q->query.locked_data_collection = false; + } + + } + else { + // we have data for this chart + + q->query.execute = true; + } + + return q; +} + +static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) { + bool with_slots = (capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + RRDDIM *rd; + rrddim_foreach_read(rd, st){ + if (!rrddim_check_upstream_exposed(rd)) continue; + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "' ", 2); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + + (usec_t) rd->collector.last_collected_time.tv_usec); + buffer_fast_strcat(wb, " ", 1); + buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); + buffer_fast_strcat(wb, " ", 1); + buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_calculated_value); + buffer_fast_strcat(wb, " ", 1); + buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_stored_value); + buffer_fast_strcat(wb, "\n", 1); + } + rrddim_foreach_done(rd); + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec); + buffer_fast_strcat(wb, "\n", 1); +} + +static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) { + size_t dimensions = q->dimensions; + + if(wb && q->query.enable_streaming) + replication_send_chart_collection_state(wb, q->st, q->query.capabilities); + + if(q->query.locked_data_collection) { + spinlock_unlock(&q->st->data_collection_lock); + q->query.locked_data_collection = false; + } + + // release all the dictionary items acquired + // finalize the queries + size_t queries = 0; + + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if (unlikely(!d->enabled)) continue; + + storage_engine_query_finalize(&d->handle); + + dictionary_acquired_item_release(d->dict, d->rda); + + // update global statistics + queries++; + } + + if(executed) { + spinlock_lock(&replication_queries.spinlock); + replication_queries.queries_started += queries; + replication_queries.queries_finished += queries; + replication_queries.points_read += q->points_read; + replication_queries.points_generated += q->points_generated; + + if(q->st && q->st->rrdhost->sender) { + struct sender_state *s = q->st->rrdhost->sender; + s->replication.latest_completed_before_t = q->query.before; + } + + spinlock_unlock(&replication_queries.spinlock); + } + + __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); + freez(q); +} + +static void replication_query_align_to_optimal_before(struct replication_query *q) { + if(!q->query.execute || q->query.enable_streaming) + return; + + size_t dimensions = q->dimensions; + time_t expanded_before = 0; + + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if(unlikely(!d->enabled)) continue; + + time_t new_before = storage_engine_align_to_optimal_before(&d->handle); + if (!expanded_before || new_before < expanded_before) + expanded_before = new_before; + } + + if(expanded_before > q->query.before && // it is later than the original + (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page) + expanded_before < q->st->last_updated.tv_sec && // it is not the chart's last updated time + expanded_before < q->wall_clock_time) // it is not later than the wall clock time + q->query.before = expanded_before; +} + +static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { + replication_query_align_to_optimal_before(q); + + bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + time_t after = q->query.after; + time_t before = q->query.before; + size_t dimensions = q->dimensions; + time_t wall_clock_time = q->wall_clock_time; + + bool finished_with_gap = false; + size_t points_read = 0, points_generated = 0; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + time_t actual_after = 0, actual_before = 0; +#endif + + time_t now = after + 1; + time_t last_end_time_in_buffer = 0; + while(now <= before) { + time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0; + for (size_t i = 0; i < dimensions ;i++) { + struct replication_dimension *d = &q->data[i]; + if(unlikely(!d->enabled || d->skip)) continue; + + // fetch the first valid point for the dimension + int max_skip = 1000; + while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) { + d->sp = storage_engine_query_next_metric(&d->handle); + points_read++; + } + + if(max_skip <= 0) { + d->skip = true; + + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query " + "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), + (unsigned long long) now); + + continue; + } + + if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s)) + // this dimension does not provide any data + continue; + + time_t update_every = d->sp.end_time_s - d->sp.start_time_s; + if(unlikely(!update_every)) + update_every = q->st->update_every; + + if(unlikely(!min_update_every)) + min_update_every = update_every; + + if(unlikely(!min_start_time)) + min_start_time = d->sp.start_time_s; + + if(unlikely(!min_end_time)) + min_end_time = d->sp.end_time_s; + + min_update_every = MIN(min_update_every, update_every); + max_update_every = MAX(max_update_every, update_every); + + min_start_time = MIN(min_start_time, d->sp.start_time_s); + max_start_time = MAX(max_start_time, d->sp.start_time_s); + + min_end_time = MIN(min_end_time, d->sp.end_time_s); + max_end_time = MAX(max_end_time, d->sp.end_time_s); + } + + if (unlikely(min_update_every != max_update_every || + min_start_time != max_start_time)) { + + time_t fix_min_start_time; + if(last_end_time_in_buffer && + last_end_time_in_buffer >= min_start_time && + last_end_time_in_buffer <= max_start_time) { + fix_min_start_time = last_end_time_in_buffer; + } + else + fix_min_start_time = min_end_time - min_update_every; + +#ifdef NETDATA_INTERNAL_CHECKS + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "REPLAY WARNING: 'host:%s/chart:%s' " + "misaligned dimensions, " + "update every (min: %ld, max: %ld), " + "start time (min: %ld, max: %ld), " + "end time (min %ld, max %ld), " + "now %ld, last end time sent %ld, " + "min start time is fixed to %ld", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), + min_update_every, max_update_every, + min_start_time, max_start_time, + min_end_time, max_end_time, + now, last_end_time_in_buffer, + fix_min_start_time + ); +#endif + + min_start_time = fix_min_start_time; + } + + if(likely(min_start_time <= now && min_end_time >= now)) { + // we have a valid point + + if (unlikely(min_end_time == min_start_time)) + min_start_time = min_end_time - q->st->update_every; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + if (unlikely(!actual_after)) + actual_after = min_end_time; + + actual_before = min_end_time; +#endif + + if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) { + q->query.before = last_end_time_in_buffer; + q->query.enable_streaming = false; + + internal_error(true, "REPLICATION: current buffer size %zu is more than the " + "max message size %zu for chart '%s' of host '%s'. " + "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.", + buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), + q->request.after, q->request.before, q->request.enable_streaming?"true":"false", + q->query.after, q->query.before, q->query.enable_streaming?"true":"false"); + + q->query.interrupted = true; + + break; + } + last_end_time_in_buffer = min_end_time; + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '' ", 4); + buffer_print_uint64_encoded(wb, integer_encoding, min_start_time); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, min_end_time); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); + buffer_fast_strcat(wb, "\n", 1); + + // output the replay values for this time + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if (unlikely(!d->enabled)) continue; + + if (likely( d->sp.start_time_s <= min_end_time && + d->sp.end_time_s >= min_end_time && + !storage_point_is_unset(d->sp) && + !storage_point_is_gap(d->sp))) { + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET, sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, d->rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " \"", 2); + buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id)); + buffer_fast_strcat(wb, "\" ", 2); + buffer_print_netdata_double_encoded(wb, integer_encoding, d->sp.sum); + buffer_fast_strcat(wb, " ", 1); + buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED); + buffer_fast_strcat(wb, "\n", 1); + + points_generated++; + } + } + + now = min_end_time + 1; + } + else if(unlikely(min_end_time < now)) + // the query does not progress + break; + else { + // we have gap - all points are in the future + now = min_start_time; + + if(min_start_time > before && !points_generated) { + before = q->query.before = min_start_time - 1; + finished_with_gap = true; + break; + } + } + } + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + if(actual_after) { + char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1]; + log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); + log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); + internal_error(true, + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), + (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, + (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); + } + else + internal_error(true, + "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), + (unsigned long long)after, (unsigned long long)before); +#endif // NETDATA_LOG_REPLICATION_REQUESTS + + q->points_read += points_read; + q->points_generated += points_generated; + + if(last_end_time_in_buffer < before - q->st->update_every) + finished_with_gap = true; + + return finished_with_gap; +} + +static struct replication_query *replication_response_prepare( + RRDSET *st, + bool requested_enable_streaming, + time_t requested_after, + time_t requested_before, + STREAM_CAPABILITIES capabilities + ) { + time_t wall_clock_time = now_realtime_sec(); + + if(requested_after > requested_before) { + // flip them + time_t t = requested_before; + requested_before = requested_after; + requested_after = t; + } + + if(requested_after > wall_clock_time) { + requested_after = 0; + requested_before = 0; + requested_enable_streaming = true; + } + + if(requested_before > wall_clock_time) { + requested_before = wall_clock_time; + requested_enable_streaming = true; + } + + time_t query_after = requested_after; + time_t query_before = requested_before; + bool query_enable_streaming = requested_enable_streaming; + + time_t db_first_entry = 0, db_last_entry = 0; + rrdset_get_retention_of_tier_for_collected_chart( + st, &db_first_entry, &db_last_entry, wall_clock_time, 0); + + if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) { + // no data requested - just enable streaming + ; + } + else { + if (query_after < db_first_entry) + query_after = db_first_entry; + + if (query_before > db_last_entry) + query_before = db_last_entry; + + // if the parent asked us to start streaming, then fill the rest with the data that we have + if (requested_enable_streaming) + query_before = db_last_entry; + + if (query_after > query_before) { + time_t tmp = query_before; + query_before = query_after; + query_after = tmp; + } + + query_enable_streaming = (requested_enable_streaming || + query_before == db_last_entry || + !requested_after || + !requested_before) ? true : false; + } + + return replication_query_prepare( + st, + db_first_entry, db_last_entry, + requested_after, requested_before, requested_enable_streaming, + query_after, query_before, query_enable_streaming, + wall_clock_time, capabilities); +} + +void replication_response_cancel_and_finalize(struct replication_query *q) { + replication_query_finalize(NULL, q, false); +} + +static bool sender_is_still_connected_for_this_request(struct replication_request *rq); + +bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) { + bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + struct replication_request *rq = q->rq; + RRDSET *st = q->st; + RRDHOST *host = st->rrdhost; + + // we might want to optimize this by filling a temporary buffer + // and copying the result to the host's buffer in order to avoid + // holding the host's buffer lock for too long + BUFFER *wb = sender_start(host->sender); + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); + buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); + buffer_fast_strcat(wb, "'\n", 2); + + bool locked_data_collection = q->query.locked_data_collection; + q->query.locked_data_collection = false; + + bool finished_with_gap = false; + if(q->query.execute) + finished_with_gap = replication_query_execute(wb, q, max_msg_size); + + time_t after = q->request.after; + time_t before = q->query.before; + bool enable_streaming = q->query.enable_streaming; + + replication_query_finalize(wb, q, q->query.execute); + q = NULL; // IMPORTANT: q is invalid now + + // get a fresh retention to send to the parent + time_t wall_clock_time = now_realtime_sec(); + time_t db_first_entry, db_last_entry; + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); + + // end with first/last entries we have, and the first start time and + // last end time of the data we sent + + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1); + buffer_print_int64_encoded(wb, integer_encoding, st->update_every); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, db_first_entry); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, db_last_entry); + + buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7); + + buffer_print_uint64_encoded(wb, integer_encoding, after); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, before); + buffer_fast_strcat(wb, " ", 1); + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); + buffer_fast_strcat(wb, "\n", 1); + + worker_is_busy(WORKER_JOB_BUFFER_COMMIT); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION); + worker_is_busy(WORKER_JOB_CLEANUP); + + if(enable_streaming) { + if(sender_is_still_connected_for_this_request(rq)) { + // enable normal streaming if we have to + // but only if the sender buffer has not been flushed since we started + + if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + + if(!finished_with_gap) + st->rrdpush.sender.resync_time_s = 0; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif + } + else + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + } + } + + if(locked_data_collection) + spinlock_unlock(&st->data_collection_lock); + + return enable_streaming; +} + +// ---------------------------------------------------------------------------- +// sending replication requests + +struct replication_request_details { + struct { + send_command callback; + void *data; + } caller; + + RRDHOST *host; + RRDSET *st; + + struct { + time_t first_entry_t; // the first entry time the child has + time_t last_entry_t; // the last entry time the child has + time_t wall_clock_time; // the current time of the child + bool fixed_last_entry; // when set we set the last entry to wall clock time + } child_db; + + struct { + time_t first_entry_t; // the first entry time we have + time_t last_entry_t; // the last entry time we have + time_t wall_clock_time; // the current local world clock time + } local_db; + + struct { + time_t from; // the starting time of the entire gap we have + time_t to; // the ending time of the entire gap we have + } gap; + + struct { + time_t after; // the start time we requested previously from this child + time_t before; // the end time we requested previously from this child + } last_request; + + struct { + time_t after; // the start time of this replication request - the child will add 1 second + time_t before; // the end time of this replication request + bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before' + } wanted; +}; + +static void replicate_log_request(struct replication_request_details *r, const char *msg) { +#ifdef NETDATA_INTERNAL_CHECKS + internal_error(true, +#else + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, +#endif + "REPLAY ERROR: 'host:%s/chart:%s' child sent: " + "db from %ld to %ld%s, wall clock time %ld, " + "last request from %ld to %ld, " + "issue: %s - " + "sending replication request from %ld to %ld, start streaming %s", + rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st), + r->child_db.first_entry_t, + r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "", + r->child_db.wall_clock_time, + r->last_request.after, + r->last_request.before, + msg, + r->wanted.after, + r->wanted.before, + r->wanted.start_streaming ? "true" : "false"); +} + +static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) { + RRDSET *st = r->st; + + if(log) + replicate_log_request(r, msg); + + if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t)) + st->rrdhost->receiver->replication_first_time_t = r->wanted.after; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + st->replay.log_next_data_collection = true; + + char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = ""; + + if(r->wanted.after) + log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after); + + if(r->wanted.before) + log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before); + + internal_error(true, + "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " + "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s" + , rrdhost_hostname(r->host), rrdset_id(r->st) + , r->wanted.after, wanted_after_buf + , r->wanted.before, wanted_before_buf + , r->wanted.start_streaming ? "YES" : "NO" + , msg + , r->last_request.after, r->last_request.before + , r->child_db.first_entry_t, r->child_db.last_entry_t + , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD" + , r->local_db.first_entry_t, r->local_db.last_entry_t + , r->local_db.wall_clock_time + , r->gap.from, r->gap.to + , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" + , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" + ); + + st->replay.start_streaming = r->wanted.start_streaming; + st->replay.after = r->wanted.after; + st->replay.before = r->wanted.before; +#endif // NETDATA_LOG_REPLICATION_REQUESTS + + char buffer[2048 + 1]; + snprintfz(buffer, sizeof(buffer) - 1, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", + rrdset_id(st), r->wanted.start_streaming ? "true" : "false", + (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before); + + ssize_t ret = r->caller.callback(buffer, r->caller.data); + if (ret < 0) { + netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)", + rrdhost_hostname(r->host), rrdset_id(r->st), ret); + return false; + } + + return true; +} + +bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, + time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time, + time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) +{ + struct replication_request_details r = { + .caller = { + .callback = callback, + .data = callback_data, + }, + + .host = host, + .st = st, + + .child_db = { + .first_entry_t = child_first_entry, + .last_entry_t = child_last_entry, + .wall_clock_time = child_wall_clock_time, + .fixed_last_entry = false, + }, + + .local_db = { + .first_entry_t = 0, + .last_entry_t = 0, + .wall_clock_time = now_realtime_sec(), + }, + + .last_request = { + .after = prev_first_entry_wanted, + .before = prev_last_entry_wanted, + }, + + .wanted = { + .after = 0, + .before = 0, + .start_streaming = true, + }, + }; + + if(r.child_db.last_entry_t > r.child_db.wall_clock_time) { + replicate_log_request(&r, "child's db last entry > child's wall clock time"); + r.child_db.last_entry_t = r.child_db.wall_clock_time; + r.child_db.fixed_last_entry = true; + } + + rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0); + + // let's find the GAP we have + if(!r.last_request.after || !r.last_request.before) { + // there is no previous request + + if(r.local_db.last_entry_t) + // we have some data, let's continue from the last point we have + r.gap.from = r.local_db.last_entry_t; + else + // we don't have any data, the gap is the max timeframe we are allowed to replicate + r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate; + + } + else { + // we had sent a request - let's continue at the point we left it + // for this we don't take into account the actual data in our db + // because the child may also have gaps, and we need to get over it + r.gap.from = r.last_request.before; + } + + // we want all the data up to now + r.gap.to = r.local_db.wall_clock_time; + + // The gap is now r.gap.from -> r.gap.to + + if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) + return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false); + + if (unlikely(!rrdset_number_of_dimensions(st))) + return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false); + + if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false); + + if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0)) + return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true); + + if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time)) + return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true); + + if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true); + + if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false); + + // let's find what the child can provide to fill that gap + + if(r.child_db.first_entry_t > r.gap.from) + // the child does not have all the data - let's get what it has + r.wanted.after = r.child_db.first_entry_t; + else + // ok, the child can fill the entire gap we have + r.wanted.after = r.gap.from; + + if(r.gap.to - r.wanted.after > host->rrdpush_replication_step) + // the duration is too big for one request - let's take the first step + r.wanted.before = r.wanted.after + host->rrdpush_replication_step; + else + // wow, we can do it in one request + r.wanted.before = r.gap.to; + + // don't ask from the child more than it has + if(r.wanted.before > r.child_db.last_entry_t) + r.wanted.before = r.child_db.last_entry_t; + + if(r.wanted.after > r.wanted.before) { + r.wanted.after = 0; + r.wanted.before = 0; + r.wanted.start_streaming = true; + return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true); + } + + // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child + r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step || + r.wanted.before >= r.child_db.last_entry_t || + r.wanted.before >= r.child_db.wall_clock_time || + r.wanted.before >= r.local_db.wall_clock_time); + + // the wanted timeframe is now r.wanted.after -> r.wanted.before + // send it + return send_replay_chart_cmd(&r, "OK", false); +} + +// ---------------------------------------------------------------------------- +// replication thread + +// replication request in sender DICTIONARY +// used for de-duplicating the requests +struct replication_request { + struct sender_state *sender; // the sender we should put the reply at + STRING *chart_id; // the chart of the request + time_t after; // the start time of the query (maybe zero) key for sorting (JudyL) + time_t before; // the end time of the query (maybe zero) + + usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request + Word_t unique_id; // auto-increment, later requests have bigger + + bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming + bool indexed_in_judy; // true when the request is indexed in judy + bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full + bool not_indexed_preprocessing; // true when the request is not indexed, but it is pending in preprocessing + + // prepare ahead members - preprocessing + bool found; // used as a result boolean for the find call + bool executed; // used to detect if we have skipped requests while preprocessing + RRDSET *st; // caching of the chart during preprocessing + struct replication_query *q; // the preprocessing query initialization +}; + +// replication sort entry in JudyL array +// used for sorting all requests, across all nodes +struct replication_sort_entry { + struct replication_request *rq; + + size_t unique_id; // used as a key to identify the sort entry - we never access its contents +}; + +#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread + +// the global variables for the replication thread +static struct replication_thread { + ARAL *aral_rse; + + SPINLOCK spinlock; + + struct { + size_t pending; // number of requests pending in the queue + + // statistics + size_t added; // number of requests added to the queue + size_t removed; // number of requests removed from the queue + size_t pending_no_room; // number of requests skipped, because the sender has no room for responses + size_t senders_full; // number of times a sender reset our last position in the queue + size_t sender_resets; // number of times a sender reset our last position in the queue + time_t first_time_t; // the minimum 'after' we encountered + + struct { + Word_t after; + Word_t unique_id; + Pvoid_t JudyL_array; + } queue; + + } unsafe; // protected from replication_recursive_lock() + + struct { + Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1) + size_t executed; // the number of replication requests executed + size_t latest_first_time; // the 'after' timestamp of the last request we executed + size_t memory; // the total memory allocated by replication + } atomic; // access should be with atomic operations + + struct { + size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time + + ND_THREAD **threads_ptrs; + size_t threads; + } main_thread; // access is allowed only by the main thread + +} replication_globals = { + .aral_rse = NULL, + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .unsafe = { + .pending = 0, + + .added = 0, + .removed = 0, + .pending_no_room = 0, + .sender_resets = 0, + .senders_full = 0, + + .first_time_t = 0, + + .queue = { + .after = 0, + .unique_id = 0, + .JudyL_array = NULL, + }, + }, + .atomic = { + .unique_id = 0, + .executed = 0, + .latest_first_time = 0, + .memory = 0, + }, + .main_thread = { + .last_executed = 0, + .threads = 0, + .threads_ptrs = NULL, + }, +}; + +size_t replication_allocated_memory(void) { + return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED); +} + +#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED) +#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED) + +static inline bool replication_recursive_lock_mode(char mode) { + static __thread int recursions = 0; + + if(mode == 'L') { // (L)ock + if(++recursions == 1) + spinlock_lock(&replication_globals.spinlock); + } + else if(mode == 'U') { // (U)nlock + if(--recursions == 0) + spinlock_unlock(&replication_globals.spinlock); + } + else if(mode == 'C') { // (C)heck + if(recursions > 0) + return true; + else + return false; + } + else + fatal("REPLICATION: unknown lock mode '%c'", mode); + +#ifdef NETDATA_INTERNAL_CHECKS + if(recursions < 0) + fatal("REPLICATION: recursions is %d", recursions); +#endif + + return true; +} + +#define replication_recursive_lock() replication_recursive_lock_mode('L') +#define replication_recursive_unlock() replication_recursive_lock_mode('U') +#define fatal_when_replication_is_not_locked_for_me() do { \ + if(!replication_recursive_lock_mode('C')) \ + fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \ +} while(0) + +void replication_set_next_point_in_time(time_t after, size_t unique_id) { + replication_recursive_lock(); + replication_globals.unsafe.queue.after = after; + replication_globals.unsafe.queue.unique_id = unique_id; + replication_recursive_unlock(); +} + +// ---------------------------------------------------------------------------- +// replication sort entry management + +static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { + struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse); + __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); + + rrdpush_sender_pending_replication_requests_plus_one(rq->sender); + + // copy the request + rse->rq = rq; + rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST); + + // save the unique id into the request, to be able to delete it later + rq->unique_id = rse->unique_id; + rq->indexed_in_judy = false; + rq->not_indexed_buffer_full = false; + rq->not_indexed_preprocessing = false; + return rse; +} + +static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { + aral_freez(replication_globals.aral_rse, rse); + __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); +} + +static void replication_sort_entry_add(struct replication_request *rq) { + if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) { + rq->indexed_in_judy = false; + rq->not_indexed_buffer_full = true; + rq->not_indexed_preprocessing = false; + replication_recursive_lock(); + replication_globals.unsafe.pending_no_room++; + replication_recursive_unlock(); + return; + } + + // cache this, because it will be changed + bool decrement_no_room = rq->not_indexed_buffer_full; + + struct replication_sort_entry *rse = replication_sort_entry_create(rq); + + replication_recursive_lock(); + + if(decrement_no_room) + replication_globals.unsafe.pending_no_room--; + +// if(rq->after < (time_t)replication_globals.protected.queue.after && +// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && +// !replication_globals.protected.skipped_no_room_since_last_reset) { +// +// // make it find this request first +// replication_set_next_point_in_time(rq->after, rq->unique_id); +// } + + replication_globals.unsafe.added++; + replication_globals.unsafe.pending++; + + Pvoid_t *inner_judy_ptr; + + // find the outer judy entry, using after as key + size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); + size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR)) + fatal("REPLICATION: corrupted outer judyL"); + + // add it to the inner judy, using unique_id as key + size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr); + Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); + size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr); + if(unlikely(!item || item == PJERR)) + fatal("REPLICATION: corrupted inner judyL"); + + *item = rse; + rq->indexed_in_judy = true; + rq->not_indexed_buffer_full = false; + rq->not_indexed_preprocessing = false; + + if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t) + replication_globals.unsafe.first_time_t = rq->after; + + replication_recursive_unlock(); + + __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED); +} + +static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) { + fatal_when_replication_is_not_locked_for_me(); + + bool inner_judy_deleted = false; + + replication_globals.unsafe.removed++; + replication_globals.unsafe.pending--; + + rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); + + rse->rq->indexed_in_judy = false; + rse->rq->not_indexed_preprocessing = preprocessing; + + size_t memory_saved = 0; + + // delete it from the inner judy + size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr); + JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0); + size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr); + memory_saved = mem_before_inner_judyl - mem_after_inner_judyl; + + // if no items left, delete it from the outer judy + if(**inner_judy_ppptr == NULL) { + size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0); + size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + memory_saved += mem_before_outer_judyl - mem_after_outer_judyl; + inner_judy_deleted = true; + } + + // free memory + replication_sort_entry_destroy(rse); + + __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED); + + return inner_judy_deleted; +} + +static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) { + Pvoid_t *inner_judy_pptr; + struct replication_sort_entry *rse_to_delete = NULL; + + replication_recursive_lock(); + if(rq->indexed_in_judy) { + + inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0); + if (inner_judy_pptr) { + Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0); + if (our_item_pptr) { + rse_to_delete = *our_item_pptr; + replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false); + + if(buffer_full) { + replication_globals.unsafe.pending_no_room++; + rq->not_indexed_buffer_full = true; + } + } + } + + if (!rse_to_delete) + fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.", + rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after); + + } + + replication_recursive_unlock(); +} + +static struct replication_request replication_request_get_first_available() { + Pvoid_t *inner_judy_pptr; + + replication_recursive_lock(); + + struct replication_request rq_to_return = (struct replication_request){ .found = false }; + + if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) { + replication_globals.unsafe.queue.after = 0; + replication_globals.unsafe.queue.unique_id = 0; + } + + Word_t started_after = replication_globals.unsafe.queue.after; + + size_t round = 0; + while(!rq_to_return.found) { + round++; + + if(round > 2) + break; + + if(round == 2) { + if(started_after == 0) + break; + + replication_globals.unsafe.queue.after = 0; + replication_globals.unsafe.queue.unique_id = 0; + } + + bool find_same_after = true; + while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) { + Pvoid_t *our_item_pptr; + + if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after)) + break; + + while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) { + struct replication_sort_entry *rse = *our_item_pptr; + struct replication_request *rq = rse->rq; + + // copy the request to return it + rq_to_return = *rq; + rq_to_return.chart_id = string_dup(rq_to_return.chart_id); + + // set the return result to found + rq_to_return.found = true; + + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true)) + // we removed the item from the outer JudyL + break; + } + + // prepare for the next iteration on the outer loop + replication_globals.unsafe.queue.unique_id = 0; + } + } + + replication_recursive_unlock(); + return rq_to_return; +} + +// ---------------------------------------------------------------------------- +// replication request management + +static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) { + struct sender_state *s = sender_state; (void)s; + struct replication_request *rq = value; + + // IMPORTANT: + // We use the react instead of the insert callback + // because we want the item to be atomically visible + // to our replication thread, immediately after. + + // If we put this at the insert callback, the item is not guaranteed + // to be atomically visible to others, so the replication thread + // may see the replication sort entry, but fail to find the dictionary item + // related to it. + + replication_sort_entry_add(rq); + + // this request is about a unique chart for this sender + rrdpush_sender_replicating_charts_plus_one(s); +} + +static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) { + struct sender_state *s = sender_state; (void)s; + struct replication_request *rq = old_value; (void)rq; + struct replication_request *rq_new = new_value; + + replication_recursive_lock(); + + if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) { + // we can replace this command + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + + rq->after = rq_new->after; + rq->before = rq_new->before; + rq->start_streaming = rq_new->start_streaming; + } + else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) { + replication_sort_entry_add(rq); + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + } + else { + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), + dictionary_acquired_item_name(item), + (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false"); + } + + replication_recursive_unlock(); + + string_freez(rq_new->chart_id); + return false; +} + +static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) { + struct replication_request *rq = value; + + // this request is about a unique chart for this sender + rrdpush_sender_replicating_charts_minus_one(rq->sender); + + if(rq->indexed_in_judy) + replication_sort_entry_del(rq, false); + + else if(rq->not_indexed_buffer_full) { + replication_recursive_lock(); + replication_globals.unsafe.pending_no_room--; + replication_recursive_unlock(); + } + + string_freez(rq->chart_id); +} + +static bool sender_is_still_connected_for_this_request(struct replication_request *rq) { + return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender); +} + +static bool replication_execute_request(struct replication_request *rq, bool workers) { + bool ret = false; + + if(!rq->st) { + if(likely(workers)) + worker_is_busy(WORKER_JOB_FIND_CHART); + + rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + } + + if(!rq->st) { + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found", + rrdhost_hostname(rq->sender->host), string2str(rq->chart_id)); + + goto cleanup; + } + + if(!rq->q) { + if(likely(workers)) + worker_is_busy(WORKER_JOB_PREPARE_QUERY); + + rq->q = replication_response_prepare( + rq->st, + rq->start_streaming, + rq->after, + rq->before, + rq->sender->capabilities); + } + + if(likely(workers)) + worker_is_busy(WORKER_JOB_QUERYING); + + // send the replication data + rq->q->rq = rq; + replication_response_execute_and_finalize( + rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL)); + + rq->q = NULL; + + __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED); + + ret = true; + +cleanup: + if(rq->q) { + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + } + + string_freez(rq->chart_id); + worker_is_idle(); + return ret; +} + +// ---------------------------------------------------------------------------- +// public API + +void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) { + struct replication_request rq = { + .sender = sender, + .chart_id = string_strdupz(chart_id), + .after = after, + .before = before, + .start_streaming = start_streaming, + .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), + .indexed_in_judy = false, + .not_indexed_buffer_full = false, + .not_indexed_preprocessing = false, + }; + + if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t) + sender->replication.oldest_request_after_t = rq.after; + + if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) + replication_execute_request(&rq, false); + + else + dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request)); +} + +void replication_sender_delete_pending_requests(struct sender_state *sender) { + // allow the dictionary destructor to go faster on locks + dictionary_flush(sender->replication.requests); +} + +void replication_init_sender(struct sender_state *sender) { + sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct replication_request)); + + dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender); + dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender); + dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender); +} + +void replication_cleanup_sender(struct sender_state *sender) { + // allow the dictionary destructor to go faster on locks + replication_recursive_lock(); + dictionary_destroy(sender->replication.requests); + replication_recursive_unlock(); +} + +void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { + size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); + size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size; + + if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) { + rrdpush_sender_replication_buffer_full_set(s, true); + + struct replication_request *rq; + dfe_start_read(s->replication.requests, rq) { + if(rq->indexed_in_judy) + replication_sort_entry_del(rq, true); + } + dfe_done(rq); + + replication_recursive_lock(); + replication_globals.unsafe.senders_full++; + replication_recursive_unlock(); + } + else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) { + rrdpush_sender_replication_buffer_full_set(s, false); + + struct replication_request *rq; + dfe_start_read(s->replication.requests, rq) { + if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing)) + replication_sort_entry_add(rq); + } + dfe_done(rq); + + replication_recursive_lock(); + replication_globals.unsafe.senders_full--; + replication_globals.unsafe.sender_resets++; + // replication_set_next_point_in_time(0, 0); + replication_recursive_unlock(); + } + + rrdpush_sender_set_buffer_used_percent(s, percentage); +} + +// ---------------------------------------------------------------------------- +// replication thread + +static size_t verify_host_charts_are_streaming_now(RRDHOST *host) { + internal_error( + host->sender && + !rrdpush_sender_pending_replication_requests(host->sender) && + dictionary_entries(host->sender->replication.requests) != 0, + "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication", + rrdhost_hostname(host), + rrdpush_sender_pending_replication_requests(host->sender), + dictionary_entries(host->sender->replication.requests) + ); + + size_t ok = 0; + size_t errors = 0; + + RRDSET *st; + rrdset_foreach_read(st, host) { + RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + + bool is_error = false; + + if(!flags) { + internal_error( + true, + "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED", + rrdhost_hostname(host), rrdset_id(st) + ); + is_error = true; + } + + if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + internal_error( + true, + "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished", + rrdhost_hostname(host), rrdset_id(st) + ); + is_error = true; + } + + if(is_error) + errors++; + else + ok++; + } + rrdset_foreach_done(st); + + internal_error(errors, + "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished", + rrdhost_hostname(host), ok, errors); + + return errors; +} + +static void verify_all_hosts_charts_are_streaming_now(void) { + worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY); + + size_t errors = 0; + RRDHOST *host; + dfe_start_read(rrdhost_root_index, host) + errors += verify_host_charts_are_streaming_now(host); + dfe_done(host); + + size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); + netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", + executed - replication_globals.main_thread.last_executed, errors); + replication_globals.main_thread.last_executed = executed; +} + +static void replication_initialize_workers(bool master) { + worker_register("REPLICATION"); + worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next"); + worker_register_job_name(WORKER_JOB_QUERYING, "querying"); + worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete"); + worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart"); + worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query"); + worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); + worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit"); + worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup"); + worker_register_job_name(WORKER_JOB_WAIT, "wait"); + + if(master) { + worker_register_job_name(WORKER_JOB_STATISTICS, "statistics"); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE); + } +} + +#define REQUEST_OK (0) +#define REQUEST_QUEUE_EMPTY (-1) +#define REQUEST_CHART_NOT_FOUND (-2) + +static __thread struct replication_thread_pipeline { + int max_requests_ahead; + struct replication_request *rqs; + int rqs_last_executed, rqs_last_prepared; + size_t queue_rounds; +} rtp = { + .max_requests_ahead = 0, + .rqs = NULL, + .rqs_last_executed = 0, + .rqs_last_prepared = 0, + .queue_rounds = 0, +}; + +static void replication_pipeline_cancel_and_cleanup(void) { + if(!rtp.rqs) + return; + + struct replication_request *rq; + size_t cancelled = 0; + + do { + if (++rtp.rqs_last_executed >= rtp.max_requests_ahead) + rtp.rqs_last_executed = 0; + + rq = &rtp.rqs[rtp.rqs_last_executed]; + + if (rq->q) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq"); + + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + cancelled++; + } + + rq->executed = true; + rq->found = false; + + } while (rtp.rqs_last_executed != rtp.rqs_last_prepared); + + internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled); + + freez(rtp.rqs); + rtp.rqs = NULL; + rtp.max_requests_ahead = 0; + rtp.rqs_last_executed = 0; + rtp.rqs_last_prepared = 0; + rtp.queue_rounds = 0; +} + +static int replication_pipeline_execute_next(void) { + struct replication_request *rq; + + if(unlikely(!rtp.rqs)) { + rtp.max_requests_ahead = (int)get_netdata_cpus() / 2; + + if(rtp.max_requests_ahead > libuv_worker_threads * 2) + rtp.max_requests_ahead = libuv_worker_threads * 2; + + if(rtp.max_requests_ahead < 2) + rtp.max_requests_ahead = 2; + + rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request)); + __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED); + } + + // fill the queue + do { + if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) { + rtp.rqs_last_prepared = 0; + rtp.queue_rounds++; + } + + internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q, + "REPLAY FATAL: slot is used by query that has not been executed!"); + + worker_is_busy(WORKER_JOB_FIND_NEXT); + rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available(); + rq = &rtp.rqs[rtp.rqs_last_prepared]; + + if(rq->found) { + if (!rq->st) { + worker_is_busy(WORKER_JOB_FIND_CHART); + rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + } + + if (rq->st && !rq->q) { + worker_is_busy(WORKER_JOB_PREPARE_QUERY); + rq->q = replication_response_prepare( + rq->st, + rq->start_streaming, + rq->after, + rq->before, + rq->sender->capabilities); + } + + rq->executed = false; + } + + } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed); + + // pick the first usable + do { + if (++rtp.rqs_last_executed >= rtp.max_requests_ahead) + rtp.rqs_last_executed = 0; + + rq = &rtp.rqs[rtp.rqs_last_executed]; + + if(rq->found) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + + if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) { + // the sender has reconnected since this request was queued, + // we can safely throw it away, since the parent will resend it + replication_response_cancel_and_finalize(rq->q); + rq->executed = true; + rq->found = false; + rq->q = NULL; + } + else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) { + // the sender buffer is full, so we can ignore this request, + // it has already been marked as 'preprocessed' in the dictionary, + // and the sender will put it back in when there is + // enough room in the buffer for processing replication requests + replication_response_cancel_and_finalize(rq->q); + rq->executed = true; + rq->found = false; + rq->q = NULL; + } + else { + // we can execute this, + // delete it from the dictionary + worker_is_busy(WORKER_JOB_DELETE_ENTRY); + dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id)); + } + } + else + internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!"); + + } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared); + + if(unlikely(!rq->found)) { + worker_is_idle(); + return REQUEST_QUEUE_EMPTY; + } + + replication_set_latest_first_time(rq->after); + + bool chart_found = replication_execute_request(rq, true); + rq->executed = true; + rq->found = false; + rq->q = NULL; + + if(unlikely(!chart_found)) { + worker_is_idle(); + return REQUEST_CHART_NOT_FOUND; + } + + worker_is_idle(); + return REQUEST_OK; +} + +static void replication_worker_cleanup(void *pptr) { + if(CLEANUP_FUNCTION_GET_PTR(pptr) != (void *)0x01) return; + replication_pipeline_cancel_and_cleanup(); + worker_unregister(); +} + +static void *replication_worker_thread(void *ptr __maybe_unused) { + CLEANUP_FUNCTION_REGISTER(replication_worker_cleanup) cleanup_ptr = (void *)0x1; + replication_initialize_workers(false); + + while (service_running(SERVICE_REPLICATION)) { + if (unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { + sender_thread_buffer_free(); + worker_is_busy(WORKER_JOB_WAIT); + worker_is_idle(); + sleep_usec(1 * USEC_PER_SEC); + } + } + + return NULL; +} + +static void replication_main_cleanup(void *pptr) { + struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr); + if(!static_thread) return; + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + + replication_pipeline_cancel_and_cleanup(); + + int threads = (int)replication_globals.main_thread.threads; + for(int i = 0; i < threads ;i++) { + nd_thread_join(replication_globals.main_thread.threads_ptrs[i]); + __atomic_sub_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED); + } + freez(replication_globals.main_thread.threads_ptrs); + replication_globals.main_thread.threads_ptrs = NULL; + __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(ND_THREAD *), __ATOMIC_RELAXED); + + aral_destroy(replication_globals.aral_rse); + replication_globals.aral_rse = NULL; + + // custom code + worker_unregister(); + + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; +} + +void replication_initialize(void) { + replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry), + 0, 65536, aral_by_size_statistics(), + NULL, NULL, false, false); +} + +void *replication_thread_main(void *ptr __maybe_unused) { + replication_initialize_workers(true); + + int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1); + if(threads < 1 || threads > MAX_REPLICATION_THREADS) { + netdata_log_error("replication threads given %d is invalid, resetting to 1", threads); + threads = 1; + } + + if(--threads) { + replication_globals.main_thread.threads = threads; + replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(ND_THREAD *)); + __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(ND_THREAD *), __ATOMIC_RELAXED); + + for(int i = 0; i < threads ;i++) { + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2); + replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(ND_THREAD *)); + __atomic_add_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED); + replication_globals.main_thread.threads_ptrs[i] = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE, + replication_worker_thread, NULL); + } + } + + CLEANUP_FUNCTION_REGISTER(replication_main_cleanup) cleanup_ptr = ptr; + + // start from 100% completed + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); + + long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place + bool slow = true; // control the time we sleep - it has to start with true! + usec_t last_now_mono_ut = now_monotonic_usec(); + time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds + + size_t last_executed = 0; + size_t last_sender_resets = 0; + + while(service_running(SERVICE_REPLICATION)) { + + // statistics + usec_t now_mono_ut = now_monotonic_usec(); + if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) { + last_now_mono_ut = now_mono_ut; + + worker_is_busy(WORKER_JOB_STATISTICS); + replication_recursive_lock(); + + size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); + if(last_executed != current_executed) { + run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION; + last_executed = current_executed; + slow = false; + } + + if(replication_reset_next_point_in_time_countdown-- == 0) { + // once per second, make it scan all the pending requests next time + replication_set_next_point_in_time(0, 0); +// replication_globals.protected.skipped_no_room_since_last_reset = 0; + replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; + } + + if(--run_verification_countdown == 0) { + if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) { + // reset the statistics about completion percentage + replication_globals.unsafe.first_time_t = 0; + replication_set_latest_first_time(0); + + verify_all_hosts_charts_are_streaming_now(); + + run_verification_countdown = LONG_MAX; + slow = true; + } + else + run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION; + } + + time_t latest_first_time_t = replication_get_latest_first_time(); + if(latest_first_time_t && replication_globals.unsafe.pending) { + // completion percentage statistics + time_t now = now_realtime_sec(); + time_t total = now - replication_globals.unsafe.first_time_t; + time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t; + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, + (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total); + } + else + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); + + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED)); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full); + + replication_recursive_unlock(); + worker_is_idle(); + } + + if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { + + worker_is_busy(WORKER_JOB_WAIT); + replication_recursive_lock(); + + // the timeout also defines now frequently we will traverse all the pending requests + // when the outbound buffers of all senders is full + usec_t timeout; + if(slow) { + // no work to be done, wait for a request to come in + timeout = 1000 * USEC_PER_MS; + sender_thread_buffer_free(); + } + + else if(replication_globals.unsafe.pending > 0) { + if(replication_globals.unsafe.sender_resets == last_sender_resets) + timeout = 1000 * USEC_PER_MS; + + else { + // there are pending requests waiting to be executed, + // but none could be executed at this time. + // try again after this time. + timeout = 100 * USEC_PER_MS; + } + + last_sender_resets = replication_globals.unsafe.sender_resets; + } + else { + // no requests pending, but there were requests recently (run_verification_countdown) + // so, try in a short time. + // if this is big, one chart replicating will be slow to finish (ping - pong just one chart) + timeout = 10 * USEC_PER_MS; + last_sender_resets = replication_globals.unsafe.sender_resets; + } + + replication_recursive_unlock(); + + worker_is_idle(); + sleep_usec(timeout); + + // make it scan all the pending requests next time + replication_set_next_point_in_time(0, 0); + replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; + + continue; + } + } + + return NULL; +} |