summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c340
1 files changed, 211 insertions, 129 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index 7c1f16b4c..a50913a1a 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -88,6 +88,7 @@ struct replication_query {
bool locked_data_collection;
bool execute;
bool interrupted;
+ STREAM_CAPABILITIES capabilities;
} query;
time_t wall_clock_time;
@@ -95,7 +96,7 @@ struct replication_query {
size_t points_read;
size_t points_generated;
- struct storage_engine_query_ops *ops;
+ STORAGE_ENGINE_BACKEND backend;
struct replication_request *rq;
size_t dimensions;
@@ -112,7 +113,8 @@ static struct replication_query *replication_query_prepare(
time_t query_after,
time_t query_before,
bool query_enable_streaming,
- time_t wall_clock_time
+ time_t wall_clock_time,
+ STREAM_CAPABILITIES capabilities
) {
size_t dimensions = rrdset_number_of_dimensions(st);
struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
@@ -131,6 +133,7 @@ static struct replication_query *replication_query_prepare(
q->query.after = query_after;
q->query.before = query_before;
q->query.enable_streaming = query_enable_streaming;
+ q->query.capabilities = capabilities;
q->wall_clock_time = wall_clock_time;
@@ -159,7 +162,7 @@ static struct replication_query *replication_query_prepare(
}
}
- q->ops = &st->rrdhost->db[0].eng->api.query_ops;
+ q->backend = st->rrdhost->db[0].eng->backend;
// prepare our array of dimensions
size_t count = 0;
@@ -181,7 +184,7 @@ static struct replication_query *replication_query_prepare(
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
- q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
+ storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
d->enabled = true;
d->skip = false;
@@ -209,32 +212,40 @@ static struct replication_query *replication_query_prepare(
return q;
}
-static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
+static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
+ NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(!rd->exposed) continue;
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
- rrddim_id(rd),
- (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
- rd->last_collected_value,
- rd->last_calculated_value,
- rd->last_stored_value
- );
- }
+ rrddim_foreach_read(rd, st){
+ if (!rd->exposed) continue;
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
+ sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC +
+ (usec_t) rd->last_collected_time.tv_usec);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_int64_encoded(wb, encoding, rd->last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
rrddim_foreach_done(rd);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
- (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
- (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
- );
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
+ buffer_fast_strcat(wb, "\n", 1);
}
static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
size_t dimensions = q->dimensions;
if(wb && q->query.enable_streaming)
- replication_send_chart_collection_state(wb, q->st);
+ replication_send_chart_collection_state(wb, q->st, q->query.capabilities);
if(q->query.locked_data_collection) {
netdata_spinlock_unlock(&q->st->data_collection_lock);
@@ -249,7 +260,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
struct replication_dimension *d = &q->data[i];
if (unlikely(!d->enabled)) continue;
- q->ops->finalize(&d->handle);
+ storage_engine_query_finalize(&d->handle);
dictionary_acquired_item_release(d->dict, d->rda);
@@ -281,7 +292,7 @@ static void replication_query_align_to_optimal_before(struct replication_query *
struct replication_dimension *d = &q->data[i];
if(unlikely(!d->enabled)) continue;
- time_t new_before = q->ops->align_to_optimal_before(&d->handle);
+ time_t new_before = storage_engine_align_to_optimal_before(&d->handle);
if (!expanded_before || new_before < expanded_before)
expanded_before = new_before;
}
@@ -296,13 +307,14 @@ static void replication_query_align_to_optimal_before(struct replication_query *
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
replication_query_align_to_optimal_before(q);
+ NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
time_t after = q->query.after;
time_t before = q->query.before;
size_t dimensions = q->dimensions;
- struct storage_engine_query_ops *ops = q->ops;
time_t wall_clock_time = q->wall_clock_time;
- size_t points_read = q->points_read, points_generated = q->points_generated;
+ bool finished_with_gap = false;
+ size_t points_read = 0, points_generated = 0;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
time_t actual_after = 0, actual_before = 0;
@@ -318,8 +330,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
// fetch the first valid point for the dimension
int max_skip = 1000;
- while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
- d->sp = ops->next_metric(&d->handle);
+ while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) {
+ d->sp = storage_engine_query_next_metric(&d->handle);
points_read++;
}
@@ -328,9 +340,10 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
error_limit_static_global_var(erl, 1, 0);
error_limit(&erl,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
- rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
- (unsigned long long) now);
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
+ "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
+ (unsigned long long) now);
continue;
}
@@ -374,9 +387,10 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
else
fix_min_start_time = min_end_time - min_update_every;
+#ifdef NETDATA_INTERNAL_CHECKS
error_limit_static_global_var(erl, 1, 0);
error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
- "misaligned dimensions "
+ "misaligned dimensions, "
"update every (min: %ld, max: %ld), "
"start time (min: %ld, max: %ld), "
"end time (min %ld, max %ld), "
@@ -389,6 +403,7 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
now, last_end_time_in_buffer,
fix_min_start_time
);
+#endif
min_start_time = fix_min_start_time;
}
@@ -410,7 +425,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
q->query.before = last_end_time_in_buffer;
q->query.enable_streaming = false;
- internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. "
+ internal_error(true, "REPLICATION: current buffer size %zu is more than the "
+ "max message size %zu for chart '%s' of host '%s'. "
"Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
q->request.after, q->request.before, q->request.enable_streaming?"true":"false",
@@ -422,11 +438,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
}
last_end_time_in_buffer = min_end_time;
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
- (unsigned long long) min_start_time,
- (unsigned long long) min_end_time,
- (unsigned long long) wall_clock_time
- );
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4);
+ buffer_print_uint64_encoded(wb, encoding, min_start_time);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, min_end_time);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
// output the replay values for this time
for (size_t i = 0; i < dimensions; i++) {
@@ -438,8 +456,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
!storage_point_is_unset(d->sp) &&
!storage_point_is_gap(d->sp))) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
- rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
+ buffer_fast_strcat(wb, "\" ", 2);
+ buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
+ buffer_fast_strcat(wb, "\n", 1);
points_generated++;
}
@@ -450,9 +473,16 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
else if(unlikely(min_end_time < now))
// the query does not progress
break;
- else
+ else {
// we have gap - all points are in the future
now = min_start_time;
+
+ if(min_start_time > before && !points_generated) {
+ before = q->query.before = min_start_time - 1;
+ finished_with_gap = true;
+ break;
+ }
+ }
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -462,28 +492,33 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
(unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
(unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
}
else
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
(unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
- q->points_read = points_read;
- q->points_generated = points_generated;
+ q->points_read += points_read;
+ q->points_generated += points_generated;
- bool finished_with_gap = false;
if(last_end_time_in_buffer < before - q->st->update_every)
finished_with_gap = true;
return finished_with_gap;
}
-static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
+static struct replication_query *replication_response_prepare(
+ RRDSET *st,
+ bool requested_enable_streaming,
+ time_t requested_after,
+ time_t requested_before,
+ STREAM_CAPABILITIES capabilities
+ ) {
time_t wall_clock_time = now_realtime_sec();
if(requested_after > requested_before) {
@@ -509,7 +544,8 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r
bool query_enable_streaming = requested_enable_streaming;
time_t db_first_entry = 0, db_last_entry = 0;
- rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
+ rrdset_get_retention_of_tier_for_collected_chart(
+ st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
// no data requested - just enable streaming
@@ -543,7 +579,7 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r
db_first_entry, db_last_entry,
requested_after, requested_before, requested_enable_streaming,
query_after, query_before, query_enable_streaming,
- wall_clock_time);
+ wall_clock_time, capabilities);
}
void replication_response_cancel_and_finalize(struct replication_query *q) {
@@ -553,6 +589,7 @@ void replication_response_cancel_and_finalize(struct replication_query *q) {
static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
+ NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
struct replication_request *rq = q->rq;
RRDSET *st = q->st;
RRDHOST *host = st->rrdhost;
@@ -562,7 +599,11 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// holding the host's buffer lock for too long
BUFFER *wb = sender_start(host->sender);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "'\n", 2);
+
+// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
bool locked_data_collection = q->query.locked_data_collection;
q->query.locked_data_collection = false;
@@ -585,23 +626,22 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// end with first/last entries we have, and the first start time and
// last end time of the data we sent
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
- // current chart update every
- (int)st->update_every
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
+ buffer_print_int64_encoded(wb, encoding, st->update_every);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, db_first_entry);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, db_last_entry);
- // child first db time, child end db time
- , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
+ buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7);
- // start streaming boolean
- , enable_streaming ? "true" : "false"
-
- // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
- , (unsigned long long)after, (unsigned long long)before
-
- // child world clock time
- , (unsigned long long)wall_clock_time
- );
+ buffer_print_uint64_encoded(wb, encoding, after);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, before);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
sender_commit(host->sender, wb);
@@ -733,9 +773,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
, msg
, r->last_request.after, r->last_request.before
, r->child_db.first_entry_t, r->child_db.last_entry_t
- , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
+ , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
, r->local_db.first_entry_t, r->local_db.last_entry_t
- , r->local_db.now
+ , r->local_db.wall_clock_time
, r->gap.from, r->gap.to
, (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
, (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
@@ -928,11 +968,12 @@ struct replication_sort_entry {
// the global variables for the replication thread
static struct replication_thread {
+ ARAL *aral_rse;
+
SPINLOCK spinlock;
struct {
size_t pending; // number of requests pending in the queue
- Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
// statistics
size_t added; // number of requests added to the queue
@@ -951,6 +992,7 @@ static struct replication_thread {
} unsafe; // protected from replication_recursive_lock()
struct {
+ Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
size_t executed; // the number of replication requests executed
size_t latest_first_time; // the 'after' timestamp of the last request we executed
size_t memory; // the total memory allocated by replication
@@ -964,10 +1006,10 @@ static struct replication_thread {
} main_thread; // access is allowed only by the main thread
} replication_globals = {
+ .aral_rse = NULL,
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.unsafe = {
.pending = 0,
- .unique_id = 0,
.added = 0,
.removed = 0,
@@ -984,6 +1026,7 @@ static struct replication_thread {
},
},
.atomic = {
+ .unique_id = 0,
.executed = 0,
.latest_first_time = 0,
.memory = 0,
@@ -1047,17 +1090,15 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) {
// ----------------------------------------------------------------------------
// replication sort entry management
-static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
- fatal_when_replication_is_not_locked_for_me();
-
- struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+ struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
__atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
// copy the request
rse->rq = rq;
- rse->unique_id = ++replication_globals.unsafe.unique_id;
+ rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST);
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
@@ -1068,26 +1109,30 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
}
static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
- freez(rse);
+ aral_freez(replication_globals.aral_rse, rse);
__atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
static void replication_sort_entry_add(struct replication_request *rq) {
- replication_recursive_lock();
-
if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = true;
rq->not_indexed_preprocessing = false;
+ replication_recursive_lock();
replication_globals.unsafe.pending_no_room++;
replication_recursive_unlock();
return;
}
- if(rq->not_indexed_buffer_full)
- replication_globals.unsafe.pending_no_room--;
+ // cache this, because it will be changed
+ bool decrement_no_room = rq->not_indexed_buffer_full;
+
+ struct replication_sort_entry *rse = replication_sort_entry_create(rq);
+
+ replication_recursive_lock();
- struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
+ if(decrement_no_room)
+ replication_globals.unsafe.pending_no_room--;
// if(rq->after < (time_t)replication_globals.protected.queue.after &&
// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
@@ -1371,7 +1416,12 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
if(likely(workers))
worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
+ rq->q = replication_response_prepare(
+ rq->st,
+ rq->start_streaming,
+ rq->after,
+ rq->before,
+ rq->sender->capabilities);
}
if(likely(workers))
@@ -1580,67 +1630,85 @@ static void replication_initialize_workers(bool master) {
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)
-static int replication_execute_next_pending_request(bool cancel) {
- static __thread int max_requests_ahead = 0;
- static __thread struct replication_request *rqs = NULL;
- static __thread int rqs_last_executed = 0, rqs_last_prepared = 0;
- static __thread size_t queue_rounds = 0; (void)queue_rounds;
+static __thread struct replication_thread_pipeline {
+ int max_requests_ahead;
+ struct replication_request *rqs;
+ int rqs_last_executed, rqs_last_prepared;
+ size_t queue_rounds;
+} rtp = {
+ .max_requests_ahead = 0,
+ .rqs = NULL,
+ .rqs_last_executed = 0,
+ .rqs_last_prepared = 0,
+ .queue_rounds = 0,
+};
+
+static void replication_pipeline_cancel_and_cleanup(void) {
+ if(!rtp.rqs)
+ return;
+
struct replication_request *rq;
+ size_t cancelled = 0;
+
+ do {
+ if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
+ rtp.rqs_last_executed = 0;
- if(unlikely(cancel)) {
- if(rqs) {
- size_t cancelled = 0;
- do {
- if (++rqs_last_executed >= max_requests_ahead)
- rqs_last_executed = 0;
+ rq = &rtp.rqs[rtp.rqs_last_executed];
- rq = &rqs[rqs_last_executed];
+ if (rq->q) {
+ internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
+ internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
- if (rq->q) {
- internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
- internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
+ replication_response_cancel_and_finalize(rq->q);
+ rq->q = NULL;
+ cancelled++;
+ }
- replication_response_cancel_and_finalize(rq->q);
- rq->q = NULL;
- cancelled++;
- }
+ rq->executed = true;
+ rq->found = false;
- rq->executed = true;
- rq->found = false;
+ } while (rtp.rqs_last_executed != rtp.rqs_last_prepared);
- } while (rqs_last_executed != rqs_last_prepared);
+ internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
- internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
- }
- return REQUEST_QUEUE_EMPTY;
- }
+ freez(rtp.rqs);
+ rtp.rqs = NULL;
+ rtp.max_requests_ahead = 0;
+ rtp.rqs_last_executed = 0;
+ rtp.rqs_last_prepared = 0;
+ rtp.queue_rounds = 0;
+}
+
+static int replication_pipeline_execute_next(void) {
+ struct replication_request *rq;
- if(unlikely(!rqs)) {
- max_requests_ahead = get_netdata_cpus() / 2;
+ if(unlikely(!rtp.rqs)) {
+ rtp.max_requests_ahead = (int)get_netdata_cpus() / 2;
- if(max_requests_ahead > libuv_worker_threads * 2)
- max_requests_ahead = libuv_worker_threads * 2;
+ if(rtp.max_requests_ahead > libuv_worker_threads * 2)
+ rtp.max_requests_ahead = libuv_worker_threads * 2;
- if(max_requests_ahead < 2)
- max_requests_ahead = 2;
+ if(rtp.max_requests_ahead < 2)
+ rtp.max_requests_ahead = 2;
- rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
- __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
+ rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request));
+ __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
}
// fill the queue
do {
- if(++rqs_last_prepared >= max_requests_ahead) {
- rqs_last_prepared = 0;
- queue_rounds++;
+ if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) {
+ rtp.rqs_last_prepared = 0;
+ rtp.queue_rounds++;
}
- internal_fatal(rqs[rqs_last_prepared].q,
+ internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q,
"REPLAY FATAL: slot is used by query that has not been executed!");
worker_is_busy(WORKER_JOB_FIND_NEXT);
- rqs[rqs_last_prepared] = replication_request_get_first_available();
- rq = &rqs[rqs_last_prepared];
+ rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available();
+ rq = &rtp.rqs[rtp.rqs_last_prepared];
if(rq->found) {
if (!rq->st) {
@@ -1650,20 +1718,25 @@ static int replication_execute_next_pending_request(bool cancel) {
if (rq->st && !rq->q) {
worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
+ rq->q = replication_response_prepare(
+ rq->st,
+ rq->start_streaming,
+ rq->after,
+ rq->before,
+ rq->sender->capabilities);
}
rq->executed = false;
}
- } while(rq->found && rqs_last_prepared != rqs_last_executed);
+ } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed);
// pick the first usable
do {
- if (++rqs_last_executed >= max_requests_ahead)
- rqs_last_executed = 0;
+ if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
+ rtp.rqs_last_executed = 0;
- rq = &rqs[rqs_last_executed];
+ rq = &rtp.rqs[rtp.rqs_last_executed];
if(rq->found) {
internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
@@ -1696,7 +1769,7 @@ static int replication_execute_next_pending_request(bool cancel) {
else
internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");
- } while(!rq->found && rqs_last_executed != rqs_last_prepared);
+ } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared);
if(unlikely(!rq->found)) {
worker_is_idle();
@@ -1720,7 +1793,7 @@ static int replication_execute_next_pending_request(bool cancel) {
}
static void replication_worker_cleanup(void *ptr __maybe_unused) {
- replication_execute_next_pending_request(true);
+ replication_pipeline_cancel_and_cleanup();
worker_unregister();
}
@@ -1730,7 +1803,7 @@ static void *replication_worker_thread(void *ptr) {
netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
while(service_running(SERVICE_REPLICATION)) {
- if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
sender_thread_buffer_free();
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
@@ -1746,7 +1819,7 @@ static void replication_main_cleanup(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- replication_execute_next_pending_request(true);
+ replication_pipeline_cancel_and_cleanup();
int threads = (int)replication_globals.main_thread.threads;
for(int i = 0; i < threads ;i++) {
@@ -1758,12 +1831,21 @@ static void replication_main_cleanup(void *ptr) {
replication_globals.main_thread.threads_ptrs = NULL;
__atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
+ aral_destroy(replication_globals.aral_rse);
+ replication_globals.aral_rse = NULL;
+
// custom code
worker_unregister();
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
+void replication_initialize(void) {
+ replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry),
+ 0, 65536, aral_by_size_statistics(),
+ NULL, NULL, false, false);
+}
+
void *replication_thread_main(void *ptr __maybe_unused) {
replication_initialize_workers(true);
@@ -1863,7 +1945,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_is_idle();
}
- if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
worker_is_busy(WORKER_JOB_WAIT);
replication_recursive_lock();