diff options
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 1104 |
1 files changed, 787 insertions, 317 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index d659d701d..7c1f16b4c 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -3,28 +3,30 @@ #include "replication.h" #include "Judy.h" -#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 -#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 -#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 +#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL +#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL +#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL #define WORKER_JOB_FIND_NEXT 1 #define WORKER_JOB_QUERYING 2 #define WORKER_JOB_DELETE_ENTRY 3 #define WORKER_JOB_FIND_CHART 4 -#define WORKER_JOB_CHECK_CONSISTENCY 5 -#define WORKER_JOB_BUFFER_COMMIT 6 -#define WORKER_JOB_CLEANUP 7 -#define WORKER_JOB_WAIT 8 +#define WORKER_JOB_PREPARE_QUERY 5 +#define WORKER_JOB_CHECK_CONSISTENCY 6 +#define WORKER_JOB_BUFFER_COMMIT 7 +#define WORKER_JOB_CLEANUP 8 +#define WORKER_JOB_WAIT 9 // master thread worker jobs -#define WORKER_JOB_STATISTICS 9 -#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10 -#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11 -#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12 -#define WORKER_JOB_CUSTOM_METRIC_ADDED 13 -#define WORKER_JOB_CUSTOM_METRIC_DONE 14 -#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15 -#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16 +#define WORKER_JOB_STATISTICS 10 +#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12 +#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13 +#define WORKER_JOB_CUSTOM_METRIC_ADDED 14 +#define WORKER_JOB_CUSTOM_METRIC_DONE 15 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17 #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 #define SECONDS_TO_RESET_POINT_IN_TIME 10 @@ -44,6 +46,12 @@ struct replication_query_statistics replication_get_query_statistics(void) { return ret; } +size_t replication_buffers_allocated = 0; + +size_t replication_allocated_buffers(void) { + return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED); +} + // ---------------------------------------------------------------------------- // sending replication replies @@ -51,137 +59,400 @@ struct replication_dimension { STORAGE_POINT sp; struct storage_engine_query_handle handle; bool enabled; + bool skip; DICTIONARY *dict; const DICTIONARY_ITEM *rda; RRDDIM *rd; }; -static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) { +struct replication_query { + RRDSET *st; + + struct { + time_t first_entry_t; + time_t last_entry_t; + } db; + + struct { // what the parent requested + time_t after; + time_t before; + bool enable_streaming; + } request; + + struct { // what the child will do + time_t after; + time_t before; + bool enable_streaming; + + bool locked_data_collection; + bool execute; + bool interrupted; + } query; + + time_t wall_clock_time; + + size_t points_read; + size_t points_generated; + + struct storage_engine_query_ops *ops; + struct replication_request *rq; + + size_t dimensions; + struct replication_dimension data[]; +}; + +static struct replication_query *replication_query_prepare( + RRDSET *st, + time_t db_first_entry, + time_t db_last_entry, + time_t requested_after, + time_t requested_before, + bool requested_enable_streaming, + time_t query_after, + time_t query_before, + bool query_enable_streaming, + time_t wall_clock_time +) { size_t dimensions = rrdset_number_of_dimensions(st); - size_t points_read = 0, points_generated = 0; + struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension)); + __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); - struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops; - struct replication_dimension data[dimensions]; - memset(data, 0, sizeof(data)); + q->dimensions = dimensions; + q->st = st; - if(enable_streaming && st->last_updated.tv_sec > before) { - internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)before, - (unsigned long long)st->last_updated.tv_sec - ); - before = st->last_updated.tv_sec; + q->db.first_entry_t = db_first_entry; + q->db.last_entry_t = db_last_entry; + + q->request.after = requested_after, + q->request.before = requested_before, + q->request.enable_streaming = requested_enable_streaming, + + q->query.after = query_after; + q->query.before = query_before; + q->query.enable_streaming = query_enable_streaming; + + q->wall_clock_time = wall_clock_time; + + if (!q->dimensions || !q->query.after || !q->query.before) { + q->query.execute = false; + q->dimensions = 0; + return q; + } + + if(q->query.enable_streaming) { + netdata_spinlock_lock(&st->data_collection_lock); + q->query.locked_data_collection = true; + + if (st->last_updated.tv_sec > q->query.before) { +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, + "STREAM_SENDER REPLAY: 'host:%s/chart:%s' " + "has start_streaming = true, " + "adjusting replication before timestamp from %llu to %llu", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + (unsigned long long) q->query.before, + (unsigned long long) st->last_updated.tv_sec + ); +#endif + q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time); + } } + q->ops = &st->rrdhost->db[0].eng->api.query_ops; + // prepare our array of dimensions - { - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if(unlikely(!rd || !rd_dfe.item || !rd->exposed)) - continue; + size_t count = 0; + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if (unlikely(!rd || !rd_dfe.item || !rd->exposed)) + continue; - if (unlikely(rd_dfe.counter >= dimensions)) { - internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); - break; - } + if (unlikely(rd_dfe.counter >= q->dimensions)) { + internal_error(true, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + break; + } + + struct replication_dimension *d = &q->data[rd_dfe.counter]; + + d->dict = rd_dfe.dict; + d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); + d->rd = rd; - struct replication_dimension *d = &data[rd_dfe.counter]; + q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before, + q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW); + d->enabled = true; + d->skip = false; + count++; + } + rrddim_foreach_done(rd); + + if(!count) { + // no data for this chart - d->dict = rd_dfe.dict; - d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item); - d->rd = rd; + q->query.execute = false; - ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before); - d->enabled = true; + if(q->query.locked_data_collection) { + netdata_spinlock_unlock(&st->data_collection_lock); + q->query.locked_data_collection = false; } - rrddim_foreach_done(rd); + + } + else { + // we have data for this chart + + q->query.execute = true; + } + + return q; +} + +static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) { + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(!rd->exposed) continue; + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n", + rrddim_id(rd), + (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, + rd->last_collected_value, + rd->last_calculated_value, + rd->last_stored_value + ); + } + rrddim_foreach_done(rd); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n", + (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, + (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec + ); +} + +static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) { + size_t dimensions = q->dimensions; + + if(wb && q->query.enable_streaming) + replication_send_chart_collection_state(wb, q->st); + + if(q->query.locked_data_collection) { + netdata_spinlock_unlock(&q->st->data_collection_lock); + q->query.locked_data_collection = false; + } + + // release all the dictionary items acquired + // finalize the queries + size_t queries = 0; + + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if (unlikely(!d->enabled)) continue; + + q->ops->finalize(&d->handle); + + dictionary_acquired_item_release(d->dict, d->rda); + + // update global statistics + queries++; + } + + if(executed) { + netdata_spinlock_lock(&replication_queries.spinlock); + replication_queries.queries_started += queries; + replication_queries.queries_finished += queries; + replication_queries.points_read += q->points_read; + replication_queries.points_generated += q->points_generated; + netdata_spinlock_unlock(&replication_queries.spinlock); } - time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before; + __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); + freez(q); +} + +static void replication_query_align_to_optimal_before(struct replication_query *q) { + if(!q->query.execute || q->query.enable_streaming) + return; + + size_t dimensions = q->dimensions; + time_t expanded_before = 0; + + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if(unlikely(!d->enabled)) continue; + + time_t new_before = q->ops->align_to_optimal_before(&d->handle); + if (!expanded_before || new_before < expanded_before) + expanded_before = new_before; + } + + if(expanded_before > q->query.before && // it is later than the original + (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page) + expanded_before < q->st->last_updated.tv_sec && // it is not the chart's last updated time + expanded_before < q->wall_clock_time) // it is not later than the wall clock time + q->query.before = expanded_before; +} + +static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) { + replication_query_align_to_optimal_before(q); + + time_t after = q->query.after; + time_t before = q->query.before; + size_t dimensions = q->dimensions; + struct storage_engine_query_ops *ops = q->ops; + time_t wall_clock_time = q->wall_clock_time; + + size_t points_read = q->points_read, points_generated = q->points_generated; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + time_t actual_after = 0, actual_before = 0; +#endif + + time_t now = after + 1; + time_t last_end_time_in_buffer = 0; while(now <= before) { - time_t min_start_time = 0, min_end_time = 0; + time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0; for (size_t i = 0; i < dimensions ;i++) { - struct replication_dimension *d = &data[i]; - if(unlikely(!d->enabled)) continue; + struct replication_dimension *d = &q->data[i]; + if(unlikely(!d->enabled || d->skip)) continue; // fetch the first valid point for the dimension - int max_skip = 100; - while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) { + int max_skip = 1000; + while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) { d->sp = ops->next_metric(&d->handle); points_read++; } - internal_error(max_skip <= 0, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu", - rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now); + if(max_skip <= 0) { + d->skip = true; - if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp))) - continue; + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, + "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd), + (unsigned long long) now); - if(unlikely(!min_start_time)) { - min_start_time = d->sp.start_time; - min_end_time = d->sp.end_time; - } - else { - min_start_time = MIN(min_start_time, d->sp.start_time); - min_end_time = MIN(min_end_time, d->sp.end_time); + continue; } - } - if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)min_start_time, - (unsigned long long)min_end_time, - (unsigned long long)wall_clock_time); - break; - } + if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s)) + // this dimension does not provide any data + continue; - if(unlikely(min_end_time < now)) { -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - internal_error(true, - "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu", - rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now); -#endif // NETDATA_LOG_REPLICATION_REQUESTS - break; + time_t update_every = d->sp.end_time_s - d->sp.start_time_s; + if(unlikely(!update_every)) + update_every = q->st->update_every; + + if(unlikely(!min_update_every)) + min_update_every = update_every; + + if(unlikely(!min_start_time)) + min_start_time = d->sp.start_time_s; + + if(unlikely(!min_end_time)) + min_end_time = d->sp.end_time_s; + + min_update_every = MIN(min_update_every, update_every); + max_update_every = MAX(max_update_every, update_every); + + min_start_time = MIN(min_start_time, d->sp.start_time_s); + max_start_time = MAX(max_start_time, d->sp.start_time_s); + + min_end_time = MIN(min_end_time, d->sp.end_time_s); + max_end_time = MAX(max_end_time, d->sp.end_time_s); } - if(unlikely(min_end_time <= min_start_time)) - min_start_time = min_end_time - st->update_every; + if (unlikely(min_update_every != max_update_every || + min_start_time != max_start_time)) { - if(unlikely(!actual_after)) { - actual_after = min_end_time; - actual_before = min_end_time; + time_t fix_min_start_time; + if(last_end_time_in_buffer && + last_end_time_in_buffer >= min_start_time && + last_end_time_in_buffer <= max_start_time) { + fix_min_start_time = last_end_time_in_buffer; + } + else + fix_min_start_time = min_end_time - min_update_every; + + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' " + "misaligned dimensions " + "update every (min: %ld, max: %ld), " + "start time (min: %ld, max: %ld), " + "end time (min %ld, max %ld), " + "now %ld, last end time sent %ld, " + "min start time is fixed to %ld", + rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), + min_update_every, max_update_every, + min_start_time, max_start_time, + min_end_time, max_end_time, + now, last_end_time_in_buffer, + fix_min_start_time + ); + + min_start_time = fix_min_start_time; } - else + + if(likely(min_start_time <= now && min_end_time >= now)) { + // we have a valid point + + if (unlikely(min_end_time == min_start_time)) + min_start_time = min_end_time - q->st->update_every; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + if (unlikely(!actual_after)) + actual_after = min_end_time; + actual_before = min_end_time; +#endif - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n" - , (unsigned long long)min_start_time - , (unsigned long long)min_end_time - , (unsigned long long)wall_clock_time - ); + if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) { + q->query.before = last_end_time_in_buffer; + q->query.enable_streaming = false; - // output the replay values for this time - for (size_t i = 0; i < dimensions ;i++) { - struct replication_dimension *d = &data[i]; - if(unlikely(!d->enabled)) continue; + internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. " + "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.", + buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost), + q->request.after, q->request.before, q->request.enable_streaming?"true":"false", + q->query.after, q->query.before, q->query.enable_streaming?"true":"false"); - if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time)) - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n", - rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : ""); + q->query.interrupted = true; - else - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n", - rrddim_id(d->rd)); + break; + } + last_end_time_in_buffer = min_end_time; - points_generated++; - } + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n", + (unsigned long long) min_start_time, + (unsigned long long) min_end_time, + (unsigned long long) wall_clock_time + ); + + // output the replay values for this time + for (size_t i = 0; i < dimensions; i++) { + struct replication_dimension *d = &q->data[i]; + if (unlikely(!d->enabled)) continue; + + if (likely( d->sp.start_time_s <= min_end_time && + d->sp.end_time_s >= min_end_time && + !storage_point_is_unset(d->sp) && + !storage_point_is_gap(d->sp))) { - now = min_end_time + 1; + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n", + rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : ""); + + points_generated++; + } + } + + now = min_end_time + 1; + } + else if(unlikely(min_end_time < now)) + // the query does not progress + break; + else + // we have gap - all points are in the future + now = min_start_time; } #ifdef NETDATA_LOG_REPLICATION_REQUESTS @@ -202,110 +473,89 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti (unsigned long long)after, (unsigned long long)before); #endif // NETDATA_LOG_REPLICATION_REQUESTS - // release all the dictionary items acquired - // finalize the queries - size_t queries = 0; - for(size_t i = 0; i < dimensions ;i++) { - struct replication_dimension *d = &data[i]; - if(unlikely(!d->enabled)) continue; + q->points_read = points_read; + q->points_generated = points_generated; - ops->finalize(&d->handle); + bool finished_with_gap = false; + if(last_end_time_in_buffer < before - q->st->update_every) + finished_with_gap = true; - dictionary_acquired_item_release(d->dict, d->rda); + return finished_with_gap; +} - // update global statistics - queries++; - } +static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) { + time_t wall_clock_time = now_realtime_sec(); - netdata_spinlock_lock(&replication_queries.spinlock); - replication_queries.queries_started += queries; - replication_queries.queries_finished += queries; - replication_queries.points_read += points_read; - replication_queries.points_generated += points_generated; - netdata_spinlock_unlock(&replication_queries.spinlock); + if(requested_after > requested_before) { + // flip them + time_t t = requested_before; + requested_before = requested_after; + requested_after = t; + } - return before; -} + if(requested_after > wall_clock_time) { + requested_after = 0; + requested_before = 0; + requested_enable_streaming = true; + } -static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) { - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - if(!rd->exposed) continue; - - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n", - rrddim_id(rd), - (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec, - rd->last_collected_value, - rd->last_calculated_value, - rd->last_stored_value - ); + if(requested_before > wall_clock_time) { + requested_before = wall_clock_time; + requested_enable_streaming = true; } - rrddim_foreach_done(rd); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n", - (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec, - (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec - ); -} + time_t query_after = requested_after; + time_t query_before = requested_before; + bool query_enable_streaming = requested_enable_streaming; -bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) { - time_t query_after = after; - time_t query_before = before; - time_t now = now_realtime_sec(); - time_t tolerance = 2; // sometimes from the time we get this value, to the time we check, - // a data collection has been made - // so, we give this tolerance to detect invalid timestamps - - // find the first entry we have - time_t first_entry_local = rrdset_first_entry_t(st); - if(first_entry_local > now + tolerance) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)first_entry_local, (unsigned long long)now); - first_entry_local = now; + time_t db_first_entry = 0, db_last_entry = 0; + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); + + if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) { + // no data requested - just enable streaming + ; } + else { + if (query_after < db_first_entry) + query_after = db_first_entry; - if (query_after < first_entry_local) - query_after = first_entry_local; + if (query_before > db_last_entry) + query_before = db_last_entry; - // find the latest entry we have - time_t last_entry_local = st->last_updated.tv_sec; - if(!last_entry_local) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); - last_entry_local = rrdset_last_entry_t(st); - if(!last_entry_local) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); - last_entry_local = now; + // if the parent asked us to start streaming, then fill the rest with the data that we have + if (requested_enable_streaming) + query_before = db_last_entry; + + if (query_after > query_before) { + time_t tmp = query_before; + query_before = query_after; + query_after = tmp; } - } - if(last_entry_local > now + tolerance) { - internal_error(true, - "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - (unsigned long long)last_entry_local, (unsigned long long)now); - last_entry_local = now; + query_enable_streaming = (requested_enable_streaming || + query_before == db_last_entry || + !requested_after || + !requested_before) ? true : false; } - if (query_before > last_entry_local) - query_before = last_entry_local; + return replication_query_prepare( + st, + db_first_entry, db_last_entry, + requested_after, requested_before, requested_enable_streaming, + query_after, query_before, query_enable_streaming, + wall_clock_time); +} - // if the parent asked us to start streaming, then fill the rest with the data that we have - if (start_streaming) - query_before = last_entry_local; +void replication_response_cancel_and_finalize(struct replication_query *q) { + replication_query_finalize(NULL, q, false); +} - if (query_after > query_before) { - time_t tmp = query_before; - query_before = query_after; - query_after = tmp; - } +static bool sender_is_still_connected_for_this_request(struct replication_request *rq); - bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false; +bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) { + struct replication_request *rq = q->rq; + RRDSET *st = q->st; + RRDHOST *host = st->rrdhost; // we might want to optimize this by filling a temporary buffer // and copying the result to the host's buffer in order to avoid @@ -314,25 +564,24 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st)); - if(after != 0 && before != 0) - before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now); - else { - after = 0; - before = 0; - enable_streaming = true; - } + bool locked_data_collection = q->query.locked_data_collection; + q->query.locked_data_collection = false; - // get again the world clock time - time_t world_clock_time = now_realtime_sec(); - if(enable_streaming) { - if(now < world_clock_time) { - // we needed time to execute this request - // so, the parent will need to replicate more data - enable_streaming = false; - } - else - replicate_chart_collection_state(wb, st); - } + bool finished_with_gap = false; + if(q->query.execute) + finished_with_gap = replication_query_execute(wb, q, max_msg_size); + + time_t after = q->request.after; + time_t before = q->query.before; + bool enable_streaming = q->query.enable_streaming; + + replication_query_finalize(wb, q, q->query.execute); + q = NULL; // IMPORTANT: q is invalid now + + // get a fresh retention to send to the parent + time_t wall_clock_time = now_realtime_sec(); + time_t db_first_entry, db_last_entry; + rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0); // end with first/last entries we have, and the first start time and // last end time of the data we sent @@ -342,7 +591,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t (int)st->update_every // child first db time, child end db time - , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local + , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry // start streaming boolean , enable_streaming ? "true" : "false" @@ -351,13 +600,40 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t , (unsigned long long)after, (unsigned long long)before // child world clock time - , (unsigned long long)world_clock_time + , (unsigned long long)wall_clock_time ); worker_is_busy(WORKER_JOB_BUFFER_COMMIT); sender_commit(host->sender, wb); worker_is_busy(WORKER_JOB_CLEANUP); + if(enable_streaming) { + if(sender_is_still_connected_for_this_request(rq)) { + // enable normal streaming if we have to + // but only if the sender buffer has not been flushed since we started + + if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + + if(!finished_with_gap) + st->upstream_resync_time_s = 0; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif + } + else + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); + } + } + + if(locked_data_collection) + netdata_spinlock_unlock(&st->data_collection_lock); + return enable_streaming; } @@ -376,14 +652,14 @@ struct replication_request_details { struct { time_t first_entry_t; // the first entry time the child has time_t last_entry_t; // the last entry time the child has - time_t world_time_t; // the current time of the child + time_t wall_clock_time; // the current time of the child + bool fixed_last_entry; // when set we set the last entry to wall clock time } child_db; struct { time_t first_entry_t; // the first entry time we have time_t last_entry_t; // the last entry time we have - bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed - time_t now; // the current local world clock time + time_t wall_clock_time; // the current local world clock time } local_db; struct { @@ -403,9 +679,36 @@ struct replication_request_details { } wanted; }; -static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) { +static void replicate_log_request(struct replication_request_details *r, const char *msg) { +#ifdef NETDATA_INTERNAL_CHECKS + internal_error(true, +#else + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, +#endif + "REPLAY ERROR: 'host:%s/chart:%s' child sent: " + "db from %ld to %ld%s, wall clock time %ld, " + "last request from %ld to %ld, " + "issue: %s - " + "sending replication request from %ld to %ld, start streaming %s", + rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st), + r->child_db.first_entry_t, + r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "", + r->child_db.wall_clock_time, + r->last_request.after, + r->last_request.before, + msg, + r->wanted.after, + r->wanted.before, + r->wanted.start_streaming ? "true" : "false"); +} + +static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) { RRDSET *st = r->st; + if(log) + replicate_log_request(r, msg); + if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t)) st->rrdhost->receiver->replication_first_time_t = r->wanted.after; @@ -422,7 +725,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c internal_error(true, "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: " - "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s" + "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s" , rrdhost_hostname(r->host), rrdset_id(r->st) , r->wanted.after, wanted_after_buf , r->wanted.before, wanted_before_buf @@ -432,7 +735,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c , r->child_db.first_entry_t, r->child_db.last_entry_t , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD" , r->local_db.first_entry_t, r->local_db.last_entry_t - , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now + , r->local_db.now , r->gap.from, r->gap.to , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL" , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : "" @@ -459,7 +762,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c } bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st, - time_t first_entry_child, time_t last_entry_child, time_t child_world_time, + time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time, time_t prev_first_entry_wanted, time_t prev_last_entry_wanted) { struct replication_request_details r = { @@ -472,16 +775,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST .st = st, .child_db = { - .first_entry_t = first_entry_child, - .last_entry_t = last_entry_child, - .world_time_t = child_world_time, + .first_entry_t = child_first_entry, + .last_entry_t = child_last_entry, + .wall_clock_time = child_wall_clock_time, + .fixed_last_entry = false, }, .local_db = { - .first_entry_t = rrdset_first_entry_t(st), - .last_entry_t = rrdset_last_entry_t(st), - .last_entry_t_adjusted_to_now = false, - .now = now_realtime_sec(), + .first_entry_t = 0, + .last_entry_t = 0, + .wall_clock_time = now_realtime_sec(), }, .last_request = { @@ -496,12 +799,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST }, }; - // check our local database retention - if(r.local_db.last_entry_t > r.local_db.now) { - r.local_db.last_entry_t = r.local_db.now; - r.local_db.last_entry_t_adjusted_to_now = true; + if(r.child_db.last_entry_t > r.child_db.wall_clock_time) { + replicate_log_request(&r, "child's db last entry > child's wall clock time"); + r.child_db.last_entry_t = r.child_db.wall_clock_time; + r.child_db.fixed_last_entry = true; } + rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0); + // let's find the GAP we have if(!r.last_request.after || !r.last_request.before) { // there is no previous request @@ -511,7 +816,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST r.gap.from = r.local_db.last_entry_t; else // we don't have any data, the gap is the max timeframe we are allowed to replicate - r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate; + r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate; } else { @@ -522,27 +827,30 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST } // we want all the data up to now - r.gap.to = r.local_db.now; + r.gap.to = r.local_db.wall_clock_time; // The gap is now r.gap.from -> r.gap.to if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) - return send_replay_chart_cmd(&r, "empty replication request, replication is disabled"); - - if (unlikely(!r.child_db.last_entry_t)) - return send_replay_chart_cmd(&r, "empty replication request, child has no stored data"); + return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false); if (unlikely(!rrdset_number_of_dimensions(st))) - return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions"); + return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false); + + if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false); - if (r.child_db.first_entry_t <= 0) - return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid"); + if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0)) + return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true); - if (r.child_db.first_entry_t > r.child_db.last_entry_t) - return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)"); + if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time)) + return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true); - if (r.local_db.last_entry_t > r.child_db.last_entry_t) - return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one"); + if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true); + + if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t)) + return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false); // let's find what the child can provide to fill that gap @@ -564,15 +872,22 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST if(r.wanted.before > r.child_db.last_entry_t) r.wanted.before = r.child_db.last_entry_t; - if(r.wanted.after > r.wanted.before) - r.wanted.after = r.wanted.before; + if(r.wanted.after > r.wanted.before) { + r.wanted.after = 0; + r.wanted.before = 0; + r.wanted.start_streaming = true; + return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true); + } // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child - r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t); + r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step || + r.wanted.before >= r.child_db.last_entry_t || + r.wanted.before >= r.child_db.wall_clock_time || + r.wanted.before >= r.local_db.wall_clock_time); // the wanted timeframe is now r.wanted.after -> r.wanted.before // send it - return send_replay_chart_cmd(&r, "OK"); + return send_replay_chart_cmd(&r, "OK", false); } // ---------------------------------------------------------------------------- @@ -585,13 +900,20 @@ struct replication_request { STRING *chart_id; // the chart of the request time_t after; // the start time of the query (maybe zero) key for sorting (JudyL) time_t before; // the end time of the query (maybe zero) - bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request Word_t unique_id; // auto-increment, later requests have bigger - bool found; // used as a result boolean for the find call + + bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming bool indexed_in_judy; // true when the request is indexed in judy bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full + bool not_indexed_preprocessing; // true when the request is not indexed, but it is pending in preprocessing + + // prepare ahead members - preprocessing + bool found; // used as a result boolean for the find call + bool executed; // used to detect if we have skipped requests while preprocessing + RRDSET *st; // caching of the chart during preprocessing + struct replication_query *q; // the preprocessing query initialization }; // replication sort entry in JudyL array @@ -631,6 +953,7 @@ static struct replication_thread { struct { size_t executed; // the number of replication requests executed size_t latest_first_time; // the 'after' timestamp of the last request we executed + size_t memory; // the total memory allocated by replication } atomic; // access should be with atomic operations struct { @@ -663,6 +986,7 @@ static struct replication_thread { .atomic = { .executed = 0, .latest_first_time = 0, + .memory = 0, }, .main_thread = { .last_executed = 0, @@ -671,6 +995,10 @@ static struct replication_thread { }, }; +size_t replication_allocated_memory(void) { + return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED); +} + #define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED) #define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED) @@ -723,6 +1051,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc fatal_when_replication_is_not_locked_for_me(); struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry)); + __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); rrdpush_sender_pending_replication_requests_plus_one(rq->sender); @@ -734,11 +1063,13 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc rq->unique_id = rse->unique_id; rq->indexed_in_judy = false; rq->not_indexed_buffer_full = false; + rq->not_indexed_preprocessing = false; return rse; } static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { freez(rse); + __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); } static void replication_sort_entry_add(struct replication_request *rq) { @@ -747,6 +1078,7 @@ static void replication_sort_entry_add(struct replication_request *rq) { if(rrdpush_sender_replication_buffer_full_get(rq->sender)) { rq->indexed_in_judy = false; rq->not_indexed_buffer_full = true; + rq->not_indexed_preprocessing = false; replication_globals.unsafe.pending_no_room++; replication_recursive_unlock(); return; @@ -771,23 +1103,33 @@ static void replication_sort_entry_add(struct replication_request *rq) { Pvoid_t *inner_judy_ptr; // find the outer judy entry, using after as key - inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); - if(!inner_judy_ptr) - inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); + size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); + size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR)) + fatal("REPLICATION: corrupted outer judyL"); // add it to the inner judy, using unique_id as key + size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr); Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); + size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr); + if(unlikely(!item || item == PJERR)) + fatal("REPLICATION: corrupted inner judyL"); + *item = rse; rq->indexed_in_judy = true; rq->not_indexed_buffer_full = false; + rq->not_indexed_preprocessing = false; if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t) replication_globals.unsafe.first_time_t = rq->after; replication_recursive_unlock(); + + __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED); } -static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) { +static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) { fatal_when_replication_is_not_locked_for_me(); bool inner_judy_deleted = false; @@ -798,19 +1140,30 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); rse->rq->indexed_in_judy = false; + rse->rq->not_indexed_preprocessing = preprocessing; + + size_t memory_saved = 0; // delete it from the inner judy + size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr); JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0); + size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr); + memory_saved = mem_before_inner_judyl - mem_after_inner_judyl; // if no items left, delete it from the outer judy if(**inner_judy_ppptr == NULL) { + size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0); + size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array); + memory_saved += mem_before_outer_judyl - mem_after_outer_judyl; inner_judy_deleted = true; } // free memory replication_sort_entry_destroy(rse); + __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED); + return inner_judy_deleted; } @@ -826,7 +1179,7 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0); if (our_item_pptr) { rse_to_delete = *our_item_pptr; - replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr); + replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false); if(buffer_full) { replication_globals.unsafe.pending_no_room++; @@ -844,13 +1197,6 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff replication_recursive_unlock(); } -static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) { - if(unlikely(first)) - return JudyLFirst(PArray, PIndex, PJE0); - - return JudyLNext(PArray, PIndex, PJE0); -} - static struct replication_request replication_request_get_first_available() { Pvoid_t *inner_judy_pptr; @@ -881,7 +1227,7 @@ static struct replication_request replication_request_get_first_available() { } bool find_same_after = true; - while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, find_same_after))) { + while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) { Pvoid_t *our_item_pptr; if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after)) @@ -898,14 +1244,11 @@ static struct replication_request replication_request_get_first_available() { // set the return result to found rq_to_return.found = true; - if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true)) // we removed the item from the outer JudyL break; } - // call JudyLNext from now on - find_same_after = false; - // prepare for the next iteration on the outer loop replication_globals.unsafe.queue.unique_id = 0; } @@ -945,7 +1288,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ replication_recursive_lock(); - if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) { + if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) { // we can replace this command internal_error( true, @@ -958,7 +1301,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ rq->before = rq_new->before; rq->start_streaming = rq_new->start_streaming; } - else if(!rq->indexed_in_judy) { + else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) { replication_sort_entry_add(rq); internal_error( true, @@ -1001,55 +1344,57 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma string_freez(rq->chart_id); } +static bool sender_is_still_connected_for_this_request(struct replication_request *rq) { + return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender); +}; + static bool replication_execute_request(struct replication_request *rq, bool workers) { bool ret = false; - if(likely(workers)) - worker_is_busy(WORKER_JOB_FIND_CHART); + if(!rq->st) { + if(likely(workers)) + worker_is_busy(WORKER_JOB_FIND_CHART); + + rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + } - RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); - if(!st) { + if(!rq->st) { internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found", rrdhost_hostname(rq->sender->host), string2str(rq->chart_id)); goto cleanup; } - if(likely(workers)) - worker_is_busy(WORKER_JOB_QUERYING); - netdata_thread_disable_cancelability(); - // send the replication data - bool start_streaming = replicate_chart_response( - st->rrdhost, st, rq->start_streaming, rq->after, rq->before); + if(!rq->q) { + if(likely(workers)) + worker_is_busy(WORKER_JOB_PREPARE_QUERY); - netdata_thread_enable_cancelability(); + rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before); + } - if(start_streaming && rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender)) { - // enable normal streaming if we have to - // but only if the sender buffer has not been flushed since we started + if(likely(workers)) + worker_is_busy(WORKER_JOB_QUERYING); - if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { - rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); - rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); - rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + // send the replication data + rq->q->rq = rq; + replication_response_execute_and_finalize( + rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL)); -#ifdef NETDATA_LOG_REPLICATION_REQUESTS - internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", - rrdhost_hostname(st->rrdhost), rrdset_id(st)); -#endif - } - else - internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating", - rrdhost_hostname(st->rrdhost), string2str(rq->chart_id)); - } + rq->q = NULL; + netdata_thread_enable_cancelability(); __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED); ret = true; cleanup: + if(rq->q) { + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + } + string_freez(rq->chart_id); worker_is_idle(); return ret; @@ -1068,6 +1413,7 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), .indexed_in_judy = false, .not_indexed_buffer_full = false, + .not_indexed_preprocessing = false, }; if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) @@ -1079,13 +1425,13 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, void replication_sender_delete_pending_requests(struct sender_state *sender) { // allow the dictionary destructor to go faster on locks - replication_recursive_lock(); dictionary_flush(sender->replication.requests); - replication_recursive_unlock(); } void replication_init_sender(struct sender_state *sender) { - sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct replication_request)); + dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender); dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender); dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender); @@ -1107,9 +1453,8 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { struct replication_request *rq; dfe_start_read(s->replication.requests, rq) { - if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) { + if(rq->indexed_in_judy) replication_sort_entry_del(rq, true); - } } dfe_done(rq); @@ -1122,9 +1467,8 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { struct replication_request *rq; dfe_start_read(s->replication.requests, rq) { - if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) { + if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing)) replication_sort_entry_add(rq); - } } dfe_done(rq); @@ -1214,6 +1558,7 @@ static void replication_initialize_workers(bool master) { worker_register_job_name(WORKER_JOB_QUERYING, "querying"); worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete"); worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart"); + worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query"); worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit"); worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup"); @@ -1235,24 +1580,137 @@ static void replication_initialize_workers(bool master) { #define REQUEST_QUEUE_EMPTY (-1) #define REQUEST_CHART_NOT_FOUND (-2) -static int replication_execute_next_pending_request(void) { - worker_is_busy(WORKER_JOB_FIND_NEXT); - struct replication_request rq = replication_request_get_first_available(); +static int replication_execute_next_pending_request(bool cancel) { + static __thread int max_requests_ahead = 0; + static __thread struct replication_request *rqs = NULL; + static __thread int rqs_last_executed = 0, rqs_last_prepared = 0; + static __thread size_t queue_rounds = 0; (void)queue_rounds; + struct replication_request *rq; + + if(unlikely(cancel)) { + if(rqs) { + size_t cancelled = 0; + do { + if (++rqs_last_executed >= max_requests_ahead) + rqs_last_executed = 0; + + rq = &rqs[rqs_last_executed]; + + if (rq->q) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq"); + + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + cancelled++; + } + + rq->executed = true; + rq->found = false; + + } while (rqs_last_executed != rqs_last_prepared); + + internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled); + } + return REQUEST_QUEUE_EMPTY; + } + + if(unlikely(!rqs)) { + max_requests_ahead = get_netdata_cpus() / 2; + + if(max_requests_ahead > libuv_worker_threads * 2) + max_requests_ahead = libuv_worker_threads * 2; + + if(max_requests_ahead < 2) + max_requests_ahead = 2; + + rqs = callocz(max_requests_ahead, sizeof(struct replication_request)); + __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED); + } + + // fill the queue + do { + if(++rqs_last_prepared >= max_requests_ahead) { + rqs_last_prepared = 0; + queue_rounds++; + } + + internal_fatal(rqs[rqs_last_prepared].q, + "REPLAY FATAL: slot is used by query that has not been executed!"); + + worker_is_busy(WORKER_JOB_FIND_NEXT); + rqs[rqs_last_prepared] = replication_request_get_first_available(); + rq = &rqs[rqs_last_prepared]; + + if(rq->found) { + if (!rq->st) { + worker_is_busy(WORKER_JOB_FIND_CHART); + rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + } + + if (rq->st && !rq->q) { + worker_is_busy(WORKER_JOB_PREPARE_QUERY); + rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before); + } - if(unlikely(!rq.found)) { + rq->executed = false; + } + + } while(rq->found && rqs_last_prepared != rqs_last_executed); + + // pick the first usable + do { + if (++rqs_last_executed >= max_requests_ahead) + rqs_last_executed = 0; + + rq = &rqs[rqs_last_executed]; + + if(rq->found) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + + if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) { + // the sender has reconnected since this request was queued, + // we can safely throw it away, since the parent will resend it + replication_response_cancel_and_finalize(rq->q); + rq->executed = true; + rq->found = false; + rq->q = NULL; + } + else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) { + // the sender buffer is full, so we can ignore this request, + // it has already been marked as 'preprocessed' in the dictionary, + // and the sender will put it back in when there is + // enough room in the buffer for processing replication requests + replication_response_cancel_and_finalize(rq->q); + rq->executed = true; + rq->found = false; + rq->q = NULL; + } + else { + // we can execute this, + // delete it from the dictionary + worker_is_busy(WORKER_JOB_DELETE_ENTRY); + dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id)); + } + } + else + internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!"); + + } while(!rq->found && rqs_last_executed != rqs_last_prepared); + + if(unlikely(!rq->found)) { worker_is_idle(); return REQUEST_QUEUE_EMPTY; } - // delete the request from the dictionary - worker_is_busy(WORKER_JOB_DELETE_ENTRY); - if(!dictionary_del(rq.sender->replication.requests, string2str(rq.chart_id))) - error("REPLAY ERROR: 'host:%s/chart:%s' failed to be deleted from sender pending charts index", - rrdhost_hostname(rq.sender->host), string2str(rq.chart_id)); + replication_set_latest_first_time(rq->after); - replication_set_latest_first_time(rq.after); + bool chart_found = replication_execute_request(rq, true); + rq->executed = true; + rq->found = false; + rq->q = NULL; - if(unlikely(!replication_execute_request(&rq, true))) { + if(unlikely(!chart_found)) { worker_is_idle(); return REQUEST_CHART_NOT_FOUND; } @@ -1262,6 +1720,7 @@ static int replication_execute_next_pending_request(void) { } static void replication_worker_cleanup(void *ptr __maybe_unused) { + replication_execute_next_pending_request(true); worker_unregister(); } @@ -1270,8 +1729,9 @@ static void *replication_worker_thread(void *ptr) { netdata_thread_cleanup_push(replication_worker_cleanup, ptr); - while(!netdata_exit) { - if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + while(service_running(SERVICE_REPLICATION)) { + if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) { + sender_thread_buffer_free(); worker_is_busy(WORKER_JOB_WAIT); worker_is_idle(); sleep_usec(1 * USEC_PER_SEC); @@ -1286,13 +1746,17 @@ static void replication_main_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + replication_execute_next_pending_request(true); + int threads = (int)replication_globals.main_thread.threads; for(int i = 0; i < threads ;i++) { netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL); freez(replication_globals.main_thread.threads_ptrs[i]); + __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED); } freez(replication_globals.main_thread.threads_ptrs); replication_globals.main_thread.threads_ptrs = NULL; + __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED); // custom code worker_unregister(); @@ -1312,10 +1776,14 @@ void *replication_thread_main(void *ptr __maybe_unused) { if(--threads) { replication_globals.main_thread.threads = threads; replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *)); + __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED); for(int i = 0; i < threads ;i++) { + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2); replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t)); - netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], "REPLICATION", + __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED); + netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag, NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL); } } @@ -1333,7 +1801,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { size_t last_executed = 0; size_t last_sender_resets = 0; - while(!netdata_exit) { + while(service_running(SERVICE_REPLICATION)) { // statistics usec_t now_mono_ut = now_monotonic_usec(); @@ -1395,7 +1863,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_is_idle(); } - if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) { worker_is_busy(WORKER_JOB_WAIT); replication_recursive_lock(); @@ -1403,14 +1871,16 @@ void *replication_thread_main(void *ptr __maybe_unused) { // the timeout also defines now frequently we will traverse all the pending requests // when the outbound buffers of all senders is full usec_t timeout; - if(slow) + if(slow) { // no work to be done, wait for a request to come in timeout = 1000 * USEC_PER_MS; + sender_thread_buffer_free(); + } else if(replication_globals.unsafe.pending > 0) { - if(replication_globals.unsafe.sender_resets == last_sender_resets) { + if(replication_globals.unsafe.sender_resets == last_sender_resets) timeout = 1000 * USEC_PER_MS; - } + else { // there are pending requests waiting to be executed, // but none could be executed at this time. |