summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/replication.c1104
1 files changed, 787 insertions, 317 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index d659d701..7c1f16b4 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -3,28 +3,30 @@
#include "replication.h"
#include "Judy.h"
-#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
-#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
-#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
+#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
+#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
+#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL
#define WORKER_JOB_FIND_NEXT 1
#define WORKER_JOB_QUERYING 2
#define WORKER_JOB_DELETE_ENTRY 3
#define WORKER_JOB_FIND_CHART 4
-#define WORKER_JOB_CHECK_CONSISTENCY 5
-#define WORKER_JOB_BUFFER_COMMIT 6
-#define WORKER_JOB_CLEANUP 7
-#define WORKER_JOB_WAIT 8
+#define WORKER_JOB_PREPARE_QUERY 5
+#define WORKER_JOB_CHECK_CONSISTENCY 6
+#define WORKER_JOB_BUFFER_COMMIT 7
+#define WORKER_JOB_CLEANUP 8
+#define WORKER_JOB_WAIT 9
// master thread worker jobs
-#define WORKER_JOB_STATISTICS 9
-#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11
-#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12
-#define WORKER_JOB_CUSTOM_METRIC_ADDED 13
-#define WORKER_JOB_CUSTOM_METRIC_DONE 14
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16
+#define WORKER_JOB_STATISTICS 10
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 14
+#define WORKER_JOB_CUSTOM_METRIC_DONE 15
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17
#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10
@@ -44,6 +46,12 @@ struct replication_query_statistics replication_get_query_statistics(void) {
return ret;
}
+size_t replication_buffers_allocated = 0;
+
+size_t replication_allocated_buffers(void) {
+ return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
+}
+
// ----------------------------------------------------------------------------
// sending replication replies
@@ -51,137 +59,400 @@ struct replication_dimension {
STORAGE_POINT sp;
struct storage_engine_query_handle handle;
bool enabled;
+ bool skip;
DICTIONARY *dict;
const DICTIONARY_ITEM *rda;
RRDDIM *rd;
};
-static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
+struct replication_query {
+ RRDSET *st;
+
+ struct {
+ time_t first_entry_t;
+ time_t last_entry_t;
+ } db;
+
+ struct { // what the parent requested
+ time_t after;
+ time_t before;
+ bool enable_streaming;
+ } request;
+
+ struct { // what the child will do
+ time_t after;
+ time_t before;
+ bool enable_streaming;
+
+ bool locked_data_collection;
+ bool execute;
+ bool interrupted;
+ } query;
+
+ time_t wall_clock_time;
+
+ size_t points_read;
+ size_t points_generated;
+
+ struct storage_engine_query_ops *ops;
+ struct replication_request *rq;
+
+ size_t dimensions;
+ struct replication_dimension data[];
+};
+
+static struct replication_query *replication_query_prepare(
+ RRDSET *st,
+ time_t db_first_entry,
+ time_t db_last_entry,
+ time_t requested_after,
+ time_t requested_before,
+ bool requested_enable_streaming,
+ time_t query_after,
+ time_t query_before,
+ bool query_enable_streaming,
+ time_t wall_clock_time
+) {
size_t dimensions = rrdset_number_of_dimensions(st);
- size_t points_read = 0, points_generated = 0;
+ struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
+ __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
- struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
- struct replication_dimension data[dimensions];
- memset(data, 0, sizeof(data));
+ q->dimensions = dimensions;
+ q->st = st;
- if(enable_streaming && st->last_updated.tv_sec > before) {
- internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)before,
- (unsigned long long)st->last_updated.tv_sec
- );
- before = st->last_updated.tv_sec;
+ q->db.first_entry_t = db_first_entry;
+ q->db.last_entry_t = db_last_entry;
+
+ q->request.after = requested_after,
+ q->request.before = requested_before,
+ q->request.enable_streaming = requested_enable_streaming,
+
+ q->query.after = query_after;
+ q->query.before = query_before;
+ q->query.enable_streaming = query_enable_streaming;
+
+ q->wall_clock_time = wall_clock_time;
+
+ if (!q->dimensions || !q->query.after || !q->query.before) {
+ q->query.execute = false;
+ q->dimensions = 0;
+ return q;
+ }
+
+ if(q->query.enable_streaming) {
+ netdata_spinlock_lock(&st->data_collection_lock);
+ q->query.locked_data_collection = true;
+
+ if (st->last_updated.tv_sec > q->query.before) {
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true,
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
+ "has start_streaming = true, "
+ "adjusting replication before timestamp from %llu to %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long) q->query.before,
+ (unsigned long long) st->last_updated.tv_sec
+ );
+#endif
+ q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time);
+ }
}
+ q->ops = &st->rrdhost->db[0].eng->api.query_ops;
+
// prepare our array of dimensions
- {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(unlikely(!rd || !rd_dfe.item || !rd->exposed))
- continue;
+ size_t count = 0;
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
+ continue;
- if (unlikely(rd_dfe.counter >= dimensions)) {
- internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- break;
- }
+ if (unlikely(rd_dfe.counter >= q->dimensions)) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ break;
+ }
+
+ struct replication_dimension *d = &q->data[rd_dfe.counter];
+
+ d->dict = rd_dfe.dict;
+ d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+ d->rd = rd;
- struct replication_dimension *d = &data[rd_dfe.counter];
+ q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
+ q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
+ d->enabled = true;
+ d->skip = false;
+ count++;
+ }
+ rrddim_foreach_done(rd);
+
+ if(!count) {
+ // no data for this chart
- d->dict = rd_dfe.dict;
- d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
- d->rd = rd;
+ q->query.execute = false;
- ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before);
- d->enabled = true;
+ if(q->query.locked_data_collection) {
+ netdata_spinlock_unlock(&st->data_collection_lock);
+ q->query.locked_data_collection = false;
}
- rrddim_foreach_done(rd);
+
+ }
+ else {
+ // we have data for this chart
+
+ q->query.execute = true;
+ }
+
+ return q;
+}
+
+static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(!rd->exposed) continue;
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
+ rrddim_id(rd),
+ (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
+ rd->last_collected_value,
+ rd->last_calculated_value,
+ rd->last_stored_value
+ );
+ }
+ rrddim_foreach_done(rd);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
+ (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
+ (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
+ );
+}
+
+static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
+ size_t dimensions = q->dimensions;
+
+ if(wb && q->query.enable_streaming)
+ replication_send_chart_collection_state(wb, q->st);
+
+ if(q->query.locked_data_collection) {
+ netdata_spinlock_unlock(&q->st->data_collection_lock);
+ q->query.locked_data_collection = false;
+ }
+
+ // release all the dictionary items acquired
+ // finalize the queries
+ size_t queries = 0;
+
+ for (size_t i = 0; i < dimensions; i++) {
+ struct replication_dimension *d = &q->data[i];
+ if (unlikely(!d->enabled)) continue;
+
+ q->ops->finalize(&d->handle);
+
+ dictionary_acquired_item_release(d->dict, d->rda);
+
+ // update global statistics
+ queries++;
+ }
+
+ if(executed) {
+ netdata_spinlock_lock(&replication_queries.spinlock);
+ replication_queries.queries_started += queries;
+ replication_queries.queries_finished += queries;
+ replication_queries.points_read += q->points_read;
+ replication_queries.points_generated += q->points_generated;
+ netdata_spinlock_unlock(&replication_queries.spinlock);
}
- time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
+ __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
+ freez(q);
+}
+
+static void replication_query_align_to_optimal_before(struct replication_query *q) {
+ if(!q->query.execute || q->query.enable_streaming)
+ return;
+
+ size_t dimensions = q->dimensions;
+ time_t expanded_before = 0;
+
+ for (size_t i = 0; i < dimensions; i++) {
+ struct replication_dimension *d = &q->data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ time_t new_before = q->ops->align_to_optimal_before(&d->handle);
+ if (!expanded_before || new_before < expanded_before)
+ expanded_before = new_before;
+ }
+
+ if(expanded_before > q->query.before && // it is later than the original
+ (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page)
+ expanded_before < q->st->last_updated.tv_sec && // it is not the chart's last updated time
+ expanded_before < q->wall_clock_time) // it is not later than the wall clock time
+ q->query.before = expanded_before;
+}
+
+static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
+ replication_query_align_to_optimal_before(q);
+
+ time_t after = q->query.after;
+ time_t before = q->query.before;
+ size_t dimensions = q->dimensions;
+ struct storage_engine_query_ops *ops = q->ops;
+ time_t wall_clock_time = q->wall_clock_time;
+
+ size_t points_read = q->points_read, points_generated = q->points_generated;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ time_t actual_after = 0, actual_before = 0;
+#endif
+
+ time_t now = after + 1;
+ time_t last_end_time_in_buffer = 0;
while(now <= before) {
- time_t min_start_time = 0, min_end_time = 0;
+ time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0;
for (size_t i = 0; i < dimensions ;i++) {
- struct replication_dimension *d = &data[i];
- if(unlikely(!d->enabled)) continue;
+ struct replication_dimension *d = &q->data[i];
+ if(unlikely(!d->enabled || d->skip)) continue;
// fetch the first valid point for the dimension
- int max_skip = 100;
- while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) {
+ int max_skip = 1000;
+ while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
d->sp = ops->next_metric(&d->handle);
points_read++;
}
- internal_error(max_skip <= 0,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now);
+ if(max_skip <= 0) {
+ d->skip = true;
- if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp)))
- continue;
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
+ (unsigned long long) now);
- if(unlikely(!min_start_time)) {
- min_start_time = d->sp.start_time;
- min_end_time = d->sp.end_time;
- }
- else {
- min_start_time = MIN(min_start_time, d->sp.start_time);
- min_end_time = MIN(min_end_time, d->sp.end_time);
+ continue;
}
- }
- if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)min_start_time,
- (unsigned long long)min_end_time,
- (unsigned long long)wall_clock_time);
- break;
- }
+ if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s))
+ // this dimension does not provide any data
+ continue;
- if(unlikely(min_end_time < now)) {
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- internal_error(true,
- "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
-#endif // NETDATA_LOG_REPLICATION_REQUESTS
- break;
+ time_t update_every = d->sp.end_time_s - d->sp.start_time_s;
+ if(unlikely(!update_every))
+ update_every = q->st->update_every;
+
+ if(unlikely(!min_update_every))
+ min_update_every = update_every;
+
+ if(unlikely(!min_start_time))
+ min_start_time = d->sp.start_time_s;
+
+ if(unlikely(!min_end_time))
+ min_end_time = d->sp.end_time_s;
+
+ min_update_every = MIN(min_update_every, update_every);
+ max_update_every = MAX(max_update_every, update_every);
+
+ min_start_time = MIN(min_start_time, d->sp.start_time_s);
+ max_start_time = MAX(max_start_time, d->sp.start_time_s);
+
+ min_end_time = MIN(min_end_time, d->sp.end_time_s);
+ max_end_time = MAX(max_end_time, d->sp.end_time_s);
}
- if(unlikely(min_end_time <= min_start_time))
- min_start_time = min_end_time - st->update_every;
+ if (unlikely(min_update_every != max_update_every ||
+ min_start_time != max_start_time)) {
- if(unlikely(!actual_after)) {
- actual_after = min_end_time;
- actual_before = min_end_time;
+ time_t fix_min_start_time;
+ if(last_end_time_in_buffer &&
+ last_end_time_in_buffer >= min_start_time &&
+ last_end_time_in_buffer <= max_start_time) {
+ fix_min_start_time = last_end_time_in_buffer;
+ }
+ else
+ fix_min_start_time = min_end_time - min_update_every;
+
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
+ "misaligned dimensions "
+ "update every (min: %ld, max: %ld), "
+ "start time (min: %ld, max: %ld), "
+ "end time (min %ld, max %ld), "
+ "now %ld, last end time sent %ld, "
+ "min start time is fixed to %ld",
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
+ min_update_every, max_update_every,
+ min_start_time, max_start_time,
+ min_end_time, max_end_time,
+ now, last_end_time_in_buffer,
+ fix_min_start_time
+ );
+
+ min_start_time = fix_min_start_time;
}
- else
+
+ if(likely(min_start_time <= now && min_end_time >= now)) {
+ // we have a valid point
+
+ if (unlikely(min_end_time == min_start_time))
+ min_start_time = min_end_time - q->st->update_every;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ if (unlikely(!actual_after))
+ actual_after = min_end_time;
+
actual_before = min_end_time;
+#endif
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n"
- , (unsigned long long)min_start_time
- , (unsigned long long)min_end_time
- , (unsigned long long)wall_clock_time
- );
+ if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
+ q->query.before = last_end_time_in_buffer;
+ q->query.enable_streaming = false;
- // output the replay values for this time
- for (size_t i = 0; i < dimensions ;i++) {
- struct replication_dimension *d = &data[i];
- if(unlikely(!d->enabled)) continue;
+ internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. "
+ "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
+ buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
+ q->request.after, q->request.before, q->request.enable_streaming?"true":"false",
+ q->query.after, q->query.before, q->query.enable_streaming?"true":"false");
- if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time))
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
- rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+ q->query.interrupted = true;
- else
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
- rrddim_id(d->rd));
+ break;
+ }
+ last_end_time_in_buffer = min_end_time;
- points_generated++;
- }
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
+ (unsigned long long) min_start_time,
+ (unsigned long long) min_end_time,
+ (unsigned long long) wall_clock_time
+ );
+
+ // output the replay values for this time
+ for (size_t i = 0; i < dimensions; i++) {
+ struct replication_dimension *d = &q->data[i];
+ if (unlikely(!d->enabled)) continue;
+
+ if (likely( d->sp.start_time_s <= min_end_time &&
+ d->sp.end_time_s >= min_end_time &&
+ !storage_point_is_unset(d->sp) &&
+ !storage_point_is_gap(d->sp))) {
- now = min_end_time + 1;
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
+ rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+
+ points_generated++;
+ }
+ }
+
+ now = min_end_time + 1;
+ }
+ else if(unlikely(min_end_time < now))
+ // the query does not progress
+ break;
+ else
+ // we have gap - all points are in the future
+ now = min_start_time;
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -202,110 +473,89 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
(unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
- // release all the dictionary items acquired
- // finalize the queries
- size_t queries = 0;
- for(size_t i = 0; i < dimensions ;i++) {
- struct replication_dimension *d = &data[i];
- if(unlikely(!d->enabled)) continue;
+ q->points_read = points_read;
+ q->points_generated = points_generated;
- ops->finalize(&d->handle);
+ bool finished_with_gap = false;
+ if(last_end_time_in_buffer < before - q->st->update_every)
+ finished_with_gap = true;
- dictionary_acquired_item_release(d->dict, d->rda);
+ return finished_with_gap;
+}
- // update global statistics
- queries++;
- }
+static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
+ time_t wall_clock_time = now_realtime_sec();
- netdata_spinlock_lock(&replication_queries.spinlock);
- replication_queries.queries_started += queries;
- replication_queries.queries_finished += queries;
- replication_queries.points_read += points_read;
- replication_queries.points_generated += points_generated;
- netdata_spinlock_unlock(&replication_queries.spinlock);
+ if(requested_after > requested_before) {
+ // flip them
+ time_t t = requested_before;
+ requested_before = requested_after;
+ requested_after = t;
+ }
- return before;
-}
+ if(requested_after > wall_clock_time) {
+ requested_after = 0;
+ requested_before = 0;
+ requested_enable_streaming = true;
+ }
-static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(!rd->exposed) continue;
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
- rrddim_id(rd),
- (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
- rd->last_collected_value,
- rd->last_calculated_value,
- rd->last_stored_value
- );
+ if(requested_before > wall_clock_time) {
+ requested_before = wall_clock_time;
+ requested_enable_streaming = true;
}
- rrddim_foreach_done(rd);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
- (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
- (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
- );
-}
+ time_t query_after = requested_after;
+ time_t query_before = requested_before;
+ bool query_enable_streaming = requested_enable_streaming;
-bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
- time_t query_after = after;
- time_t query_before = before;
- time_t now = now_realtime_sec();
- time_t tolerance = 2; // sometimes from the time we get this value, to the time we check,
- // a data collection has been made
- // so, we give this tolerance to detect invalid timestamps
-
- // find the first entry we have
- time_t first_entry_local = rrdset_first_entry_t(st);
- if(first_entry_local > now + tolerance) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)first_entry_local, (unsigned long long)now);
- first_entry_local = now;
+ time_t db_first_entry = 0, db_last_entry = 0;
+ rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
+
+ if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
+ // no data requested - just enable streaming
+ ;
}
+ else {
+ if (query_after < db_first_entry)
+ query_after = db_first_entry;
- if (query_after < first_entry_local)
- query_after = first_entry_local;
+ if (query_before > db_last_entry)
+ query_before = db_last_entry;
- // find the latest entry we have
- time_t last_entry_local = st->last_updated.tv_sec;
- if(!last_entry_local) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- last_entry_local = rrdset_last_entry_t(st);
- if(!last_entry_local) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- last_entry_local = now;
+ // if the parent asked us to start streaming, then fill the rest with the data that we have
+ if (requested_enable_streaming)
+ query_before = db_last_entry;
+
+ if (query_after > query_before) {
+ time_t tmp = query_before;
+ query_before = query_after;
+ query_after = tmp;
}
- }
- if(last_entry_local > now + tolerance) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)last_entry_local, (unsigned long long)now);
- last_entry_local = now;
+ query_enable_streaming = (requested_enable_streaming ||
+ query_before == db_last_entry ||
+ !requested_after ||
+ !requested_before) ? true : false;
}
- if (query_before > last_entry_local)
- query_before = last_entry_local;
+ return replication_query_prepare(
+ st,
+ db_first_entry, db_last_entry,
+ requested_after, requested_before, requested_enable_streaming,
+ query_after, query_before, query_enable_streaming,
+ wall_clock_time);
+}
- // if the parent asked us to start streaming, then fill the rest with the data that we have
- if (start_streaming)
- query_before = last_entry_local;
+void replication_response_cancel_and_finalize(struct replication_query *q) {
+ replication_query_finalize(NULL, q, false);
+}
- if (query_after > query_before) {
- time_t tmp = query_before;
- query_before = query_after;
- query_after = tmp;
- }
+static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
- bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false;
+bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
+ struct replication_request *rq = q->rq;
+ RRDSET *st = q->st;
+ RRDHOST *host = st->rrdhost;
// we might want to optimize this by filling a temporary buffer
// and copying the result to the host's buffer in order to avoid
@@ -314,25 +564,24 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
- if(after != 0 && before != 0)
- before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
- else {
- after = 0;
- before = 0;
- enable_streaming = true;
- }
+ bool locked_data_collection = q->query.locked_data_collection;
+ q->query.locked_data_collection = false;
- // get again the world clock time
- time_t world_clock_time = now_realtime_sec();
- if(enable_streaming) {
- if(now < world_clock_time) {
- // we needed time to execute this request
- // so, the parent will need to replicate more data
- enable_streaming = false;
- }
- else
- replicate_chart_collection_state(wb, st);
- }
+ bool finished_with_gap = false;
+ if(q->query.execute)
+ finished_with_gap = replication_query_execute(wb, q, max_msg_size);
+
+ time_t after = q->request.after;
+ time_t before = q->query.before;
+ bool enable_streaming = q->query.enable_streaming;
+
+ replication_query_finalize(wb, q, q->query.execute);
+ q = NULL; // IMPORTANT: q is invalid now
+
+ // get a fresh retention to send to the parent
+ time_t wall_clock_time = now_realtime_sec();
+ time_t db_first_entry, db_last_entry;
+ rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
// end with first/last entries we have, and the first start time and
// last end time of the data we sent
@@ -342,7 +591,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
(int)st->update_every
// child first db time, child end db time
- , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local
+ , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
// start streaming boolean
, enable_streaming ? "true" : "false"
@@ -351,13 +600,40 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
, (unsigned long long)after, (unsigned long long)before
// child world clock time
- , (unsigned long long)world_clock_time
+ , (unsigned long long)wall_clock_time
);
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
sender_commit(host->sender, wb);
worker_is_busy(WORKER_JOB_CLEANUP);
+ if(enable_streaming) {
+ if(sender_is_still_connected_for_this_request(rq)) {
+ // enable normal streaming if we have to
+ // but only if the sender buffer has not been flushed since we started
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+
+ if(!finished_with_gap)
+ st->upstream_resync_time_s = 0;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
+ }
+ else
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ }
+ }
+
+ if(locked_data_collection)
+ netdata_spinlock_unlock(&st->data_collection_lock);
+
return enable_streaming;
}
@@ -376,14 +652,14 @@ struct replication_request_details {
struct {
time_t first_entry_t; // the first entry time the child has
time_t last_entry_t; // the last entry time the child has
- time_t world_time_t; // the current time of the child
+ time_t wall_clock_time; // the current time of the child
+ bool fixed_last_entry; // when set we set the last entry to wall clock time
} child_db;
struct {
time_t first_entry_t; // the first entry time we have
time_t last_entry_t; // the last entry time we have
- bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed
- time_t now; // the current local world clock time
+ time_t wall_clock_time; // the current local world clock time
} local_db;
struct {
@@ -403,9 +679,36 @@ struct replication_request_details {
} wanted;
};
-static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
+static void replicate_log_request(struct replication_request_details *r, const char *msg) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ internal_error(true,
+#else
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl,
+#endif
+ "REPLAY ERROR: 'host:%s/chart:%s' child sent: "
+ "db from %ld to %ld%s, wall clock time %ld, "
+ "last request from %ld to %ld, "
+ "issue: %s - "
+ "sending replication request from %ld to %ld, start streaming %s",
+ rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st),
+ r->child_db.first_entry_t,
+ r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "",
+ r->child_db.wall_clock_time,
+ r->last_request.after,
+ r->last_request.before,
+ msg,
+ r->wanted.after,
+ r->wanted.before,
+ r->wanted.start_streaming ? "true" : "false");
+}
+
+static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) {
RRDSET *st = r->st;
+ if(log)
+ replicate_log_request(r, msg);
+
if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
@@ -422,7 +725,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
internal_error(true,
"REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
- "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s"
+ "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s"
, rrdhost_hostname(r->host), rrdset_id(r->st)
, r->wanted.after, wanted_after_buf
, r->wanted.before, wanted_before_buf
@@ -432,7 +735,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
, r->child_db.first_entry_t, r->child_db.last_entry_t
, r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
, r->local_db.first_entry_t, r->local_db.last_entry_t
- , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now
+ , r->local_db.now
, r->gap.from, r->gap.to
, (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
, (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
@@ -459,7 +762,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
}
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
- time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
+ time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time,
time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
struct replication_request_details r = {
@@ -472,16 +775,16 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
.st = st,
.child_db = {
- .first_entry_t = first_entry_child,
- .last_entry_t = last_entry_child,
- .world_time_t = child_world_time,
+ .first_entry_t = child_first_entry,
+ .last_entry_t = child_last_entry,
+ .wall_clock_time = child_wall_clock_time,
+ .fixed_last_entry = false,
},
.local_db = {
- .first_entry_t = rrdset_first_entry_t(st),
- .last_entry_t = rrdset_last_entry_t(st),
- .last_entry_t_adjusted_to_now = false,
- .now = now_realtime_sec(),
+ .first_entry_t = 0,
+ .last_entry_t = 0,
+ .wall_clock_time = now_realtime_sec(),
},
.last_request = {
@@ -496,12 +799,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
},
};
- // check our local database retention
- if(r.local_db.last_entry_t > r.local_db.now) {
- r.local_db.last_entry_t = r.local_db.now;
- r.local_db.last_entry_t_adjusted_to_now = true;
+ if(r.child_db.last_entry_t > r.child_db.wall_clock_time) {
+ replicate_log_request(&r, "child's db last entry > child's wall clock time");
+ r.child_db.last_entry_t = r.child_db.wall_clock_time;
+ r.child_db.fixed_last_entry = true;
}
+ rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0);
+
// let's find the GAP we have
if(!r.last_request.after || !r.last_request.before) {
// there is no previous request
@@ -511,7 +816,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
r.gap.from = r.local_db.last_entry_t;
else
// we don't have any data, the gap is the max timeframe we are allowed to replicate
- r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate;
+ r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate;
}
else {
@@ -522,27 +827,30 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
}
// we want all the data up to now
- r.gap.to = r.local_db.now;
+ r.gap.to = r.local_db.wall_clock_time;
// The gap is now r.gap.from -> r.gap.to
if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
- return send_replay_chart_cmd(&r, "empty replication request, replication is disabled");
-
- if (unlikely(!r.child_db.last_entry_t))
- return send_replay_chart_cmd(&r, "empty replication request, child has no stored data");
+ return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false);
if (unlikely(!rrdset_number_of_dimensions(st)))
- return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions");
+ return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false);
+
+ if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false);
- if (r.child_db.first_entry_t <= 0)
- return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid");
+ if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0))
+ return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true);
- if (r.child_db.first_entry_t > r.child_db.last_entry_t)
- return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)");
+ if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time))
+ return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true);
- if (r.local_db.last_entry_t > r.child_db.last_entry_t)
- return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one");
+ if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true);
+
+ if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false);
// let's find what the child can provide to fill that gap
@@ -564,15 +872,22 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
if(r.wanted.before > r.child_db.last_entry_t)
r.wanted.before = r.child_db.last_entry_t;
- if(r.wanted.after > r.wanted.before)
- r.wanted.after = r.wanted.before;
+ if(r.wanted.after > r.wanted.before) {
+ r.wanted.after = 0;
+ r.wanted.before = 0;
+ r.wanted.start_streaming = true;
+ return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true);
+ }
// the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
- r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);
+ r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step ||
+ r.wanted.before >= r.child_db.last_entry_t ||
+ r.wanted.before >= r.child_db.wall_clock_time ||
+ r.wanted.before >= r.local_db.wall_clock_time);
// the wanted timeframe is now r.wanted.after -> r.wanted.before
// send it
- return send_replay_chart_cmd(&r, "OK");
+ return send_replay_chart_cmd(&r, "OK", false);
}
// ----------------------------------------------------------------------------
@@ -585,13 +900,20 @@ struct replication_request {
STRING *chart_id; // the chart of the request
time_t after; // the start time of the query (maybe zero) key for sorting (JudyL)
time_t before; // the end time of the query (maybe zero)
- bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request
Word_t unique_id; // auto-increment, later requests have bigger
- bool found; // used as a result boolean for the find call
+
+ bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
bool indexed_in_judy; // true when the request is indexed in judy
bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full
+ bool not_indexed_preprocessing; // true when the request is not indexed, but it is pending in preprocessing
+
+ // prepare ahead members - preprocessing
+ bool found; // used as a result boolean for the find call
+ bool executed; // used to detect if we have skipped requests while preprocessing
+ RRDSET *st; // caching of the chart during preprocessing
+ struct replication_query *q; // the preprocessing query initialization
};
// replication sort entry in JudyL array
@@ -631,6 +953,7 @@ static struct replication_thread {
struct {
size_t executed; // the number of replication requests executed
size_t latest_first_time; // the 'after' timestamp of the last request we executed
+ size_t memory; // the total memory allocated by replication
} atomic; // access should be with atomic operations
struct {
@@ -663,6 +986,7 @@ static struct replication_thread {
.atomic = {
.executed = 0,
.latest_first_time = 0,
+ .memory = 0,
},
.main_thread = {
.last_executed = 0,
@@ -671,6 +995,10 @@ static struct replication_thread {
},
};
+size_t replication_allocated_memory(void) {
+ return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
+}
+
#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
@@ -723,6 +1051,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
fatal_when_replication_is_not_locked_for_me();
struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+ __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
@@ -734,11 +1063,13 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
rq->unique_id = rse->unique_id;
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = false;
+ rq->not_indexed_preprocessing = false;
return rse;
}
static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
freez(rse);
+ __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
static void replication_sort_entry_add(struct replication_request *rq) {
@@ -747,6 +1078,7 @@ static void replication_sort_entry_add(struct replication_request *rq) {
if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = true;
+ rq->not_indexed_preprocessing = false;
replication_globals.unsafe.pending_no_room++;
replication_recursive_unlock();
return;
@@ -771,23 +1103,33 @@ static void replication_sort_entry_add(struct replication_request *rq) {
Pvoid_t *inner_judy_ptr;
// find the outer judy entry, using after as key
- inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
- if(!inner_judy_ptr)
- inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+ size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+ size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
+ fatal("REPLICATION: corrupted outer judyL");
// add it to the inner judy, using unique_id as key
+ size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
+ size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
+ if(unlikely(!item || item == PJERR))
+ fatal("REPLICATION: corrupted inner judyL");
+
*item = rse;
rq->indexed_in_judy = true;
rq->not_indexed_buffer_full = false;
+ rq->not_indexed_preprocessing = false;
if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
replication_globals.unsafe.first_time_t = rq->after;
replication_recursive_unlock();
+
+ __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
}
-static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
+static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
fatal_when_replication_is_not_locked_for_me();
bool inner_judy_deleted = false;
@@ -798,19 +1140,30 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
rse->rq->indexed_in_judy = false;
+ rse->rq->not_indexed_preprocessing = preprocessing;
+
+ size_t memory_saved = 0;
// delete it from the inner judy
+ size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
+ size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
+ memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;
// if no items left, delete it from the outer judy
if(**inner_judy_ppptr == NULL) {
+ size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
+ size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
+ memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
inner_judy_deleted = true;
}
// free memory
replication_sort_entry_destroy(rse);
+ __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);
+
return inner_judy_deleted;
}
@@ -826,7 +1179,7 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff
Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
if (our_item_pptr) {
rse_to_delete = *our_item_pptr;
- replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr);
+ replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false);
if(buffer_full) {
replication_globals.unsafe.pending_no_room++;
@@ -844,13 +1197,6 @@ static void replication_sort_entry_del(struct replication_request *rq, bool buff
replication_recursive_unlock();
}
-static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) {
- if(unlikely(first))
- return JudyLFirst(PArray, PIndex, PJE0);
-
- return JudyLNext(PArray, PIndex, PJE0);
-}
-
static struct replication_request replication_request_get_first_available() {
Pvoid_t *inner_judy_pptr;
@@ -881,7 +1227,7 @@ static struct replication_request replication_request_get_first_available() {
}
bool find_same_after = true;
- while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, find_same_after))) {
+ while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
Pvoid_t *our_item_pptr;
if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
@@ -898,14 +1244,11 @@ static struct replication_request replication_request_get_first_available() {
// set the return result to found
rq_to_return.found = true;
- if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true))
// we removed the item from the outer JudyL
break;
}
- // call JudyLNext from now on
- find_same_after = false;
-
// prepare for the next iteration on the outer loop
replication_globals.unsafe.queue.unique_id = 0;
}
@@ -945,7 +1288,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
replication_recursive_lock();
- if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
+ if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) {
// we can replace this command
internal_error(
true,
@@ -958,7 +1301,7 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
rq->before = rq_new->before;
rq->start_streaming = rq_new->start_streaming;
}
- else if(!rq->indexed_in_judy) {
+ else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) {
replication_sort_entry_add(rq);
internal_error(
true,
@@ -1001,55 +1344,57 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma
string_freez(rq->chart_id);
}
+static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
+ return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
+};
+
static bool replication_execute_request(struct replication_request *rq, bool workers) {
bool ret = false;
- if(likely(workers))
- worker_is_busy(WORKER_JOB_FIND_CHART);
+ if(!rq->st) {
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_FIND_CHART);
+
+ rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
+ }
- RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
- if(!st) {
+ if(!rq->st) {
internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));
goto cleanup;
}
- if(likely(workers))
- worker_is_busy(WORKER_JOB_QUERYING);
-
netdata_thread_disable_cancelability();
- // send the replication data
- bool start_streaming = replicate_chart_response(
- st->rrdhost, st, rq->start_streaming, rq->after, rq->before);
+ if(!rq->q) {
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- netdata_thread_enable_cancelability();
+ rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
+ }
- if(start_streaming && rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender)) {
- // enable normal streaming if we have to
- // but only if the sender buffer has not been flushed since we started
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_QUERYING);
- if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
- rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+ // send the replication data
+ rq->q->rq = rq;
+ replication_response_execute_and_finalize(
+ rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
-#endif
- }
- else
- internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
- rrdhost_hostname(st->rrdhost), string2str(rq->chart_id));
- }
+ rq->q = NULL;
+ netdata_thread_enable_cancelability();
__atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);
ret = true;
cleanup:
+ if(rq->q) {
+ replication_response_cancel_and_finalize(rq->q);
+ rq->q = NULL;
+ }
+
string_freez(rq->chart_id);
worker_is_idle();
return ret;
@@ -1068,6 +1413,7 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
.indexed_in_judy = false,
.not_indexed_buffer_full = false,
+ .not_indexed_preprocessing = false,
};
if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
@@ -1079,13 +1425,13 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
void replication_sender_delete_pending_requests(struct sender_state *sender) {
// allow the dictionary destructor to go faster on locks
- replication_recursive_lock();
dictionary_flush(sender->replication.requests);
- replication_recursive_unlock();
}
void replication_init_sender(struct sender_state *sender) {
- sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(struct replication_request));
+
dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
@@ -1107,9 +1453,8 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
struct replication_request *rq;
dfe_start_read(s->replication.requests, rq) {
- if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) {
+ if(rq->indexed_in_judy)
replication_sort_entry_del(rq, true);
- }
}
dfe_done(rq);
@@ -1122,9 +1467,8 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
struct replication_request *rq;
dfe_start_read(s->replication.requests, rq) {
- if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
+ if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing))
replication_sort_entry_add(rq);
- }
}
dfe_done(rq);
@@ -1214,6 +1558,7 @@ static void replication_initialize_workers(bool master) {
worker_register_job_name(WORKER_JOB_QUERYING, "querying");
worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
+ worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query");
worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
@@ -1235,24 +1580,137 @@ static void replication_initialize_workers(bool master) {
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)
-static int replication_execute_next_pending_request(void) {
- worker_is_busy(WORKER_JOB_FIND_NEXT);
- struct replication_request rq = replication_request_get_first_available();
+static int replication_execute_next_pending_request(bool cancel) {
+ static __thread int max_requests_ahead = 0;
+ static __thread struct replication_request *rqs = NULL;
+ static __thread int rqs_last_executed = 0, rqs_last_prepared = 0;
+ static __thread size_t queue_rounds = 0; (void)queue_rounds;
+ struct replication_request *rq;
+
+ if(unlikely(cancel)) {
+ if(rqs) {
+ size_t cancelled = 0;
+ do {
+ if (++rqs_last_executed >= max_requests_ahead)
+ rqs_last_executed = 0;
+
+ rq = &rqs[rqs_last_executed];
+
+ if (rq->q) {
+ internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
+ internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
+
+ replication_response_cancel_and_finalize(rq->q);
+ rq->q = NULL;
+ cancelled++;
+ }
+
+ rq->executed = true;
+ rq->found = false;
+
+ } while (rqs_last_executed != rqs_last_prepared);
+
+ internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
+ }
+ return REQUEST_QUEUE_EMPTY;
+ }
+
+ if(unlikely(!rqs)) {
+ max_requests_ahead = get_netdata_cpus() / 2;
+
+ if(max_requests_ahead > libuv_worker_threads * 2)
+ max_requests_ahead = libuv_worker_threads * 2;
+
+ if(max_requests_ahead < 2)
+ max_requests_ahead = 2;
+
+ rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
+ __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
+ }
+
+ // fill the queue
+ do {
+ if(++rqs_last_prepared >= max_requests_ahead) {
+ rqs_last_prepared = 0;
+ queue_rounds++;
+ }
+
+ internal_fatal(rqs[rqs_last_prepared].q,
+ "REPLAY FATAL: slot is used by query that has not been executed!");
+
+ worker_is_busy(WORKER_JOB_FIND_NEXT);
+ rqs[rqs_last_prepared] = replication_request_get_first_available();
+ rq = &rqs[rqs_last_prepared];
+
+ if(rq->found) {
+ if (!rq->st) {
+ worker_is_busy(WORKER_JOB_FIND_CHART);
+ rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
+ }
+
+ if (rq->st && !rq->q) {
+ worker_is_busy(WORKER_JOB_PREPARE_QUERY);
+ rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
+ }
- if(unlikely(!rq.found)) {
+ rq->executed = false;
+ }
+
+ } while(rq->found && rqs_last_prepared != rqs_last_executed);
+
+ // pick the first usable
+ do {
+ if (++rqs_last_executed >= max_requests_ahead)
+ rqs_last_executed = 0;
+
+ rq = &rqs[rqs_last_executed];
+
+ if(rq->found) {
+ internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
+
+ if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) {
+ // the sender has reconnected since this request was queued,
+ // we can safely throw it away, since the parent will resend it
+ replication_response_cancel_and_finalize(rq->q);
+ rq->executed = true;
+ rq->found = false;
+ rq->q = NULL;
+ }
+ else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) {
+ // the sender buffer is full, so we can ignore this request,
+ // it has already been marked as 'preprocessed' in the dictionary,
+ // and the sender will put it back in when there is
+ // enough room in the buffer for processing replication requests
+ replication_response_cancel_and_finalize(rq->q);
+ rq->executed = true;
+ rq->found = false;
+ rq->q = NULL;
+ }
+ else {
+ // we can execute this,
+ // delete it from the dictionary
+ worker_is_busy(WORKER_JOB_DELETE_ENTRY);
+ dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id));
+ }
+ }
+ else
+ internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");
+
+ } while(!rq->found && rqs_last_executed != rqs_last_prepared);
+
+ if(unlikely(!rq->found)) {
worker_is_idle();
return REQUEST_QUEUE_EMPTY;
}
- // delete the request from the dictionary
- worker_is_busy(WORKER_JOB_DELETE_ENTRY);
- if(!dictionary_del(rq.sender->replication.requests, string2str(rq.chart_id)))
- error("REPLAY ERROR: 'host:%s/chart:%s' failed to be deleted from sender pending charts index",
- rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
+ replication_set_latest_first_time(rq->after);
- replication_set_latest_first_time(rq.after);
+ bool chart_found = replication_execute_request(rq, true);
+ rq->executed = true;
+ rq->found = false;
+ rq->q = NULL;
- if(unlikely(!replication_execute_request(&rq, true))) {
+ if(unlikely(!chart_found)) {
worker_is_idle();
return REQUEST_CHART_NOT_FOUND;
}
@@ -1262,6 +1720,7 @@ static int replication_execute_next_pending_request(void) {
}
static void replication_worker_cleanup(void *ptr __maybe_unused) {
+ replication_execute_next_pending_request(true);
worker_unregister();
}
@@ -1270,8 +1729,9 @@ static void *replication_worker_thread(void *ptr) {
netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
- while(!netdata_exit) {
- if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ while(service_running(SERVICE_REPLICATION)) {
+ if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+ sender_thread_buffer_free();
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
sleep_usec(1 * USEC_PER_SEC);
@@ -1286,13 +1746,17 @@ static void replication_main_cleanup(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+ replication_execute_next_pending_request(true);
+
int threads = (int)replication_globals.main_thread.threads;
for(int i = 0; i < threads ;i++) {
netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
freez(replication_globals.main_thread.threads_ptrs[i]);
+ __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
}
freez(replication_globals.main_thread.threads_ptrs);
replication_globals.main_thread.threads_ptrs = NULL;
+ __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
// custom code
worker_unregister();
@@ -1312,10 +1776,14 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(--threads) {
replication_globals.main_thread.threads = threads;
replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
+ __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
for(int i = 0; i < threads ;i++) {
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
- netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], "REPLICATION",
+ __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
+ netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
}
}
@@ -1333,7 +1801,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
size_t last_executed = 0;
size_t last_sender_resets = 0;
- while(!netdata_exit) {
+ while(service_running(SERVICE_REPLICATION)) {
// statistics
usec_t now_mono_ut = now_monotonic_usec();
@@ -1395,7 +1863,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_is_idle();
}
- if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
worker_is_busy(WORKER_JOB_WAIT);
replication_recursive_lock();
@@ -1403,14 +1871,16 @@ void *replication_thread_main(void *ptr __maybe_unused) {
// the timeout also defines now frequently we will traverse all the pending requests
// when the outbound buffers of all senders is full
usec_t timeout;
- if(slow)
+ if(slow) {
// no work to be done, wait for a request to come in
timeout = 1000 * USEC_PER_MS;
+ sender_thread_buffer_free();
+ }
else if(replication_globals.unsafe.pending > 0) {
- if(replication_globals.unsafe.sender_resets == last_sender_resets) {
+ if(replication_globals.unsafe.sender_resets == last_sender_resets)
timeout = 1000 * USEC_PER_MS;
- }
+
else {
// there are pending requests waiting to be executed,
// but none could be executed at this time.