diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/replication.c | 213 |
1 files changed, 126 insertions, 87 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 8fa501061..d659d701d 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -4,7 +4,7 @@ #include "Judy.h" #define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 -#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20 +#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 #define WORKER_JOB_FIND_NEXT 1 @@ -14,17 +14,17 @@ #define WORKER_JOB_CHECK_CONSISTENCY 5 #define WORKER_JOB_BUFFER_COMMIT 6 #define WORKER_JOB_CLEANUP 7 +#define WORKER_JOB_WAIT 8 // master thread worker jobs -#define WORKER_JOB_STATISTICS 8 -#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9 -#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10 -#define WORKER_JOB_CUSTOM_METRIC_ADDED 11 -#define WORKER_JOB_CUSTOM_METRIC_DONE 12 -#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13 -#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14 -#define WORKER_JOB_CUSTOM_METRIC_WAITS 15 -#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16 +#define WORKER_JOB_STATISTICS 9 +#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11 +#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12 +#define WORKER_JOB_CUSTOM_METRIC_ADDED 13 +#define WORKER_JOB_CUSTOM_METRIC_DONE 14 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16 #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 #define SECONDS_TO_RESET_POINT_IN_TIME 10 @@ -591,6 +591,7 @@ struct replication_request { Word_t unique_id; // auto-increment, later requests have bigger bool found; // used as a result boolean for the find call bool indexed_in_judy; // true when the request is indexed in judy + bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full }; // replication sort entry in JudyL array @@ -605,7 +606,7 @@ struct replication_sort_entry { // the global variables for the replication thread static struct replication_thread { - netdata_mutex_t mutex; + SPINLOCK spinlock; struct { size_t pending; // number of requests pending in the queue @@ -614,9 +615,8 @@ static struct replication_thread { // statistics size_t added; // number of requests added to the queue size_t removed; // number of requests removed from the queue - size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent - size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses -// size_t skipped_no_room_since_last_reset; + size_t pending_no_room; // number of requests skipped, because the sender has no room for responses + size_t senders_full; // number of times a sender reset our last position in the queue size_t sender_resets; // number of times a sender reset our last position in the queue time_t first_time_t; // the minimum 'after' we encountered @@ -634,7 +634,6 @@ static struct replication_thread { } atomic; // access should be with atomic operations struct { - size_t waits; size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time netdata_thread_t **threads_ptrs; @@ -642,17 +641,16 @@ static struct replication_thread { } main_thread; // access is allowed only by the main thread } replication_globals = { - .mutex = NETDATA_MUTEX_INITIALIZER, + .spinlock = NETDATA_SPINLOCK_INITIALIZER, .unsafe = { .pending = 0, .unique_id = 0, .added = 0, .removed = 0, - .skipped_not_connected = 0, - .skipped_no_room = 0, -// .skipped_no_room_since_last_reset = 0, + .pending_no_room = 0, .sender_resets = 0, + .senders_full = 0, .first_time_t = 0, @@ -667,7 +665,6 @@ static struct replication_thread { .latest_first_time = 0, }, .main_thread = { - .waits = 0, .last_executed = 0, .threads = 0, .threads_ptrs = NULL, @@ -682,11 +679,11 @@ static inline bool replication_recursive_lock_mode(char mode) { if(mode == 'L') { // (L)ock if(++recursions == 1) - netdata_mutex_lock(&replication_globals.mutex); + netdata_spinlock_lock(&replication_globals.spinlock); } else if(mode == 'U') { // (U)nlock if(--recursions == 0) - netdata_mutex_unlock(&replication_globals.mutex); + netdata_spinlock_unlock(&replication_globals.spinlock); } else if(mode == 'C') { // (C)heck if(recursions > 0) @@ -736,6 +733,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc // save the unique id into the request, to be able to delete it later rq->unique_id = rse->unique_id; rq->indexed_in_judy = false; + rq->not_indexed_buffer_full = false; return rse; } @@ -743,9 +741,20 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { freez(rse); } -static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) { +static void replication_sort_entry_add(struct replication_request *rq) { replication_recursive_lock(); + if(rrdpush_sender_replication_buffer_full_get(rq->sender)) { + rq->indexed_in_judy = false; + rq->not_indexed_buffer_full = true; + replication_globals.unsafe.pending_no_room++; + replication_recursive_unlock(); + return; + } + + if(rq->not_indexed_buffer_full) + replication_globals.unsafe.pending_no_room--; + struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq); // if(rq->after < (time_t)replication_globals.protected.queue.after && @@ -770,13 +779,12 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); *item = rse; rq->indexed_in_judy = true; + rq->not_indexed_buffer_full = false; if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t) replication_globals.unsafe.first_time_t = rq->after; replication_recursive_unlock(); - - return rse; } static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) { @@ -806,7 +814,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor return inner_judy_deleted; } -static void replication_sort_entry_del(struct replication_request *rq) { +static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) { Pvoid_t *inner_judy_pptr; struct replication_sort_entry *rse_to_delete = NULL; @@ -819,6 +827,11 @@ static void replication_sort_entry_del(struct replication_request *rq) { if (our_item_pptr) { rse_to_delete = *our_item_pptr; replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr); + + if(buffer_full) { + replication_globals.unsafe.pending_no_room++; + rq->not_indexed_buffer_full = true; + } } } @@ -877,44 +890,17 @@ static struct replication_request replication_request_get_first_available() { while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) { struct replication_sort_entry *rse = *our_item_pptr; struct replication_request *rq = rse->rq; - struct sender_state *s = rq->sender; - - if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) { - // there is room for this request in the sender buffer - - bool sender_is_connected = - rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); - - bool sender_has_been_flushed_since_this_request = - rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s); - if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { - // skip this request, the sender is not connected, or it has reconnected + // copy the request to return it + rq_to_return = *rq; + rq_to_return.chart_id = string_dup(rq_to_return.chart_id); - replication_globals.unsafe.skipped_not_connected++; - if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) - // we removed the item from the outer JudyL - break; - } - else { - // this request is good to execute + // set the return result to found + rq_to_return.found = true; - // copy the request to return it - rq_to_return = *rq; - rq_to_return.chart_id = string_dup(rq_to_return.chart_id); - - // set the return result to found - rq_to_return.found = true; - - if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) - // we removed the item from the outer JudyL - break; - } - } - else { - replication_globals.unsafe.skipped_no_room++; -// replication_globals.protected.skipped_no_room_since_last_reset++; - } + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + // we removed the item from the outer JudyL + break; } // call JudyLNext from now on @@ -959,7 +945,20 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __ replication_recursive_lock(); - if(!rq->indexed_in_judy) { + if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) { + // we can replace this command + internal_error( + true, + "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])", + rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item), + (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false", + (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false"); + + rq->after = rq_new->after; + rq->before = rq_new->before; + rq->start_streaming = rq_new->start_streaming; + } + else if(!rq->indexed_in_judy) { replication_sort_entry_add(rq); internal_error( true, @@ -991,7 +990,13 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma rrdpush_sender_replicating_charts_minus_one(rq->sender); if(rq->indexed_in_judy) - replication_sort_entry_del(rq); + replication_sort_entry_del(rq, false); + + else if(rq->not_indexed_buffer_full) { + replication_recursive_lock(); + replication_globals.unsafe.pending_no_room--; + replication_recursive_unlock(); + } string_freez(rq->chart_id); } @@ -1046,6 +1051,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor cleanup: string_freez(rq->chart_id); + worker_is_idle(); return ret; } @@ -1060,6 +1066,8 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .before = before, .start_streaming = start_streaming, .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), + .indexed_in_judy = false, + .not_indexed_buffer_full = false, }; if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) @@ -1094,15 +1102,36 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer); size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size; - if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) - s->replication.unsafe.reached_max = true; + if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) { + rrdpush_sender_replication_buffer_full_set(s, true); + + struct replication_request *rq; + dfe_start_read(s->replication.requests, rq) { + if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) { + replication_sort_entry_del(rq, true); + } + } + dfe_done(rq); - if(s->replication.unsafe.reached_max && - percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) { - s->replication.unsafe.reached_max = false; replication_recursive_lock(); -// replication_set_next_point_in_time(0, 0); + replication_globals.unsafe.senders_full++; + replication_recursive_unlock(); + } + else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) { + rrdpush_sender_replication_buffer_full_set(s, false); + + struct replication_request *rq; + dfe_start_read(s->replication.requests, rq) { + if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) { + replication_sort_entry_add(rq); + } + } + dfe_done(rq); + + replication_recursive_lock(); + replication_globals.unsafe.senders_full--; replication_globals.unsafe.sender_resets++; + // replication_set_next_point_in_time(0, 0); replication_recursive_unlock(); } @@ -1188,17 +1217,17 @@ static void replication_initialize_workers(bool master) { worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit"); worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup"); + worker_register_job_name(WORKER_JOB_WAIT, "wait"); if(master) { worker_register_job_name(WORKER_JOB_STATISTICS, "statistics"); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); - worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); - worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL); - worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE); } } @@ -1210,8 +1239,10 @@ static int replication_execute_next_pending_request(void) { worker_is_busy(WORKER_JOB_FIND_NEXT); struct replication_request rq = replication_request_get_first_available(); - if(unlikely(!rq.found)) + if(unlikely(!rq.found)) { + worker_is_idle(); return REQUEST_QUEUE_EMPTY; + } // delete the request from the dictionary worker_is_busy(WORKER_JOB_DELETE_ENTRY); @@ -1221,9 +1252,12 @@ static int replication_execute_next_pending_request(void) { replication_set_latest_first_time(rq.after); - if(unlikely(!replication_execute_request(&rq, true))) + if(unlikely(!replication_execute_request(&rq, true))) { + worker_is_idle(); return REQUEST_CHART_NOT_FOUND; + } + worker_is_idle(); return REQUEST_OK; } @@ -1238,6 +1272,7 @@ static void *replication_worker_thread(void *ptr) { while(!netdata_exit) { if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + worker_is_busy(WORKER_JOB_WAIT); worker_is_idle(); sleep_usec(1 * USEC_PER_SEC); } @@ -1305,6 +1340,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) { last_now_mono_ut = now_mono_ut; + worker_is_busy(WORKER_JOB_STATISTICS); replication_recursive_lock(); size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); @@ -1321,19 +1357,21 @@ void *replication_thread_main(void *ptr __maybe_unused) { replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; } - if(!replication_globals.unsafe.pending && --run_verification_countdown == 0) { - // reset the statistics about completion percentage - replication_globals.unsafe.first_time_t = 0; - replication_set_latest_first_time(0); + if(--run_verification_countdown == 0) { + if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) { + // reset the statistics about completion percentage + replication_globals.unsafe.first_time_t = 0; + replication_set_latest_first_time(0); - verify_all_hosts_charts_are_streaming_now(); + verify_all_hosts_charts_are_streaming_now(); - run_verification_countdown = LONG_MAX; - slow = true; + run_verification_countdown = LONG_MAX; + slow = true; + } + else + run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION; } - worker_is_busy(WORKER_JOB_STATISTICS); - time_t latest_first_time_t = replication_get_latest_first_time(); if(latest_first_time_t && replication_globals.unsafe.pending) { // completion percentage statistics @@ -1349,15 +1387,17 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED)); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_not_connected); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_no_room); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room); worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets); - worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.main_thread.waits); + worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full); replication_recursive_unlock(); + worker_is_idle(); } if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + + worker_is_busy(WORKER_JOB_WAIT); replication_recursive_lock(); // the timeout also defines now frequently we will traverse all the pending requests @@ -1388,7 +1428,6 @@ void *replication_thread_main(void *ptr __maybe_unused) { last_sender_resets = replication_globals.unsafe.sender_resets; } - replication_globals.main_thread.waits++; replication_recursive_unlock(); worker_is_idle(); |