// SPDX-License-Identifier: GPL-3.0-or-later #include "replication.h" #include "Judy.h" #define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL #define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL #define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL #define WORKER_JOB_FIND_NEXT 1 #define WORKER_JOB_QUERYING 2 #define WORKER_JOB_DELETE_ENTRY 3 #define WORKER_JOB_FIND_CHART 4 #define WORKER_JOB_PREPARE_QUERY 5 #define WORKER_JOB_CHECK_CONSISTENCY 6 #define WORKER_JOB_BUFFER_COMMIT 7 #define WORKER_JOB_CLEANUP 8 #define WORKER_JOB_WAIT 9 // master thread worker jobs #define WORKER_JOB_STATISTICS 10 #define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11 #define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12 #define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13 #define WORKER_JOB_CUSTOM_METRIC_ADDED 14 #define WORKER_JOB_CUSTOM_METRIC_DONE 15 #define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16 #define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17 #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 #define SECONDS_TO_RESET_POINT_IN_TIME 10 static struct replication_query_statistics replication_queries = { .spinlock = NETDATA_SPINLOCK_INITIALIZER, .queries_started = 0, .queries_finished = 0, .points_read = 0, .points_generated = 0, }; struct replication_query_statistics replication_get_query_statistics(void) { spinlock_lock(&replication_queries.spinlock); struct replication_query_statistics ret = replication_queries; spinlock_unlock(&replication_queries.spinlock); return ret; } size_t replication_buffers_allocated = 0; size_t replication_allocated_buffers(void) { return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED); } // ---------------------------------------------------------------------------- // sending replication replies struct replication_dimension { STORAGE_POINT sp; struct storage_engine_query_handle handle; bool enabled; bool skip; DICTIONARY *dict; const DICTIONARY_ITEM *rda; 
RRDDIM *rd; }; struct replication_query { RRDSET *st; struct { time_t first_entry_t; time_t last_entry_t; } db; struct { // what the parent requested time_t after; time_t before; bool enable_streaming; } request; struct { // what the child will do time_t after; time_t before; bool enable_streaming; bool locked_data_collection; bool execute; bool interrupted; STREAM_CAPABILITIES capabilities; } query; time_t wall_clock_time; size_t points_read; size_t points_generated; STORAGE_ENGINE_BACKEND backend; struct replication_request *rq; size_t dimensions; struct replication_dimension data[]; }; static struct replication_query *replication_query_prepare( RRDSET *st, time_t db_first_entry, time_t db_last_entry, time_t requested_after, time_t requested_before, bool requested_enable_streaming, time_t query_after, time_t query_before, bool query_enable_streaming, time_t wall_clock_time, STREAM_CAPABILITIES capabilities ) { size_t dimensions = rrdset_number_of_dimensions(st); struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension)); __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); q->dimensions = dimensions; q->st = st; q->db.first_entry_t = db_first_entry; q->db.last_entry_t = db_last_entry; q->request.after = requested_after, q->request.before = requested_before, q->request.enable_streaming = requested_enable_streaming, q->query.after = query_after; q->query.before = query_before; q->query.enable_streaming = query_enable_streaming; q->query.capabilities = capabilities; q->wall_clock_time = wall_clock_time; if (!q->dimensions || !q->query.after || !q->query.before) { q->query.execute = false; q->dimensions = 0; return q; } if(q->query.enable_streaming) { spinlock_lock(&st->data_collection_lock); q->query.locked_data_collection = true; if (st->last_updated.tv_sec > q->query.before) { #ifdef 
NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' " "has start_streaming = true, " "adjusting replication before timestamp from %llu to %llu", rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long) q->query.before, (unsigned long long) st->last_updated.tv_sec ); #endif q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time); } } q->backend = st->rrdhost->db[0].eng->seb; // prepare our array of dimensions size_t count = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) { if (unlikely(!rd || !rd_dfe.item || !rrddim_check_upstream_exposed(rd))) continue; if (unlikely(rd_dfe.counter >= q->dimensions)) { internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", rrdhost_hostname(st->rrdhost), rrdset_id(st)); break; } struct replication_dimension *d = &q->data[rd_dfe.counter]; d->dict = rd_dfe.dict; d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); d->rd = rd; storage_engine_query_init(q->backend, rd->tiers[0].smh, &d->handle, q->query.after, q->query.before, q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW); d->enabled = true; d->skip = false; count++; } rrddim_foreach_done(rd); if(!count) { // no data for this chart q->query.execute = false; if(q->query.locked_data_collection) { spinlock_unlock(&st->data_collection_lock); q->query.locked_data_collection = false; } } else { // we have data for this chart q->query.execute = true; } return q; } static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) { bool with_slots = (capabilities & STREAM_CAP_SLOTS) ? true : false; NUMBER_ENCODING integer_encoding = (capabilities & STREAM_CAP_IEEE754) ? 
NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; RRDDIM *rd; rrddim_foreach_read(rd, st){ if (!rrddim_check_upstream_exposed(rd)) continue; buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1); if(with_slots) { buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); } buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) rd->collector.last_collected_time.tv_usec); buffer_fast_strcat(wb, " ", 1); buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value); buffer_fast_strcat(wb, " ", 1); buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_calculated_value); buffer_fast_strcat(wb, " ", 1); buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_stored_value); buffer_fast_strcat(wb, "\n", 1); } rrddim_foreach_done(rd); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1); buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec); buffer_fast_strcat(wb, " ", 1); buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec); buffer_fast_strcat(wb, "\n", 1); } static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) { size_t dimensions = q->dimensions; if(wb && q->query.enable_streaming) replication_send_chart_collection_state(wb, q->st, q->query.capabilities); if(q->query.locked_data_collection) { spinlock_unlock(&q->st->data_collection_lock); 
q->query.locked_data_collection = false; } // release all the dictionary items acquired // finalize the queries size_t queries = 0; for (size_t i = 0; i < dimensions; i++) { struct replication_dimension *d = &q->data[i]; if (unlikely(!d->enabled)) continue; storage_engine_query_finalize(&d->handle); dictionary_acquired_item_release(d->dict, d->rda); // update global statistics queries++; } if(executed) { spinlock_lock(&replication_queries.spinlock); replication_queries.queries_started += queries; replication_queries.queries_finished += queries; replication_queries.points_read += q->points_read; replication_queries.points_generated += q->points_generated; if(q->st && q->st->rrdhost->sender) { struct sender_state *s = q->st->rrdhost->sender; s->replication.latest_completed_before_t = q->query.before; } spinlock_unlock(&replication_queries.spinlock); } __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); freez(q); } static void replication_query_align_to_optimal_before(struct replication_query *q) { if(!q->query.execute || q->query.enable_streaming) return; size_t dimensions = q->dimensions; time_t expanded_before = 0; for (size_t i = 0; i < dimensions; i++) { struct replication_dimension *d = &q->data[i]; if(unlikely(!d->enabled)) continue; time_t new_before = storage_engine_align_to_optimal_before(&d->handle); if (!expanded_before || new_before < expanded_before) expanded_before = new_before; } if(expanded_before > q->query.before && // it is later than the original (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page) expanded_before < q->st->last_updated.tv_sec && // it is not the chart's last updated time expanded_before < q->wall_clock_time) // it is not later than the wall clock time q->query.before = expanded_before; } static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t 
max_msg_size) { replication_query_align_to_optimal_before(q); bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false; NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; time_t after = q->query.after; time_t before = q->query.before; size_t dimensions = q->dimensions; time_t wall_clock_time = q->wall_clock_time; bool finished_with_gap = false; size_t points_read = 0, points_generated = 0; #ifdef NETDATA_LOG_REPLICATION_REQUESTS time_t actual_after = 0, actual_before = 0; #endif time_t now = after + 1; time_t last_end_time_in_buffer = 0; while(now <= before) { time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0; for (size_t i = 0; i < dimensions ;i++) { struct replication_dimension *d = &q->data[i]; if(unlikely(!d->enabled || d->skip)) continue; // fetch the first valid point for the dimension int max_skip = 1000; while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) { d->sp = storage_engine_query_next_metric(&d->handle); points_read++; } if(max_skip <= 0) { d->skip = true; nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query " "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), (unsigned long long) now); continue; } if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s)) // this dimension does not provide any data continue; time_t update_every = d->sp.end_time_s - d->sp.start_time_s; if(unlikely(!update_every)) update_every = q->st->update_every; if(unlikely(!min_update_every)) min_update_every = update_every; if(unlikely(!min_start_time)) min_start_time = d->sp.start_time_s; 
if(unlikely(!min_end_time)) min_end_time = d->sp.end_time_s; min_update_every = MIN(min_update_every, update_every); max_update_every = MAX(max_update_every, update_every); min_start_time = MIN(min_start_time, d->sp.start_time_s); max_start_time = MAX(max_start_time, d->sp.start_time_s); min_end_time = MIN(min_end_time, d->sp.end_time_s); max_end_time = MAX(max_end_time, d->sp.end_time_s); } if (unlikely(min_update_every != max_update_every || min_start_time != max_start_time)) { time_t fix_min_start_time; if(last_end_time_in_buffer && last_end_time_in_buffer >= min_start_time && last_end_time_in_buffer <= max_start_time) { fix_min_start_time = last_end_time_in_buffer; } else fix_min_start_time = min_end_time - min_update_every; #ifdef NETDATA_INTERNAL_CHECKS nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, "REPLAY WARNING: 'host:%s/chart:%s' " "misaligned dimensions, " "update every (min: %ld, max: %ld), " "start time (min: %ld, max: %ld), " "end time (min %ld, max %ld), " "now %ld, last end time sent %ld, " "min start time is fixed to %ld", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), min_update_every, max_update_every, min_start_time, max_start_time, min_end_time, max_end_time, now, last_end_time_in_buffer, fix_min_start_time ); #endif min_start_time = fix_min_start_time; } if(likely(min_start_time <= now && min_end_time >= now)) { // we have a valid point if (unlikely(min_end_time == min_start_time)) min_start_time = min_end_time - q->st->update_every; #ifdef NETDATA_LOG_REPLICATION_REQUESTS if (unlikely(!actual_after)) actual_after = min_end_time; actual_before = min_end_time; #endif if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) { q->query.before = last_end_time_in_buffer; q->query.enable_streaming = false; internal_error(true, "REPLICATION: current buffer size %zu is more than the " "max message size %zu for chart '%s' of host '%s'. 
" "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.", buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), q->request.after, q->request.before, q->request.enable_streaming?"true":"false", q->query.after, q->query.before, q->query.enable_streaming?"true":"false"); q->query.interrupted = true; break; } last_end_time_in_buffer = min_end_time; buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1); if(with_slots) { buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot); } buffer_fast_strcat(wb, " '' ", 4); buffer_print_uint64_encoded(wb, integer_encoding, min_start_time); buffer_fast_strcat(wb, " ", 1); buffer_print_uint64_encoded(wb, integer_encoding, min_end_time); buffer_fast_strcat(wb, " ", 1); buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); buffer_fast_strcat(wb, "\n", 1); // output the replay values for this time for (size_t i = 0; i < dimensions; i++) { struct replication_dimension *d = &q->data[i]; if (unlikely(!d->enabled)) continue; if (likely( d->sp.start_time_s <= min_end_time && d->sp.end_time_s >= min_end_time && !storage_point_is_unset(d->sp) && !storage_point_is_gap(d->sp))) { buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET, sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1); if(with_slots) { buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); buffer_print_uint64_encoded(wb, integer_encoding, d->rd->rrdpush.sender.dim_slot); } buffer_fast_strcat(wb, " \"", 2); buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id)); buffer_fast_strcat(wb, "\" ", 2); buffer_print_netdata_double_encoded(wb, integer_encoding, d->sp.sum); buffer_fast_strcat(wb, " ", 1); buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED); buffer_fast_strcat(wb, "\n", 1); 
points_generated++; } } now = min_end_time + 1; } else if(unlikely(min_end_time < now)) // the query does not progress break; else { // we have gap - all points are in the future now = min_start_time; if(min_start_time > before && !points_generated) { before = q->query.before = min_start_time - 1; finished_with_gap = true; break; } } } #ifdef NETDATA_LOG_REPLICATION_REQUESTS if(actual_after) { char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1]; log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after); log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before); internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf, (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before)); } else internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)", rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), (unsigned long long)after, (unsigned long long)before); #endif // NETDATA_LOG_REPLICATION_REQUESTS q->points_read += points_read; q->points_generated += points_generated; if(last_end_time_in_buffer < before - q->st->update_every) finished_with_gap = true; return finished_with_gap; } static struct replication_query *replication_response_prepare( RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before, STREAM_CAPABILITIES capabilities ) { time_t wall_clock_time = now_realtime_sec(); if(requested_after > requested_before) { // flip them time_t t = requested_before; requested_before = requested_after; requested_after = t; } if(requested_after > wall_clock_time) { requested_after = 0; requested_before = 0; requested_enable_streaming = true; } 
if(requested_before > wall_clock_time) { requested_before = wall_clock_time; requested_enable_streaming = true; } time_t query_after = requested_after; time_t query_before = requested_before; bool query_enable_streaming = requested_enable_streaming; time_t db_first_entry = 0, db_last_entry = 0; rrdset_get_retention_of_tier_for_collected_chart( st, &db_first_entry, &db_last_entry, wall_clock_time, 0); if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) { // no data requested - just enable streaming ; } else { if (query_after < db_first_entry) query_after = db_first_entry; if (query_before > db_last_entry) query_before = db_last_entry; // if the parent asked us to start streaming, then fill the rest with the data that we have if (requested_enable_streaming) query_before = db_last_entry; if (query_after > query_before) { time_t tmp = query_before; query_before = query_after; query_after = tmp; } query_enable_streaming = (requested_enable_streaming || query_before == db_last_entry || !requested_after || !requested_before) ? true : false; } return replication_query_prepare( st, db_first_entry, db_last_entry, requested_after, requested_before, requested_enable_streaming, query_after, query_before, query_enable_streaming, wall_clock_time, capabilities); } void replication_response_cancel_and_finalize(struct replication_query *q) { if(!q) return; replication_query_finalize(NULL, q, false); } static bool sender_is_still_connected_for_this_request(struct replication_request *rq); bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) { bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false; NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? 
NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; struct replication_request *rq = q->rq; RRDSET *st = q->st; RRDHOST *host = st->rrdhost; // we might want to optimize this by filling a temporary buffer // and copying the result to the host's buffer in order to avoid // holding the host's buffer lock for too long BUFFER *wb = sender_start(host->sender); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1); if(with_slots) { buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot); } buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "'\n", 2); bool locked_data_collection = q->query.locked_data_collection; q->query.locked_data_collection = false; bool finished_with_gap = false; if(q->query.execute) finished_with_gap = replication_query_execute(wb, q, max_msg_size); time_t after = q->request.after; time_t before = q->query.before; bool enable_streaming = q->query.enable_streaming; replication_query_finalize(wb, q, q->query.execute); q = NULL; // IMPORTANT: q is invalid now // get a fresh retention to send to the parent time_t wall_clock_time = now_realtime_sec(); time_t db_first_entry, db_last_entry; rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); // end with first/last entries we have, and the first start time and // last end time of the data we sent buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1); buffer_print_int64_encoded(wb, integer_encoding, st->update_every); buffer_fast_strcat(wb, " ", 1); buffer_print_uint64_encoded(wb, integer_encoding, db_first_entry); buffer_fast_strcat(wb, " ", 1); buffer_print_uint64_encoded(wb, integer_encoding, db_last_entry); buffer_fast_strcat(wb, enable_streaming ? 
" true " : " false ", 7); buffer_print_uint64_encoded(wb, integer_encoding, after); buffer_fast_strcat(wb, " ", 1); buffer_print_uint64_encoded(wb, integer_encoding, before); buffer_fast_strcat(wb, " ", 1); buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); buffer_fast_strcat(wb, "\n", 1); worker_is_busy(WORKER_JOB_BUFFER_COMMIT); sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION); worker_is_busy(WORKER_JOB_CLEANUP); if(enable_streaming) { if(sender_is_still_connected_for_this_request(rq)) { // enable normal streaming if we have to // but only if the sender buffer has not been flushed since we started if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); rrdhost_sender_replicating_charts_minus_one(st->rrdhost); if(!finished_with_gap) st->rrdpush.sender.resync_time_s = 0; #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", rrdhost_hostname(st->rrdhost), rrdset_id(st)); #endif } else internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating", rrdhost_hostname(st->rrdhost), rrdset_id(st)); } } if(locked_data_collection) spinlock_unlock(&st->data_collection_lock); return enable_streaming; } // ---------------------------------------------------------------------------- // sending replication requests struct replication_request_details { struct { send_command callback; struct parser *parser; } caller; RRDHOST *host; RRDSET *st; struct { time_t first_entry_t; // the first entry time the child has time_t last_entry_t; // the last entry time the child has time_t wall_clock_time; // the current time of the child bool fixed_last_entry; // when set we set the last entry to wall clock time } child_db; struct { time_t first_entry_t; // the first entry time we 
have time_t last_entry_t; // the last entry time we have time_t wall_clock_time; // the current local world clock time } local_db; struct { time_t from; // the starting time of the entire gap we have time_t to; // the ending time of the entire gap we have } gap; struct { time_t after; // the start time we requested previously from this child time_t before; // the end time we requested previously from this child } last_request; struct { time_t after; // the start time of this replication request - the child will add 1 second time_t before; // the end time of this replication request bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before' } wanted; }; static void replicate_log_request(struct replication_request_details *r, const char *msg) { #ifdef NETDATA_INTERNAL_CHECKS internal_error(true, #else nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "REPLAY ERROR: 'host:%s/chart:%s' child sent: " "db from %ld to %ld%s, wall clock time %ld, " "last request from %ld to %ld, " "issue: %s - " "sending replication request from %ld to %ld, start streaming %s", rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st), r->child_db.first_entry_t, r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "", r->child_db.wall_clock_time, r->last_request.after, r->last_request.before, msg, r->wanted.after, r->wanted.before, r->wanted.start_streaming ? 
"true" : "false"); } static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) { RRDSET *st = r->st; if(log) replicate_log_request(r, msg); if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t)) st->rrdhost->receiver->replication_first_time_t = r->wanted.after; #ifdef NETDATA_LOG_REPLICATION_REQUESTS st->replay.log_next_data_collection = true; char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = ""; if(r->wanted.after) log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after); if(r->wanted.before) log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before); internal_error(true, "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s" , rrdhost_hostname(r->host), rrdset_id(r->st) , r->wanted.after, wanted_after_buf , r->wanted.before, wanted_before_buf , r->wanted.start_streaming ? "YES" : "NO" , msg , r->last_request.after, r->last_request.before , r->child_db.first_entry_t, r->child_db.last_entry_t , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD" , r->local_db.first_entry_t, r->local_db.last_entry_t , r->local_db.wall_clock_time , r->gap.from, r->gap.to , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" ); st->replay.start_streaming = r->wanted.start_streaming; st->replay.after = r->wanted.after; st->replay.before = r->wanted.before; #endif // NETDATA_LOG_REPLICATION_REQUESTS char buffer[2048 + 1]; snprintfz(buffer, sizeof(buffer) - 1, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n", rrdset_id(st), r->wanted.start_streaming ? 
"true" : "false", (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before); ssize_t ret = r->caller.callback(buffer, r->caller.parser); if (ret < 0) { netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)", rrdhost_hostname(r->host), rrdset_id(r->st), ret); return false; } return true; } bool replicate_chart_request(send_command callback, struct parser *parser, RRDHOST *host, RRDSET *st, time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time, time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) { struct replication_request_details r = { .caller = { .callback = callback, .parser = parser, }, .host = host, .st = st, .child_db = { .first_entry_t = child_first_entry, .last_entry_t = child_last_entry, .wall_clock_time = child_wall_clock_time, .fixed_last_entry = false, }, .local_db = { .first_entry_t = 0, .last_entry_t = 0, .wall_clock_time = now_realtime_sec(), }, .last_request = { .after = prev_first_entry_wanted, .before = prev_last_entry_wanted, }, .wanted = { .after = 0, .before = 0, .start_streaming = true, }, }; if(r.child_db.last_entry_t > r.child_db.wall_clock_time) { replicate_log_request(&r, "child's db last entry > child's wall clock time"); r.child_db.last_entry_t = r.child_db.wall_clock_time; r.child_db.fixed_last_entry = true; } rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0); // let's find the GAP we have if(!r.last_request.after || !r.last_request.before) { // there is no previous request if(r.local_db.last_entry_t) // we have some data, let's continue from the last point we have r.gap.from = r.local_db.last_entry_t; else // we don't have any data, the gap is the max timeframe we are allowed to replicate r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate; } else { // we had sent a request - let's continue at the point we left it // 
for this we don't take into account the actual data in our db // because the child may also have gaps, and we need to get over it r.gap.from = r.last_request.before; } // we want all the data up to now r.gap.to = r.local_db.wall_clock_time; // The gap is now r.gap.from -> r.gap.to if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false); if (unlikely(!rrdset_number_of_dimensions(st))) return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false); if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t)) return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false); if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0)) return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true); if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time)) return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true); if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t)) return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true); if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t)) return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false); // let's find what the child can provide to fill that gap if(r.child_db.first_entry_t > r.gap.from) // the child does not have all the data - let's get what it has r.wanted.after = r.child_db.first_entry_t; else // ok, the child can fill the entire gap we have r.wanted.after = r.gap.from; if(r.gap.to - r.wanted.after > host->rrdpush_replication_step) // the duration is too big for one request - let's take the first step r.wanted.before = r.wanted.after + host->rrdpush_replication_step; else // wow, we 
// ----------------------------------------------------------------------------
// replication thread

// replication request in sender DICTIONARY
// used for de-duplicating the requests
//
// The same structure is also copied by value into the per-thread prefetch
// pipeline, which is why it carries the "prepare ahead" members at the end.
struct replication_request {
    struct sender_state *sender;        // the sender we should put the reply at
    STRING *chart_id;                   // the chart of the request
    time_t after;                       // the start time of the query (maybe zero); key of the outer JudyL used for sorting
    time_t before;                      // the end time of the query (maybe zero)

    usec_t sender_last_flush_ut;        // the flush timestamp of the sender at the time the request was created;
                                        // compared against the sender's current flush time to detect reconnections
    Word_t unique_id;                   // auto-increment, later requests have bigger ids; key of the inner JudyL

    bool start_streaming;               // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
    bool indexed_in_judy;               // true when the request is indexed in judy
    bool not_indexed_buffer_full;       // true when the request is not indexed because the sender is full
    bool not_indexed_preprocessing;     // true when the request is not indexed, but it is pending in preprocessing

    // prepare ahead members - preprocessing
    bool found;                         // used as a result boolean for the find call
    bool executed;                      // used to detect if we have skipped requests while preprocessing
    RRDSET *st;                         // caching of the chart during preprocessing
    struct replication_query *q;        // the preprocessing query initialization
};

// replication sort entry in JudyL array
// used for sorting all requests, across all nodes
struct replication_sort_entry {
    struct replication_request *rq;     // the request this entry points to (not owned by the entry)

    size_t unique_id;              // used as a key to identify the sort entry - we never access its contents
};

#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread

// the global variables for the replication thread
static struct replication_thread {
    ARAL *aral_rse;                     // allocator for struct replication_sort_entry

    SPINLOCK spinlock;                  // taken through replication_recursive_lock() / replication_recursive_unlock()

    struct {
        size_t pending;                 // number of requests pending in the queue

        // statistics
        size_t added;                   // number of requests added to the queue
        size_t removed;                 // number of requests removed from the queue
        size_t pending_no_room;         // number of requests skipped, because the sender has no room for responses
        size_t senders_full;            // number of senders whose buffer is currently marked full
                                        // (incremented when a sender becomes full, decremented when it recovers)
        size_t sender_resets;           // number of times a sender recovered from buffer-full and re-queued its requests
        time_t first_time_t;            // the minimum 'after' we encountered

        struct {
            Word_t after;               // outer JudyL key (request 'after') where the next search resumes
            Word_t unique_id;           // inner JudyL key (request unique id) where the next search resumes
            Pvoid_t JudyL_array;        // outer JudyL: 'after' -> inner JudyL (unique_id -> sort entry)
        } queue;

    } unsafe;                           // protected by replication_recursive_lock()

    struct {
        Word_t unique_id;               // the last unique id we gave to a request (auto-increment, starting from 1)
        size_t executed;                // the number of replication requests executed
        size_t latest_first_time;       // the 'after' timestamp of the last request we executed
        size_t memory;                  // the total memory allocated by replication
    } atomic;                           // access should be with atomic operations

    struct {
        size_t last_executed;           // caching of the atomic.executed to report number of requests executed since last time

        ND_THREAD **threads_ptrs;       // the additional worker threads started by the main thread
        size_t threads;                 // number of entries in threads_ptrs
    } main_thread;                      // access is allowed only by the main thread

} replication_globals = {
        .aral_rse = NULL,
        .spinlock = NETDATA_SPINLOCK_INITIALIZER,
        .unsafe = {
                .pending = 0,

                .added = 0,
                .removed = 0,
                .pending_no_room = 0,
                .sender_resets = 0,
                .senders_full = 0,

                .first_time_t = 0,

                .queue = {
                        .after = 0,
                        .unique_id = 0,
                        .JudyL_array = NULL,
                },
        },
        .atomic = {
                .unique_id = 0,
                .executed = 0,
                .latest_first_time = 0,
                .memory = 0,
        },
        .main_thread = {
                .last_executed = 0,
                .threads = 0,
                .threads_ptrs = NULL,
        },
};

// Returns the number of bytes currently accounted in replication_globals.atomic.memory
// (sort entries and JudyL arrays), read with a relaxed atomic load.
size_t replication_allocated_memory(void) {
    return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
}

// relaxed atomic accessors for the 'after' timestamp of the last executed request
#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)

// Recursive wrapper around replication_globals.spinlock.
//
// A thread-local counter tracks how many times the calling thread has locked;
// the spinlock itself is taken only on the first 'L' and released only when
// the matching last 'U' brings the counter back to zero.
//
// mode:
//   'L' - lock   (returns true)
//   'U' - unlock (returns true)
//   'C' - check  (returns true when the calling thread currently holds the lock)
// Any other mode is fatal.
static inline bool replication_recursive_lock_mode(char mode) {
    static __thread int recursions = 0;

    if(mode == 'L') { // (L)ock
        if(++recursions == 1)
            spinlock_lock(&replication_globals.spinlock);
    }
    else if(mode == 'U') { // (U)nlock
        if(--recursions == 0)
            spinlock_unlock(&replication_globals.spinlock);
    }
    else if(mode == 'C') { // (C)heck
        if(recursions > 0)
            return true;
        else
            return false;
    }
    else
        fatal("REPLICATION: unknown lock mode '%c'", mode);

#ifdef NETDATA_INTERNAL_CHECKS
    // an unlock without a matching lock drives the counter negative
    if(recursions < 0)
        fatal("REPLICATION: recursions is %d", recursions);
#endif

    return true;
}

#define replication_recursive_lock() replication_recursive_lock_mode('L')
#define replication_recursive_unlock() replication_recursive_lock_mode('U')

// aborts when the calling thread does not hold the replication lock
#define fatal_when_replication_is_not_locked_for_me() do { \
    if(!replication_recursive_lock_mode('C')) \
        fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
} while(0)

// Sets the position (outer key 'after', inner key 'unique_id') from which the
// next queue search will resume; (0, 0) makes it start from the beginning.
void replication_set_next_point_in_time(time_t after, size_t unique_id) {
    replication_recursive_lock();
    replication_globals.unsafe.queue.after = after;
    replication_globals.unsafe.queue.unique_id = unique_id;
    replication_recursive_unlock();
}
---------------------------------------------------------------------------- // replication sort entry management static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse); __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); rrdpush_sender_pending_replication_requests_plus_one(rq->sender); // copy the request rse->rq = rq; rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST); // save the unique id into the request, to be able to delete it later rq->unique_id = rse->unique_id; rq->indexed_in_judy = false; rq->not_indexed_buffer_full = false; rq->not_indexed_preprocessing = false; return rse; } static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { aral_freez(replication_globals.aral_rse, rse); __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); } static void replication_sort_entry_add(struct replication_request *rq) { if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) { rq->indexed_in_judy = false; rq->not_indexed_buffer_full = true; rq->not_indexed_preprocessing = false; replication_recursive_lock(); replication_globals.unsafe.pending_no_room++; replication_recursive_unlock(); return; } // cache this, because it will be changed bool decrement_no_room = rq->not_indexed_buffer_full; struct replication_sort_entry *rse = replication_sort_entry_create(rq); replication_recursive_lock(); if(decrement_no_room) replication_globals.unsafe.pending_no_room--; // if(rq->after < (time_t)replication_globals.protected.queue.after && // rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && // !replication_globals.protected.skipped_no_room_since_last_reset) { // // // make it find this request first // 
replication_set_next_point_in_time(rq->after, rq->unique_id); // } replication_globals.unsafe.added++; replication_globals.unsafe.pending++; Pvoid_t *inner_judy_ptr; // find the outer judy entry, using after as key size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR)) fatal("REPLICATION: corrupted outer judyL"); // add it to the inner judy, using unique_id as key size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr); Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr); if(unlikely(!item || item == PJERR)) fatal("REPLICATION: corrupted inner judyL"); *item = rse; rq->indexed_in_judy = true; rq->not_indexed_buffer_full = false; rq->not_indexed_preprocessing = false; if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t) replication_globals.unsafe.first_time_t = rq->after; replication_recursive_unlock(); __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED); } static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) { fatal_when_replication_is_not_locked_for_me(); bool inner_judy_deleted = false; replication_globals.unsafe.removed++; replication_globals.unsafe.pending--; rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); rse->rq->indexed_in_judy = false; rse->rq->not_indexed_preprocessing = preprocessing; size_t memory_saved = 0; // delete it from the inner judy size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr); 
JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0); size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr); memory_saved = mem_before_inner_judyl - mem_after_inner_judyl; // if no items left, delete it from the outer judy if(**inner_judy_ppptr == NULL) { size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0); size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); memory_saved += mem_before_outer_judyl - mem_after_outer_judyl; inner_judy_deleted = true; } // free memory replication_sort_entry_destroy(rse); __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED); return inner_judy_deleted; } static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) { Pvoid_t *inner_judy_pptr; struct replication_sort_entry *rse_to_delete = NULL; replication_recursive_lock(); if(rq->indexed_in_judy) { inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0); if (inner_judy_pptr) { Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0); if (our_item_pptr) { rse_to_delete = *our_item_pptr; replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false); if(buffer_full) { replication_globals.unsafe.pending_no_room++; rq->not_indexed_buffer_full = true; } } } if (!rse_to_delete) fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.", rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after); } replication_recursive_unlock(); } static struct replication_request replication_request_get_first_available() { Pvoid_t *inner_judy_pptr; replication_recursive_lock(); struct replication_request rq_to_return = (struct replication_request){ .found = false }; if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) { 
replication_globals.unsafe.queue.after = 0; replication_globals.unsafe.queue.unique_id = 0; } Word_t started_after = replication_globals.unsafe.queue.after; size_t round = 0; while(!rq_to_return.found) { round++; if(round > 2) break; if(round == 2) { if(started_after == 0) break; replication_globals.unsafe.queue.after = 0; replication_globals.unsafe.queue.unique_id = 0; } bool find_same_after = true; while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) { Pvoid_t *our_item_pptr; if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after)) break; while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) { struct replication_sort_entry *rse = *our_item_pptr; struct replication_request *rq = rse->rq; // copy the request to return it rq_to_return = *rq; rq_to_return.chart_id = string_dup(rq_to_return.chart_id); // set the return result to found rq_to_return.found = true; if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true)) // we removed the item from the outer JudyL break; } // prepare for the next iteration on the outer loop replication_globals.unsafe.queue.unique_id = 0; } } replication_recursive_unlock(); return rq_to_return; } // ---------------------------------------------------------------------------- // replication request management static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) { struct sender_state *s = sender_state; (void)s; struct replication_request *rq = value; // IMPORTANT: // We use the react instead of the insert callback // because we want the item to be atomically visible // to our replication thread, immediately after. 
// If we put this at the insert callback, the item is not guaranteed // to be atomically visible to others, so the replication thread // may see the replication sort entry, but fail to find the dictionary item // related to it. replication_sort_entry_add(rq); // this request is about a unique chart for this sender rrdpush_sender_replicating_charts_plus_one(s); } static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) { struct sender_state *s = sender_state; (void)s; struct replication_request *rq = old_value; (void)rq; struct replication_request *rq_new = new_value; replication_recursive_lock(); if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) { // we can replace this command internal_error( true, "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); rq->after = rq_new->after; rq->before = rq_new->before; rq->start_streaming = rq_new->start_streaming; } else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) { replication_sort_entry_add(rq); internal_error( true, "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? 
"true" : "false", (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); } else { internal_error( true, "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false", (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false"); } replication_recursive_unlock(); string_freez(rq_new->chart_id); return false; } static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) { struct replication_request *rq = value; // this request is about a unique chart for this sender rrdpush_sender_replicating_charts_minus_one(rq->sender); if(rq->indexed_in_judy) replication_sort_entry_del(rq, false); else if(rq->not_indexed_buffer_full) { replication_recursive_lock(); replication_globals.unsafe.pending_no_room--; replication_recursive_unlock(); } string_freez(rq->chart_id); } static bool sender_is_still_connected_for_this_request(struct replication_request *rq) { return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender); } static bool replication_execute_request(struct replication_request *rq, bool workers) { bool ret = false; if(!rq->st) { if(likely(workers)) worker_is_busy(WORKER_JOB_FIND_CHART); rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); } if(!rq->st) { internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found", rrdhost_hostname(rq->sender->host), string2str(rq->chart_id)); goto cleanup; } if(!rq->q) { if(likely(workers)) worker_is_busy(WORKER_JOB_PREPARE_QUERY); rq->q = replication_response_prepare( rq->st, rq->start_streaming, 
rq->after, rq->before, rq->sender->capabilities); } if(likely(workers)) worker_is_busy(WORKER_JOB_QUERYING); // send the replication data rq->q->rq = rq; replication_response_execute_and_finalize( rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL)); rq->q = NULL; __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED); ret = true; cleanup: if(rq->q) { replication_response_cancel_and_finalize(rq->q); rq->q = NULL; } string_freez(rq->chart_id); worker_is_idle(); return ret; } // ---------------------------------------------------------------------------- // public API void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) { struct replication_request rq = { .sender = sender, .chart_id = string_strdupz(chart_id), .after = after, .before = before, .start_streaming = start_streaming, .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), .indexed_in_judy = false, .not_indexed_buffer_full = false, .not_indexed_preprocessing = false, }; if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t) sender->replication.oldest_request_after_t = rq.after; if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) replication_execute_request(&rq, false); else dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request)); } void replication_sender_delete_pending_requests(struct sender_state *sender) { // allow the dictionary destructor to go faster on locks dictionary_flush(sender->replication.requests); } void replication_init_sender(struct sender_state *sender) { sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct replication_request)); 
dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender); dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender); dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender); } void replication_cleanup_sender(struct sender_state *sender) { // allow the dictionary destructor to go faster on locks replication_recursive_lock(); dictionary_destroy(sender->replication.requests); replication_recursive_unlock(); } void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size; if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) { rrdpush_sender_replication_buffer_full_set(s, true); struct replication_request *rq; dfe_start_read(s->replication.requests, rq) { if(rq->indexed_in_judy) replication_sort_entry_del(rq, true); } dfe_done(rq); replication_recursive_lock(); replication_globals.unsafe.senders_full++; replication_recursive_unlock(); } else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) { rrdpush_sender_replication_buffer_full_set(s, false); struct replication_request *rq; dfe_start_read(s->replication.requests, rq) { if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing)) replication_sort_entry_add(rq); } dfe_done(rq); replication_recursive_lock(); replication_globals.unsafe.senders_full--; replication_globals.unsafe.sender_resets++; // replication_set_next_point_in_time(0, 0); replication_recursive_unlock(); } rrdpush_sender_set_buffer_used_percent(s, percentage); } // ---------------------------------------------------------------------------- // replication thread static 
size_t verify_host_charts_are_streaming_now(RRDHOST *host) { internal_error( host->sender && !rrdpush_sender_pending_replication_requests(host->sender) && dictionary_entries(host->sender->replication.requests) != 0, "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication", rrdhost_hostname(host), rrdpush_sender_pending_replication_requests(host->sender), dictionary_entries(host->sender->replication.requests) ); size_t ok = 0; size_t errors = 0; RRDSET *st; rrdset_foreach_read(st, host) { RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED); bool is_error = false; if(!flags) { internal_error( true, "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED", rrdhost_hostname(host), rrdset_id(st) ); is_error = true; } if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { internal_error( true, "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished", rrdhost_hostname(host), rrdset_id(st) ); is_error = true; } if(is_error) errors++; else ok++; } rrdset_foreach_done(st); internal_error(errors, "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished", rrdhost_hostname(host), ok, errors); return errors; } static void verify_all_hosts_charts_are_streaming_now(void) { worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY); size_t errors = 0; RRDHOST *host; dfe_start_read(rrdhost_root_index, host) errors += verify_host_charts_are_streaming_now(host); dfe_done(host); size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.main_thread.last_executed, 
errors); replication_globals.main_thread.last_executed = executed; } static void replication_initialize_workers(bool master) { worker_register("REPLICATION"); worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next"); worker_register_job_name(WORKER_JOB_QUERYING, "querying"); worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete"); worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart"); worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query"); worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit"); worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup"); worker_register_job_name(WORKER_JOB_WAIT, "wait"); if(master) { worker_register_job_name(WORKER_JOB_STATISTICS, "statistics"); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE); } } #define REQUEST_OK (0) #define REQUEST_QUEUE_EMPTY (-1) #define REQUEST_CHART_NOT_FOUND (-2) static __thread struct replication_thread_pipeline { int max_requests_ahead; struct replication_request *rqs; int rqs_last_executed, rqs_last_prepared; size_t queue_rounds; } rtp = { 
.max_requests_ahead = 0, .rqs = NULL, .rqs_last_executed = 0, .rqs_last_prepared = 0, .queue_rounds = 0, }; static void replication_pipeline_cancel_and_cleanup(void) { if(!rtp.rqs) return; struct replication_request *rq; size_t cancelled = 0; do { if (++rtp.rqs_last_executed >= rtp.max_requests_ahead) rtp.rqs_last_executed = 0; rq = &rtp.rqs[rtp.rqs_last_executed]; if (rq->q) { internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq"); replication_response_cancel_and_finalize(rq->q); rq->q = NULL; cancelled++; } rq->executed = true; rq->found = false; } while (rtp.rqs_last_executed != rtp.rqs_last_prepared); internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled); freez(rtp.rqs); rtp.rqs = NULL; rtp.max_requests_ahead = 0; rtp.rqs_last_executed = 0; rtp.rqs_last_prepared = 0; rtp.queue_rounds = 0; } static int replication_pipeline_execute_next(void) { struct replication_request *rq; if(unlikely(!rtp.rqs)) { rtp.max_requests_ahead = (int)get_netdata_cpus() / 2; if(rtp.max_requests_ahead > libuv_worker_threads * 2) rtp.max_requests_ahead = libuv_worker_threads * 2; if(rtp.max_requests_ahead < 2) rtp.max_requests_ahead = 2; rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request)); __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED); } // fill the queue do { if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) { rtp.rqs_last_prepared = 0; rtp.queue_rounds++; } internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q, "REPLAY FATAL: slot is used by query that has not been executed!"); worker_is_busy(WORKER_JOB_FIND_NEXT); rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available(); rq = &rtp.rqs[rtp.rqs_last_prepared]; if(rq->found) { if (!rq->st) { worker_is_busy(WORKER_JOB_FIND_CHART); rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); } if 
(rq->st && !rq->q) { worker_is_busy(WORKER_JOB_PREPARE_QUERY); rq->q = replication_response_prepare( rq->st, rq->start_streaming, rq->after, rq->before, rq->sender->capabilities); } rq->executed = false; } } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed); // pick the first usable do { if (++rtp.rqs_last_executed >= rtp.max_requests_ahead) rtp.rqs_last_executed = 0; rq = &rtp.rqs[rtp.rqs_last_executed]; if(rq->found) { internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) { // the sender has reconnected since this request was queued, // we can safely throw it away, since the parent will resend it replication_response_cancel_and_finalize(rq->q); rq->executed = true; rq->found = false; rq->q = NULL; } else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) { // the sender buffer is full, so we can ignore this request, // it has already been marked as 'preprocessed' in the dictionary, // and the sender will put it back in when there is // enough room in the buffer for processing replication requests replication_response_cancel_and_finalize(rq->q); rq->executed = true; rq->found = false; rq->q = NULL; } else { // we can execute this, // delete it from the dictionary worker_is_busy(WORKER_JOB_DELETE_ENTRY); dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id)); } } else internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!"); } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared); if(unlikely(!rq->found)) { worker_is_idle(); return REQUEST_QUEUE_EMPTY; } replication_set_latest_first_time(rq->after); bool chart_found = replication_execute_request(rq, true); rq->executed = true; rq->found = false; rq->q = NULL; if(unlikely(!chart_found)) { worker_is_idle(); return REQUEST_CHART_NOT_FOUND; } worker_is_idle(); return REQUEST_OK; } static void 
replication_worker_cleanup(void *pptr) { if(CLEANUP_FUNCTION_GET_PTR(pptr) != (void *)0x01) return; replication_pipeline_cancel_and_cleanup(); worker_unregister(); } static void *replication_worker_thread(void *ptr __maybe_unused) { CLEANUP_FUNCTION_REGISTER(replication_worker_cleanup) cleanup_ptr = (void *)0x1; replication_initialize_workers(false); while (service_running(SERVICE_REPLICATION)) { if (unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { sender_thread_buffer_free(); worker_is_busy(WORKER_JOB_WAIT); worker_is_idle(); sleep_usec(1 * USEC_PER_SEC); } } return NULL; } static void replication_main_cleanup(void *pptr) { struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr); if(!static_thread) return; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; replication_pipeline_cancel_and_cleanup(); int threads = (int)replication_globals.main_thread.threads; for(int i = 0; i < threads ;i++) { nd_thread_join(replication_globals.main_thread.threads_ptrs[i]); __atomic_sub_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED); } freez(replication_globals.main_thread.threads_ptrs); replication_globals.main_thread.threads_ptrs = NULL; __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(ND_THREAD *), __ATOMIC_RELAXED); aral_destroy(replication_globals.aral_rse); replication_globals.aral_rse = NULL; // custom code worker_unregister(); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; } void replication_initialize(void) { replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry), 0, 65536, aral_by_size_statistics(), NULL, NULL, false, false); } void *replication_thread_main(void *ptr __maybe_unused) { replication_initialize_workers(true); int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1); if(threads < 1 || threads > MAX_REPLICATION_THREADS) { netdata_log_error("replication threads given %d is invalid, resetting to 1", threads); 
threads = 1; } if(--threads) { replication_globals.main_thread.threads = threads; replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(ND_THREAD *)); __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(ND_THREAD *), __ATOMIC_RELAXED); for(int i = 0; i < threads ;i++) { char tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2); replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(ND_THREAD *)); __atomic_add_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED); replication_globals.main_thread.threads_ptrs[i] = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL); } } CLEANUP_FUNCTION_REGISTER(replication_main_cleanup) cleanup_ptr = ptr; // start from 100% completed worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place bool slow = true; // control the time we sleep - it has to start with true! 
usec_t last_now_mono_ut = now_monotonic_usec(); time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds size_t last_executed = 0; size_t last_sender_resets = 0; while(service_running(SERVICE_REPLICATION)) { // statistics usec_t now_mono_ut = now_monotonic_usec(); if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) { last_now_mono_ut = now_mono_ut; worker_is_busy(WORKER_JOB_STATISTICS); replication_recursive_lock(); size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); if(last_executed != current_executed) { run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION; last_executed = current_executed; slow = false; } if(replication_reset_next_point_in_time_countdown-- == 0) { // once per second, make it scan all the pending requests next time replication_set_next_point_in_time(0, 0); // replication_globals.protected.skipped_no_room_since_last_reset = 0; replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; } if(--run_verification_countdown == 0) { if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) { // reset the statistics about completion percentage replication_globals.unsafe.first_time_t = 0; replication_set_latest_first_time(0); verify_all_hosts_charts_are_streaming_now(); run_verification_countdown = LONG_MAX; slow = true; } else run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION; } time_t latest_first_time_t = replication_get_latest_first_time(); if(latest_first_time_t && replication_globals.unsafe.pending) { // completion percentage statistics time_t now = now_realtime_sec(); time_t total = now - replication_globals.unsafe.first_time_t; time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t; worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 
(NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total); } else worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED)); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full); replication_recursive_unlock(); worker_is_idle(); } if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) { worker_is_busy(WORKER_JOB_WAIT); replication_recursive_lock(); // the timeout also defines now frequently we will traverse all the pending requests // when the outbound buffers of all senders is full usec_t timeout; if(slow) { // no work to be done, wait for a request to come in timeout = 1000 * USEC_PER_MS; sender_thread_buffer_free(); } else if(replication_globals.unsafe.pending > 0) { if(replication_globals.unsafe.sender_resets == last_sender_resets) timeout = 1000 * USEC_PER_MS; else { // there are pending requests waiting to be executed, // but none could be executed at this time. // try again after this time. timeout = 100 * USEC_PER_MS; } last_sender_resets = replication_globals.unsafe.sender_resets; } else { // no requests pending, but there were requests recently (run_verification_countdown) // so, try in a short time. 
// if this is big, one chart replicating will be slow to finish (ping - pong just one chart) timeout = 10 * USEC_PER_MS; last_sender_resets = replication_globals.unsafe.sender_resets; } replication_recursive_unlock(); worker_is_idle(); sleep_usec(timeout); // make it scan all the pending requests next time replication_set_next_point_in_time(0, 0); replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; continue; } } return NULL; }