summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-12-05 16:29:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-12-05 16:31:00 +0000
commit89b68730a8a23e3393f0fe9623ac6ec4480021a1 (patch)
tree2fdf1b5447ffd8bdd61e702ca183e814afdcb4fc /streaming/replication.c
parentAdding upstream version 1.37.0. (diff)
downloadnetdata-89b68730a8a23e3393f0fe9623ac6ec4480021a1.tar.xz
netdata-89b68730a8a23e3393f0fe9623ac6ec4480021a1.zip
Adding upstream version 1.37.1.upstream/1.37.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--streaming/replication.c213
1 files changed, 126 insertions, 87 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index 8fa501061..d659d701d 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -4,7 +4,7 @@
#include "Judy.h"
#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
-#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
#define WORKER_JOB_FIND_NEXT 1
@@ -14,17 +14,17 @@
#define WORKER_JOB_CHECK_CONSISTENCY 5
#define WORKER_JOB_BUFFER_COMMIT 6
#define WORKER_JOB_CLEANUP 7
+#define WORKER_JOB_WAIT 8
// master thread worker jobs
-#define WORKER_JOB_STATISTICS 8
-#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9
-#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10
-#define WORKER_JOB_CUSTOM_METRIC_ADDED 11
-#define WORKER_JOB_CUSTOM_METRIC_DONE 12
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14
-#define WORKER_JOB_CUSTOM_METRIC_WAITS 15
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
+#define WORKER_JOB_STATISTICS 9
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 13
+#define WORKER_JOB_CUSTOM_METRIC_DONE 14
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16
#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10
@@ -591,6 +591,7 @@ struct replication_request {
Word_t unique_id; // auto-increment, later requests have bigger
bool found; // used as a result boolean for the find call
bool indexed_in_judy; // true when the request is indexed in judy
+ bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full
};
// replication sort entry in JudyL array
@@ -605,7 +606,7 @@ struct replication_sort_entry {
// the global variables for the replication thread
static struct replication_thread {
- netdata_mutex_t mutex;
+ SPINLOCK spinlock;
struct {
size_t pending; // number of requests pending in the queue
@@ -614,9 +615,8 @@ static struct replication_thread {
// statistics
size_t added; // number of requests added to the queue
size_t removed; // number of requests removed from the queue
- size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent
- size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses
-// size_t skipped_no_room_since_last_reset;
+ size_t pending_no_room; // number of requests skipped, because the sender has no room for responses
+ size_t senders_full; // number of times a sender reset our last position in the queue
size_t sender_resets; // number of times a sender reset our last position in the queue
time_t first_time_t; // the minimum 'after' we encountered
@@ -634,7 +634,6 @@ static struct replication_thread {
} atomic; // access should be with atomic operations
struct {
- size_t waits;
size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time
netdata_thread_t **threads_ptrs;
@@ -642,17 +641,16 @@ static struct replication_thread {
} main_thread; // access is allowed only by the main thread
} replication_globals = {
- .mutex = NETDATA_MUTEX_INITIALIZER,
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
.unsafe = {
.pending = 0,
.unique_id = 0,
.added = 0,
.removed = 0,
- .skipped_not_connected = 0,
- .skipped_no_room = 0,
-// .skipped_no_room_since_last_reset = 0,
+ .pending_no_room = 0,
.sender_resets = 0,
+ .senders_full = 0,
.first_time_t = 0,
@@ -667,7 +665,6 @@ static struct replication_thread {
.latest_first_time = 0,
},
.main_thread = {
- .waits = 0,
.last_executed = 0,
.threads = 0,
.threads_ptrs = NULL,
@@ -682,11 +679,11 @@ static inline bool replication_recursive_lock_mode(char mode) {
if(mode == 'L') { // (L)ock
if(++recursions == 1)
- netdata_mutex_lock(&replication_globals.mutex);
+ netdata_spinlock_lock(&replication_globals.spinlock);
}
else if(mode == 'U') { // (U)nlock
if(--recursions == 0)
- netdata_mutex_unlock(&replication_globals.mutex);
+ netdata_spinlock_unlock(&replication_globals.spinlock);
}
else if(mode == 'C') { // (C)heck
if(recursions > 0)
@@ -736,6 +733,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
rq->indexed_in_judy = false;
+ rq->not_indexed_buffer_full = false;
return rse;
}
@@ -743,9 +741,20 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
freez(rse);
}
-static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) {
+static void replication_sort_entry_add(struct replication_request *rq) {
replication_recursive_lock();
+ if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
+ rq->indexed_in_judy = false;
+ rq->not_indexed_buffer_full = true;
+ replication_globals.unsafe.pending_no_room++;
+ replication_recursive_unlock();
+ return;
+ }
+
+ if(rq->not_indexed_buffer_full)
+ replication_globals.unsafe.pending_no_room--;
+
struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
// if(rq->after < (time_t)replication_globals.protected.queue.after &&
@@ -770,13 +779,12 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
*item = rse;
rq->indexed_in_judy = true;
+ rq->not_indexed_buffer_full = false;
if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
replication_globals.unsafe.first_time_t = rq->after;
replication_recursive_unlock();
-
- return rse;
}
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
@@ -806,7 +814,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
return inner_judy_deleted;
}
-static void replication_sort_entry_del(struct replication_request *rq) {
+static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
Pvoid_t *inner_judy_pptr;
struct replication_sort_entry *rse_to_delete = NULL;
@@ -819,6 +827,11 @@ static void replication_sort_entry_del(struct replication_request *rq) {
if (our_item_pptr) {
rse_to_delete = *our_item_pptr;
replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr);
+
+ if(buffer_full) {
+ replication_globals.unsafe.pending_no_room++;
+ rq->not_indexed_buffer_full = true;
+ }
}
}
@@ -877,44 +890,17 @@ static struct replication_request replication_request_get_first_available() {
while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
struct replication_sort_entry *rse = *our_item_pptr;
struct replication_request *rq = rse->rq;
- struct sender_state *s = rq->sender;
-
- if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
- // there is room for this request in the sender buffer
-
- bool sender_is_connected =
- rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
-
- bool sender_has_been_flushed_since_this_request =
- rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
- if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
- // skip this request, the sender is not connected, or it has reconnected
+ // copy the request to return it
+ rq_to_return = *rq;
+ rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
- replication_globals.unsafe.skipped_not_connected++;
- if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
- // we removed the item from the outer JudyL
- break;
- }
- else {
- // this request is good to execute
+ // set the return result to found
+ rq_to_return.found = true;
- // copy the request to return it
- rq_to_return = *rq;
- rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
-
- // set the return result to found
- rq_to_return.found = true;
-
- if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
- // we removed the item from the outer JudyL
- break;
- }
- }
- else {
- replication_globals.unsafe.skipped_no_room++;
-// replication_globals.protected.skipped_no_room_since_last_reset++;
- }
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
}
// call JudyLNext from now on
@@ -959,7 +945,20 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
replication_recursive_lock();
- if(!rq->indexed_in_judy) {
+ if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
+ // we can replace this command
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+
+ rq->after = rq_new->after;
+ rq->before = rq_new->before;
+ rq->start_streaming = rq_new->start_streaming;
+ }
+ else if(!rq->indexed_in_judy) {
replication_sort_entry_add(rq);
internal_error(
true,
@@ -991,7 +990,13 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma
rrdpush_sender_replicating_charts_minus_one(rq->sender);
if(rq->indexed_in_judy)
- replication_sort_entry_del(rq);
+ replication_sort_entry_del(rq, false);
+
+ else if(rq->not_indexed_buffer_full) {
+ replication_recursive_lock();
+ replication_globals.unsafe.pending_no_room--;
+ replication_recursive_unlock();
+ }
string_freez(rq->chart_id);
}
@@ -1046,6 +1051,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
cleanup:
string_freez(rq->chart_id);
+ worker_is_idle();
return ret;
}
@@ -1060,6 +1066,8 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.before = before,
.start_streaming = start_streaming,
.sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
+ .indexed_in_judy = false,
+ .not_indexed_buffer_full = false,
};
if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
@@ -1094,15 +1102,36 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;
- if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
- s->replication.unsafe.reached_max = true;
+ if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
+ rrdpush_sender_replication_buffer_full_set(s, true);
+
+ struct replication_request *rq;
+ dfe_start_read(s->replication.requests, rq) {
+ if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) {
+ replication_sort_entry_del(rq, true);
+ }
+ }
+ dfe_done(rq);
- if(s->replication.unsafe.reached_max &&
- percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
- s->replication.unsafe.reached_max = false;
replication_recursive_lock();
-// replication_set_next_point_in_time(0, 0);
+ replication_globals.unsafe.senders_full++;
+ replication_recursive_unlock();
+ }
+ else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
+ rrdpush_sender_replication_buffer_full_set(s, false);
+
+ struct replication_request *rq;
+ dfe_start_read(s->replication.requests, rq) {
+ if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
+ replication_sort_entry_add(rq);
+ }
+ }
+ dfe_done(rq);
+
+ replication_recursive_lock();
+ replication_globals.unsafe.senders_full--;
replication_globals.unsafe.sender_resets++;
+ // replication_set_next_point_in_time(0, 0);
replication_recursive_unlock();
}
@@ -1188,17 +1217,17 @@ static void replication_initialize_workers(bool master) {
worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
+ worker_register_job_name(WORKER_JOB_WAIT, "wait");
if(master) {
worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
}
}
@@ -1210,8 +1239,10 @@ static int replication_execute_next_pending_request(void) {
worker_is_busy(WORKER_JOB_FIND_NEXT);
struct replication_request rq = replication_request_get_first_available();
- if(unlikely(!rq.found))
+ if(unlikely(!rq.found)) {
+ worker_is_idle();
return REQUEST_QUEUE_EMPTY;
+ }
// delete the request from the dictionary
worker_is_busy(WORKER_JOB_DELETE_ENTRY);
@@ -1221,9 +1252,12 @@ static int replication_execute_next_pending_request(void) {
replication_set_latest_first_time(rq.after);
- if(unlikely(!replication_execute_request(&rq, true)))
+ if(unlikely(!replication_execute_request(&rq, true))) {
+ worker_is_idle();
return REQUEST_CHART_NOT_FOUND;
+ }
+ worker_is_idle();
return REQUEST_OK;
}
@@ -1238,6 +1272,7 @@ static void *replication_worker_thread(void *ptr) {
while(!netdata_exit) {
if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
sleep_usec(1 * USEC_PER_SEC);
}
@@ -1305,6 +1340,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
last_now_mono_ut = now_mono_ut;
+ worker_is_busy(WORKER_JOB_STATISTICS);
replication_recursive_lock();
size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
@@ -1321,19 +1357,21 @@ void *replication_thread_main(void *ptr __maybe_unused) {
replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
}
- if(!replication_globals.unsafe.pending && --run_verification_countdown == 0) {
- // reset the statistics about completion percentage
- replication_globals.unsafe.first_time_t = 0;
- replication_set_latest_first_time(0);
+ if(--run_verification_countdown == 0) {
+ if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
+ // reset the statistics about completion percentage
+ replication_globals.unsafe.first_time_t = 0;
+ replication_set_latest_first_time(0);
- verify_all_hosts_charts_are_streaming_now();
+ verify_all_hosts_charts_are_streaming_now();
- run_verification_countdown = LONG_MAX;
- slow = true;
+ run_verification_countdown = LONG_MAX;
+ slow = true;
+ }
+ else
+ run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
}
- worker_is_busy(WORKER_JOB_STATISTICS);
-
time_t latest_first_time_t = replication_get_latest_first_time();
if(latest_first_time_t && replication_globals.unsafe.pending) {
// completion percentage statistics
@@ -1349,15 +1387,17 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_not_connected);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_no_room);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.main_thread.waits);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);
replication_recursive_unlock();
+ worker_is_idle();
}
if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+
+ worker_is_busy(WORKER_JOB_WAIT);
replication_recursive_lock();
// the timeout also defines now frequently we will traverse all the pending requests
@@ -1388,7 +1428,6 @@ void *replication_thread_main(void *ptr __maybe_unused) {
last_sender_resets = replication_globals.unsafe.sender_resets;
}
- replication_globals.main_thread.waits++;
replication_recursive_unlock();
worker_is_idle();